diff --git a/.gitignore b/.gitignore
index ea8c4bf7..686bbfe6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,4 @@
 /target
+/perf.data
+/perf.data.old
+/flamegraph.svg
diff --git a/README.md b/README.md
index 1ac94701..0ee0a164 100644
--- a/README.md
+++ b/README.md
@@ -28,22 +28,31 @@ cargo run -r -- --eth-primary-rpc "https://your.favorite.provider"
 curl -X POST -H "Content-Type: application/json" --data '{"jsonrpc":"2.0","method":"web3_clientVersion","params":[],"id":67}' 127.0.0.1:8845/eth
 ```
 
+## Flame Graphs
+
+    $ cat /proc/sys/kernel/kptr_restrict
+    1
+    $ echo 0 |sudo tee /proc/sys/kernel/kptr_restrict
+    0
+    $ CARGO_PROFILE_RELEASE_DEBUG=true cargo flamegraph
+
+
 ## Load Testing
 
 Test the proxy:
 
-    wrk -s ./getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8445
-    wrk -s ./getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8445
+    wrk -s ./data/wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8445
+    wrk -s ./data/wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8445
 
 Test geth:
 
-    wrk -s ./getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
-    wrk -s ./getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
+    wrk -s ./data/wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
+    wrk -s ./data/wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
 
 Test erigon:
 
-    wrk -s ./getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
-    wrk -s ./getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
+    wrk -s ./data/wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
+    wrk -s ./data/wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
 
 ## Todo
 
diff --git a/src/main.rs b/src/main.rs
index fa8b3c8e..7dd16d17 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -25,6 +25,7 @@ static APP_USER_AGENT: &str = concat!(
 );
 
 /// The application
+// TODO: this debug impl is way too verbose. make something smaller
 #[derive(Debug)]
 struct Web3ProxyApp {
     /// clock used for rate limiting
@@ -207,12 +208,15 @@ impl Web3ProxyApp {
                     // sleep (with a lock) until our rate limits should be available
                     drop(read_lock);
 
-                    let write_lock = self.balanced_rpc_ratelimiter_lock.write().await;
+                    if let Some(not_until) = not_until {
+                        let write_lock = self.balanced_rpc_ratelimiter_lock.write().await;
 
-                    let deadline = not_until.wait_time_from(self.clock.now());
-                    sleep(deadline).await;
+                        let deadline = not_until.wait_time_from(self.clock.now());
 
-                    drop(write_lock);
+                        sleep(deadline).await;
+
+                        drop(write_lock);
+                    }
                 }
             };
         }
@@ -260,25 +264,41 @@ impl Web3ProxyApp {
                         .await
                         .ok_or_else(|| anyhow::anyhow!("no successful response"))?;
 
-                    if let Ok(partial_response) = response {
-                        // TODO: trace
-                        // info!("forwarding request from {}", upstream_server);
+                    let response = match response {
+                        Ok(partial_response) => {
+                            // TODO: trace
+                            // info!("forwarding request from {}", upstream_server);
 
-                        let response = json!({
-                            "jsonrpc": "2.0",
-                            "id": incoming_id,
-                            "result": partial_response
-                        });
-                        return Ok(warp::reply::json(&response));
-                    }
+                            json!({
+                                "jsonrpc": "2.0",
+                                "id": incoming_id,
+                                "result": partial_response
+                            })
+                        }
+                        Err(e) => {
+                            // TODO: what is the proper format for an error?
+                            // TODO: use e
+                            json!({
+                                "jsonrpc": "2.0",
+                                "id": incoming_id,
+                                "error": format!("{}", e)
+                            })
+                        }
+                    };
+
+                    return Ok(warp::reply::json(&response));
                 }
-                Err(not_until) => {
+                Err(None) => {
+                    warn!("No servers in sync!");
+                }
+                Err(Some(not_until)) => {
                     // save the smallest not_until. if nothing succeeds, return an Err with not_until in it
                     if earliest_not_until.is_none() {
-                        earliest_not_until = Some(not_until);
+                        earliest_not_until.replace(not_until);
                     } else {
                         let earliest_possible =
                             earliest_not_until.as_ref().unwrap().earliest_possible();
+
                         let new_earliest_possible = not_until.earliest_possible();
 
                         if earliest_possible > new_earliest_possible {
@@ -292,23 +312,26 @@ impl Web3ProxyApp {
                 // we haven't returned an Ok, sleep and try again
                 // TODO: move this to a helper function
                 drop(read_lock);
-                let write_lock = self.balanced_rpc_ratelimiter_lock.write().await;
 
                 // unwrap should be safe since we would have returned if it wasn't set
-                let deadline = if let Some(earliest_not_until) = earliest_not_until {
-                    earliest_not_until.wait_time_from(self.clock.now())
+                if let Some(earliest_not_until) = earliest_not_until {
+                    let write_lock = self.balanced_rpc_ratelimiter_lock.write().await;
+
+                    let deadline = earliest_not_until.wait_time_from(self.clock.now());
+
+                    sleep(deadline).await;
+
+                    drop(write_lock);
                 } else {
-                    // TODO: exponential backoff?
-                    Duration::from_secs(1)
+                    // TODO: how long should we wait?
+                    // TODO: max wait time?
+                    sleep(Duration::from_millis(500)).await;
                 };
-
-                sleep(deadline).await;
-
-                drop(write_lock);
             }
         }
     }
 
+    #[instrument]
     async fn try_send_requests(
         &self,
         rpc_servers: Vec<String>,
@@ -404,22 +427,22 @@ async fn main() {
             vec![("ws://10.11.12.16:8545", 0), ("ws://10.11.12.16:8946", 0)],
             // paid nodes
             // TODO: add paid nodes (with rate limits)
-            vec![
-                // chainstack.com archive
-                (
-                    "wss://ws-nd-373-761-850.p2pify.com/106d73af4cebc487df5ba92f1ad8dee7",
-                    0,
-                ),
-            ],
+            // vec![
+            //     // chainstack.com archive
+            //     (
+            //         "wss://ws-nd-373-761-850.p2pify.com/106d73af4cebc487df5ba92f1ad8dee7",
+            //         0,
+            //     ),
+            // ],
             // free nodes
-            vec![
-                // ("https://main-rpc.linkpool.io", 0), // linkpool is slow and often offline
-                ("https://rpc.ankr.com/eth", 0),
-            ],
+            // vec![
+            //     // ("https://main-rpc.linkpool.io", 0), // linkpool is slow and often offline
+            //     ("https://rpc.ankr.com/eth", 0),
+            // ],
         ],
         vec![
-            ("https://api.edennetwork.io/v1/beta", 0),
-            ("https://api.edennetwork.io/v1/", 0),
+            // ("https://api.edennetwork.io/v1/beta", 0),
+            // ("https://api.edennetwork.io/v1/", 0),
         ],
     )
     .await
@@ -433,9 +456,14 @@ async fn main() {
         .then(move |json_body| state.clone().proxy_web3_rpc(json_body))
         .map(handle_anyhow_errors);
 
-    warp::serve(proxy_rpc_filter)
-        .run(([0, 0, 0, 0], listen_port))
-        .await;
+    // TODO: filter for displaying connections
+    // TODO: filter for displaying
+
+    // TODO: warp trace is super verbose. how do we make this more readable?
+    // let routes = proxy_rpc_filter.with(warp::trace::request());
+    let routes = proxy_rpc_filter;
+
+    warp::serve(routes).run(([0, 0, 0, 0], listen_port)).await;
 }
 
 /// convert result into an http response. use this at the end of your warp filter
diff --git a/src/provider_tiers.rs b/src/provider_tiers.rs
index aaebbc3c..4dfc98b1 100644
--- a/src/provider_tiers.rs
+++ b/src/provider_tiers.rs
@@ -161,7 +161,7 @@ impl Web3ProviderTier {
 
     /// get the best available rpc server
     #[instrument]
-    pub async fn next_upstream_server(&self) -> Result<String, NotUntil<QuantaInstant>> {
+    pub async fn next_upstream_server(&self) -> Result<String, Option<NotUntil<QuantaInstant>>> {
         let mut earliest_not_until = None;
 
         for selected_rpc in self.synced_rpcs.load().iter() {
@@ -203,16 +203,14 @@ impl Web3ProviderTier {
             return Ok(selected_rpc.clone());
         }
 
-        // return the smallest not_until
-        if let Some(not_until) = earliest_not_until {
-            Err(not_until)
-        } else {
-            unimplemented!();
-        }
+        // this might be None
+        Err(earliest_not_until)
     }
 
     /// get all available rpc servers
-    pub async fn get_upstream_servers(&self) -> Result<Vec<String>, NotUntil<QuantaInstant>> {
+    pub async fn get_upstream_servers(
+        &self,
+    ) -> Result<Vec<String>, Option<NotUntil<QuantaInstant>>> {
         let mut earliest_not_until = None;
         let mut selected_rpcs = vec![];
         for selected_rpc in self.synced_rpcs.load().iter() {
@@ -253,11 +251,7 @@ impl Web3ProviderTier {
             return Ok(selected_rpcs);
         }
 
-        // return the earliest not_until
-        if let Some(not_until) = earliest_not_until {
-            Err(not_until)
-        } else {
-            Ok(vec![])
-        }
+        // return the earliest not_until (if no rpcs are synced, this will be None)
+        Err(earliest_not_until)
     }
 }
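The signature change in `src/provider_tiers.rs` means callers now see three outcomes instead of two: a synced server was selected, no servers are synced at all (`Err(None)`), or every synced server is rate limited (`Err(Some(not_until))`). Below is a minimal caller-side sketch of that match, not code from this patch: `RateLimitedUntil` is a hypothetical stand-in for governor's `NotUntil<QuantaInstant>`, `handle_selection` is an invented name, and a tokio runtime dependency is assumed.

```rust
use std::time::Duration;

use tokio::time::sleep;

/// Hypothetical stand-in for governor's `NotUntil<QuantaInstant>`;
/// only the wait duration matters for this sketch.
struct RateLimitedUntil {
    wait: Duration,
}

/// Handle the three outcomes encoded by `Result<String, Option<RateLimitedUntil>>`.
async fn handle_selection(result: Result<String, Option<RateLimitedUntil>>) {
    match result {
        Ok(upstream) => {
            // a synced, non-rate-limited server was selected
            println!("sending request to {}", upstream);
        }
        Err(None) => {
            // no servers are in sync, so there is no rate limit to wait on
            println!("no servers in sync");
        }
        Err(Some(not_until)) => {
            // every synced server is rate limited; sleep until the earliest one frees up
            sleep(not_until.wait).await;
        }
    }
}

#[tokio::main]
async fn main() {
    handle_selection(Ok("ws://10.11.12.16:8545".to_string())).await;
    handle_selection(Err(None)).await;
    handle_selection(Err(Some(RateLimitedUntil {
        wait: Duration::from_millis(500),
    })))
    .await;
}
```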