it works, but locking needs improvements

Bryan Stitt 2022-04-27 22:53:13 +00:00
parent 9e457154a3
commit 302ff8a252
5 changed files with 86 additions and 33 deletions

README.md

@@ -28,10 +28,28 @@ cargo run -r -- --eth-primary-rpc "https://your.favorite.provider"
 curl -X POST -H "Content-Type: application/json" --data '{"jsonrpc":"2.0","method":"web3_clientVersion","params":[],"id":67}' 127.0.0.1:8845/eth
 ```
+## Load Testing
+Test the proxy:
+wrk -s ./getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8445
+wrk -s ./getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8445
+Test geth:
+wrk -s ./getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
+wrk -s ./getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
+Test erigon:
+wrk -s ./getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
+wrk -s ./getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
 ## Todo
 - [x] simple proxy
+- [ ] better locking. when lots of requests come in, we seem to be in the way of block updates
 - [ ] proper logging
 - [ ] load balance between multiple RPC servers
 - [ ] support more than just ETH
@@ -39,3 +57,6 @@ curl -X POST -H "Content-Type: application/json" --data '{"jsonrpc":"2.0","metho
 - [ ] health check nodes by block height
 - [ ] measure latency to nodes
 - [ ] Dockerfile
+- [ ] testing getLatestBlockByNumber is not great because the latest block changes, so one run is likely to differ from another
+- [ ] if a request gets a socket timeout, try it on another server
+  - maybe always try at least two servers in parallel and return the first response? or only race a second server when the first doesn't respond quickly? (see the sketch below)
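The last todo item is worth a sketch. A minimal version of the "race two servers" idea, assuming plain HTTP JSON-RPC upstreams reached through `reqwest` (with its `json` feature) and `futures::future::select_ok`; the function and parameter names are illustrative, and none of the proxy's connection tracking or rate limiting appears here:

```rust
use futures::future::select_ok;
use serde_json::Value;

/// Send the same JSON-RPC body to several upstreams and return the first
/// successful reply. Illustrative sketch, not the proxy's real code path.
async fn race_two_servers(
    client: &reqwest::Client,
    urls: &[&str],
    body: &Value,
) -> anyhow::Result<Value> {
    // one future per upstream; each resolves to the parsed JSON-RPC response
    let requests = urls.iter().map(|url| {
        Box::pin(async move {
            let resp = client.post(*url).json(body).send().await?;
            Ok::<Value, anyhow::Error>(resp.json::<Value>().await?)
        })
    });

    // first upstream to answer Ok wins; note select_ok panics on an empty
    // iterator, so urls must contain at least one server
    let (first_response, _remaining) = select_ok(requests).await?;
    Ok(first_response)
}
```

`select_ok` resolves with the first future that returns `Ok` and only fails if every upstream errors, so a slow server costs nothing unless the fast one breaks. Whether to always race or only start the second request after a short timeout, as the todo asks, would change when the second future gets created.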

getBlockNumber.lua

@@ -0,0 +1,3 @@
+wrk.method = "POST"
+wrk.body = "{\"jsonrpc\":\"2.0\",\"method\":\"eth_blockNumber\",\"params\":[],\"id\":420}"
+wrk.headers["Content-Type"] = "application/json"

getLatestBlockByNumber.lua

@@ -0,0 +1,3 @@
+wrk.method = "POST"
+wrk.body = "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\"latest\", false],\"id\":420}"
+wrk.headers["Content-Type"] = "application/json"

src/block_watcher.rs

@@ -132,7 +132,7 @@ impl BlockWatcher {
 // first block seen
 self.head_block_number
     .swap(new_block_number, atomic::Ordering::SeqCst);
-"+".to_string()
+", +".to_string()
 } else {
 // TODO: what if they have the same number but different hashes?
 // TODO: alert if there is a large chain split?
@@ -145,19 +145,24 @@ impl BlockWatcher {
 // new_block is the new head_block
 self.head_block_number
     .swap(new_block_number, atomic::Ordering::SeqCst);
-"+".to_string()
+", +".to_string()
 }
 cmp::Ordering::Less => {
 // this rpc is behind
 let lag = new_block_number as i64 - head_number as i64;
-lag.to_string()
+let mut s = ", ".to_string();
+s.push_str(&lag.to_string());
+s
 }
 }
 };
 // TODO: include time since last update?
 info!(
-"{:?} = {}, {}, {} sec, {}",
+"{:?} = {}, {}, {} sec{}",
 new_block.hash.unwrap(),
 new_block.number.unwrap(),
 rpc,
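To restate what this hunk does: the watcher now builds the trailing log field itself, logging `", +"` when an RPC advances the head block and `", -N"` when it lags N blocks behind. A condensed sketch of that branch, with plain integers standing in for the atomics (`block_delta_label` is an illustrative name, and the `Equal` arm is only a placeholder for the hash-mismatch TODO above):

```rust
use std::cmp;

/// Produce the log suffix for a newly seen block, relative to the current head.
fn block_delta_label(new_block_number: u64, head_number: u64) -> String {
    match new_block_number.cmp(&head_number) {
        // this rpc produced a new head block
        cmp::Ordering::Greater => ", +".to_string(),
        // this rpc is behind; lag is negative, so formatting renders ", -N"
        cmp::Ordering::Less => {
            let lag = new_block_number as i64 - head_number as i64;
            format!(", {}", lag)
        }
        // placeholder: the real code has a TODO for same number, different hashes
        cmp::Ordering::Equal => ", =".to_string(),
    }
}
```

Because `lag` comes out negative in the `Less` arm, formatting it directly already renders the minus sign; that is why the commit only needs to prepend `", "` rather than `", -"`.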

src/main.rs

@@ -4,11 +4,12 @@ mod provider_tiers
 use futures::future;
 use governor::clock::{Clock, QuantaClock};
+use serde_json::json;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::{mpsc, RwLock};
 use tokio::time::sleep;
-use tracing::log::warn;
+use tracing::{info, warn};
 use warp::Filter;
 // use crate::types::{BlockMap, ConnectionsMap, RpcRateLimiterMap};
@@ -115,6 +116,8 @@ impl Web3ProxyApp {
 loop {
 let read_lock = self.private_rpcs_ratelimiter_lock.read().await;
+let json_body_clone = json_body.clone();
 match private_rpcs
     .get_upstream_servers(1, self.block_watcher.clone())
     .await
@@ -125,11 +128,18 @@ impl Web3ProxyApp {
 let clone = self.clone();
 let connections = private_rpcs.clone_connections();
-let json_body = json_body.clone();
+// check incoming_id before sending any requests
+let incoming_id = json_body.as_object().unwrap().get("id").unwrap();
 tokio::spawn(async move {
 clone
-.try_send_requests(upstream_servers, connections, json_body, tx)
+.try_send_requests(
+    upstream_servers,
+    connections,
+    json_body_clone,
+    tx,
+)
 .await
 });
@@ -138,7 +148,12 @@ impl Web3ProxyApp {
 .await
 .ok_or_else(|| anyhow::anyhow!("no successful response"))?;
-if let Ok(response) = response {
+if let Ok(partial_response) = response {
+let response = json!({
+    "jsonrpc": "2.0",
+    "id": incoming_id,
+    "result": partial_response
+});
 return Ok(warp::reply::json(&response));
 }
 }
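The shape of the fix: `try_send_requests` now hands back only the upstream's inner `result`, and the caller rebuilds the full JSON-RPC envelope around the id the client sent. A hedged sketch of that rebuild using `serde_json` (`wrap_partial_response` is an illustrative helper; the commit itself unwraps the id lookup instead of returning an error):

```rust
use serde_json::{json, Value};

/// Rebuild a full JSON-RPC reply from the bare `result` an upstream returned,
/// reusing the id from the client's original request.
fn wrap_partial_response(
    incoming_request: &Value,
    partial_response: Value,
) -> anyhow::Result<Value> {
    // the client matches replies to requests by this id
    let incoming_id = incoming_request
        .get("id")
        .ok_or_else(|| anyhow::anyhow!("bad id"))?;

    Ok(json!({
        "jsonrpc": "2.0",
        "id": incoming_id,
        "result": partial_response
    }))
}
```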
@@ -165,36 +180,51 @@ impl Web3ProxyApp {
 // there are multiple tiers. save the earliest not_until (if any). if we don't return, we will sleep until then and then try again
 let mut earliest_not_until = None;
+// check incoming_id before sending any requests
+let incoming_id = json_body.as_object().unwrap().get("id").unwrap();
 for balanced_rpcs in self.balanced_rpc_tiers.iter() {
 match balanced_rpcs
     .next_upstream_server(1, self.block_watcher.clone())
     .await
 {
 Ok(upstream_server) => {
+// TODO: better type for this. right now its request (the full jsonrpc object), response (just the inner result)
 let (tx, mut rx) =
     mpsc::unbounded_channel::<anyhow::Result<serde_json::Value>>();
-let clone = self.clone();
-let connections = balanced_rpcs.clone_connections();
-let json_body = json_body.clone();
+{
+// clone things so we can move them into the future and still use them here
+let clone = self.clone();
+let connections = balanced_rpcs.clone_connections();
+let json_body = json_body.clone();
+let upstream_server = upstream_server.clone();
 tokio::spawn(async move {
 clone
     .try_send_requests(
         vec![upstream_server],
         connections,
         json_body,
         tx,
     )
     .await
 });
+}
 let response = rx
     .recv()
     .await
     .ok_or_else(|| anyhow::anyhow!("no successful response"))?;
-if let Ok(response) = response {
+if let Ok(partial_response) = response {
+info!("forwarding request from {}", upstream_server);
+let response = json!({
+    "jsonrpc": "2.0",
+    "id": incoming_id,
+    "result": partial_response
+});
 return Ok(warp::reply::json(&response));
 }
 }
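The mechanism in this hunk (and in the private-tier path above) is: spawn one task per upstream, let each push its result into an unbounded mpsc channel, and answer with the first message received. A self-contained sketch of just that mechanism, with `tokio::time::sleep` standing in for real upstream calls (`first_response` and the delay inputs are illustrative):

```rust
use serde_json::{json, Value};
use tokio::sync::mpsc;

/// Fan out to several fake upstreams and return whichever answers first.
async fn first_response(upstream_delays_ms: Vec<u64>) -> anyhow::Result<Value> {
    let (tx, mut rx) = mpsc::unbounded_channel::<anyhow::Result<Value>>();

    for delay_ms in upstream_delays_ms {
        let tx = tx.clone();
        tokio::spawn(async move {
            // stand-in for querying a real upstream server
            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
            // ignore send errors: once the first answer wins, the receiver is gone
            let _ = tx.send(Ok(json!({ "delay_ms": delay_ms })));
        });
    }
    // drop the original sender so recv() can return None if every task failed
    drop(tx);

    // the first message to arrive is the winner
    rx.recv()
        .await
        .ok_or_else(|| anyhow::anyhow!("no successful response"))?
}
```

Run as `first_response(vec![50, 10, 30]).await`, the 10 ms "upstream" wins. Later `send` calls fail once the receiver is dropped, which is exactly why the real code discards the result of `tx.send`.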
@@ -240,13 +270,10 @@ impl Web3ProxyApp {
 rpc_servers: Vec<String>,
 connections: Arc<Web3ConnectionMap>,
 json_request_body: serde_json::Value,
+// TODO: better type for this
 tx: mpsc::UnboundedSender<anyhow::Result<serde_json::Value>>,
 ) -> anyhow::Result<()> {
 // {"jsonrpc":"2.0","method":"eth_syncing","params":[],"id":1}
-let incoming_id = json_request_body
-    .get("id")
-    .ok_or_else(|| anyhow::anyhow!("bad id"))?
-    .to_owned();
 let method = json_request_body
     .get("method")
     .and_then(|x| x.as_str())
@@ -259,7 +286,6 @@ impl Web3ProxyApp {
 // send the query to all the servers
 let bodies = future::join_all(rpc_servers.into_iter().map(|rpc| {
-let incoming_id = incoming_id.clone();
 let connections = connections.clone();
 let method = method.clone();
 let params = params.clone();
@@ -273,15 +299,10 @@ impl Web3ProxyApp {
 connections.get_mut(&rpc).unwrap().dec_active_requests();
-let mut response = response?;
+let response = response?;
 // TODO: if "no block with that header" or some other jsonrpc errors, skip this response
-// replace the id with what we originally received
-if let Some(response_id) = response.get_mut("id") {
-    *response_id = incoming_id;
-}
 // send the first good response to a one shot channel. that way we respond quickly
 // drop the result because errors are expected after the first send
 let _ = tx.send(Ok(response));
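For contrast, here is roughly what the deleted id rewrite did, wrapped in an illustrative helper so it stands alone:

```rust
use serde_json::Value;

/// The in-place id rewrite this commit removes: mutate each upstream reply
/// before sending it down the channel.
fn rewrite_id_in_place(response: &mut Value, incoming_id: Value) {
    // overwrite the upstream's own id with the one the client originally sent
    if let Some(response_id) = response.get_mut("id") {
        *response_id = incoming_id;
    }
}
```

Removing this from `try_send_requests` means `incoming_id` no longer has to be cloned for every server inside the `join_all` loop; the id is attached exactly once, where the reply to the client is assembled.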