From 302ff8a25204b63a5b28d9946fbd1e55d13591d8 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Wed, 27 Apr 2022 22:53:13 +0000
Subject: [PATCH] it works, but locking needs improvements

---
 README.md                           | 21 ++++++++
 data/wrk/getBlockNumber.lua         |  3 ++
 data/wrk/getLatestBlockByNumber.lua |  3 ++
 src/block_watcher.rs                | 13 +++--
 src/main.rs                         | 79 ++++++++++++++++++-----------
 5 files changed, 86 insertions(+), 33 deletions(-)
 create mode 100644 data/wrk/getBlockNumber.lua
 create mode 100644 data/wrk/getLatestBlockByNumber.lua

diff --git a/README.md b/README.md
index c120c3a7..1ac94701 100644
--- a/README.md
+++ b/README.md
@@ -28,10 +28,28 @@ cargo run -r -- --eth-primary-rpc "https://your.favorite.provider"
 
 curl -X POST -H "Content-Type: application/json" --data '{"jsonrpc":"2.0","method":"web3_clientVersion","params":[],"id":67}' 127.0.0.1:8845/eth
 ```
 
+## Load Testing
+
+Test the proxy:
+
+    wrk -s ./data/wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8445
+    wrk -s ./data/wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8445
+
+Test geth:
+
+    wrk -s ./data/wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
+    wrk -s ./data/wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
+
+Test erigon:
+
+    wrk -s ./data/wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
+    wrk -s ./data/wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
+
 ## Todo
 
 - [x] simple proxy
+- [ ] better locking. when lots of requests come in, request handling seems to get in the way of block updates
 - [ ] proper logging
 - [ ] load balance between multiple RPC servers
 - [ ] support more than just ETH
@@ -39,3 +57,6 @@ curl -X POST -H "Content-Type: application/json" --data '{"jsonrpc":"2.0","metho
 - [ ] health check nodes by block height
 - [ ] measure latency to nodes
 - [ ] Dockerfile
+- [ ] testing getLatestBlockByNumber is not great because the latest block changes, so one run is likely to differ from another
+- [ ] if a request gets a socket timeout, try it on another server
+  - maybe always try at least two servers in parallel and return the first response? or only race a second server if the first one doesn't respond quickly? (a rough sketch of the second idea follows the patch)
diff --git a/data/wrk/getBlockNumber.lua b/data/wrk/getBlockNumber.lua
new file mode 100644
index 00000000..e3dd78bd
--- /dev/null
+++ b/data/wrk/getBlockNumber.lua
@@ -0,0 +1,3 @@
+wrk.method = "POST"
+wrk.body = "{\"jsonrpc\":\"2.0\",\"method\":\"eth_blockNumber\",\"params\":[],\"id\":420}"
+wrk.headers["Content-Type"] = "application/json"
diff --git a/data/wrk/getLatestBlockByNumber.lua b/data/wrk/getLatestBlockByNumber.lua
new file mode 100644
index 00000000..bea1576e
--- /dev/null
+++ b/data/wrk/getLatestBlockByNumber.lua
@@ -0,0 +1,3 @@
+wrk.method = "POST"
+wrk.body = "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\"latest\", false],\"id\":420}"
+wrk.headers["Content-Type"] = "application/json"
diff --git a/src/block_watcher.rs b/src/block_watcher.rs
index b46bf6ab..f4ef5bcd 100644
--- a/src/block_watcher.rs
+++ b/src/block_watcher.rs
@@ -132,7 +132,7 @@
             // first block seen
             self.head_block_number
                 .swap(new_block_number, atomic::Ordering::SeqCst);
-            "+".to_string()
+            ", +".to_string()
         } else {
             // TODO: what if they have the same number but different hashes?
             // TODO: alert if there is a large chain split?
@@ -145,19 +145,24 @@ impl BlockWatcher {
                 // new_block is the new head_block
                 self.head_block_number
                     .swap(new_block_number, atomic::Ordering::SeqCst);
-                "+".to_string()
+                ", +".to_string()
             }
             cmp::Ordering::Less => {
                 // this rpc is behind
                 let lag = new_block_number as i64 - head_number as i64;
-                lag.to_string()
+
+                let mut s = ", ".to_string();
+
+                s.push_str(&lag.to_string());
+
+                s
             }
         }
     };
 
     // TODO: include time since last update?
     info!(
-        "{:?} = {}, {}, {} sec, {}",
+        "{:?} = {}, {}, {} sec{}",
         new_block.hash.unwrap(),
         new_block.number.unwrap(),
         rpc,
diff --git a/src/main.rs b/src/main.rs
index ac58f767..a2d5a1a2 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -4,11 +4,12 @@ mod provider_tiers;
 
 use futures::future;
 use governor::clock::{Clock, QuantaClock};
+use serde_json::json;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::{mpsc, RwLock};
 use tokio::time::sleep;
-use tracing::log::warn;
+use tracing::{info, warn};
 use warp::Filter;
 
 // use crate::types::{BlockMap, ConnectionsMap, RpcRateLimiterMap};
@@ -115,6 +116,8 @@ impl Web3ProxyApp {
         loop {
             let read_lock = self.private_rpcs_ratelimiter_lock.read().await;
 
+            let json_body_clone = json_body.clone();
+
             match private_rpcs
                 .get_upstream_servers(1, self.block_watcher.clone())
                 .await
@@ -125,11 +128,18 @@
             {
                 Ok(upstream_servers) => {
                     let (tx, mut rx) = mpsc::unbounded_channel();
                     let clone = self.clone();
                     let connections = private_rpcs.clone_connections();
-                    let json_body = json_body.clone();
+
+                    // check incoming_id before sending any requests
+                    let incoming_id = json_body.as_object().unwrap().get("id").unwrap();
 
                     tokio::spawn(async move {
                         clone
-                            .try_send_requests(upstream_servers, connections, json_body, tx)
+                            .try_send_requests(
+                                upstream_servers,
+                                connections,
+                                json_body_clone,
+                                tx,
+                            )
                             .await
                     });
@@ -138,7 +148,12 @@
                         .await
                         .ok_or_else(|| anyhow::anyhow!("no successful response"))?;
 
-                    if let Ok(response) = response {
+                    if let Ok(partial_response) = response {
+                        let response = json!({
+                            "jsonrpc": "2.0",
+                            "id": incoming_id,
+                            "result": partial_response
+                        });
                         return Ok(warp::reply::json(&response));
                     }
                 }
@@ -165,36 +180,51 @@
         // there are multiple tiers. save the earliest not_until (if any). if we don't return, we will sleep until then and then try again
         let mut earliest_not_until = None;
 
+        // check incoming_id before sending any requests
+        let incoming_id = json_body.as_object().unwrap().get("id").unwrap();
+
        for balanced_rpcs in self.balanced_rpc_tiers.iter() {
            match balanced_rpcs
                .next_upstream_server(1, self.block_watcher.clone())
                .await
            {
                Ok(upstream_server) => {
+                    // TODO: better type for this. right now it's the request (the full jsonrpc object) and the response (just the inner result)
                     let (tx, mut rx) = mpsc::unbounded_channel::<anyhow::Result<serde_json::Value>>();
 
-                    let clone = self.clone();
-                    let connections = balanced_rpcs.clone_connections();
-                    let json_body = json_body.clone();
+                    {
+                        // clone things so we can move them into the future and still use them here
+                        let clone = self.clone();
+                        let connections = balanced_rpcs.clone_connections();
+                        let json_body = json_body.clone();
+                        let upstream_server = upstream_server.clone();
 
-                    tokio::spawn(async move {
-                        clone
-                            .try_send_requests(
-                                vec![upstream_server],
-                                connections,
-                                json_body,
-                                tx,
-                            )
-                            .await
-                    });
+                        tokio::spawn(async move {
+                            clone
+                                .try_send_requests(
+                                    vec![upstream_server],
+                                    connections,
+                                    json_body,
+                                    tx,
+                                )
+                                .await
+                        });
+                    }
 
                     let response = rx
                         .recv()
                         .await
                         .ok_or_else(|| anyhow::anyhow!("no successful response"))?;
 
-                    if let Ok(response) = response {
+                    if let Ok(partial_response) = response {
+                        info!("forwarding request from {}", upstream_server);
+
+                        let response = json!({
+                            "jsonrpc": "2.0",
+                            "id": incoming_id,
+                            "result": partial_response
+                        });
                         return Ok(warp::reply::json(&response));
                     }
                 }
@@ -240,13 +270,10 @@
         rpc_servers: Vec<String>,
         connections: Arc<ConnectionsMap>,
         json_request_body: serde_json::Value,
+        // TODO: better type for this
         tx: mpsc::UnboundedSender<anyhow::Result<serde_json::Value>>,
     ) -> anyhow::Result<()> {
         // {"jsonrpc":"2.0","method":"eth_syncing","params":[],"id":1}
-        let incoming_id = json_request_body
-            .get("id")
-            .ok_or_else(|| anyhow::anyhow!("bad id"))?
-            .to_owned();
         let method = json_request_body
             .get("method")
             .and_then(|x| x.as_str())
@@ -259,7 +286,6 @@
         // send the query to all the servers
         let bodies = future::join_all(rpc_servers.into_iter().map(|rpc| {
-            let incoming_id = incoming_id.clone();
             let connections = connections.clone();
             let method = method.clone();
             let params = params.clone();
@@ -273,15 +299,10 @@
                 connections.get_mut(&rpc).unwrap().dec_active_requests();
 
-                let mut response = response?;
+                let response = response?;
 
                 // TODO: if "no block with that header" or some other jsonrpc errors, skip this response
 
-                // replace the id with what we originally received
-                if let Some(response_id) = response.get_mut("id") {
-                    *response_id = incoming_id;
-                }
-
                 // send the first good response to a one shot channel. that way we respond quickly
                 // drop the result because errors are expected after the first send
                 let _ = tx.send(Ok(response));
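
A note on the retry todo in the README above: the sketch below illustrates the "only race a second server if the first one is slow" variant. It is not part of the patch. `send_request` is a hypothetical stand-in for the patch's `try_send_requests`, and the sketch assumes `reqwest` (with its `json` feature), `anyhow`, and `tokio` (with `macros` and `time` features); the 100ms head start is a placeholder, not a measured value.

```rust
use serde_json::Value;
use std::time::Duration;
use tokio::time::timeout;

// Hypothetical helper standing in for try_send_requests: POST one JSON-RPC
// body to one upstream and return the parsed response.
async fn send_request(client: &reqwest::Client, url: &str, body: &Value) -> anyhow::Result<Value> {
    Ok(client.post(url).json(body).send().await?.json().await?)
}

// Give the primary upstream a short head start. If it has not answered by
// then, race the still-pending primary against a backup and return whichever
// finishes first.
async fn hedged_request(
    client: &reqwest::Client,
    primary: &str,
    backup: &str,
    body: &Value,
) -> anyhow::Result<Value> {
    let primary_fut = send_request(client, primary, body);
    tokio::pin!(primary_fut);

    // head start: tune this from measured upstream latencies
    if let Ok(response) = timeout(Duration::from_millis(100), &mut primary_fut).await {
        return response;
    }

    // note: this returns the first *completion*, even if it is an error;
    // a fuller version would keep waiting for the other server on an Err
    tokio::select! {
        response = &mut primary_fut => response,
        response = send_request(client, backup, body) => response,
    }
}
```

Always racing two servers (the other option in the todo) trades extra upstream load for better tail latency on every request; the head-start variant only duplicates work when the primary is already slow.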