diff --git a/TODO.md b/TODO.md index b8504b7b..f9e14321 100644 --- a/TODO.md +++ b/TODO.md @@ -30,18 +30,17 @@ - even after removing a bunch of the locks, the deadlock still happens. i can't reliably reproduce. i just let it run for awhile and it happens. - running gdb shows the thread at tokio tungstenite thread is spinning near 100% cpu and none of the rest of the program is proceeding - fixed by https://github.com/gakonst/ethers-rs/pull/1287 -- [ ] when sending with private relays, brownie's tx.wait can think the transaction was dropped. smarter retry on eth_getTransactionByHash and eth_getTransactionReceipt (maybe only if we sent the transaction ourselves) -- [ ] rpc errors propagate too far. one subscription failing ends the app. isolate the providers more -- [ ] if web3 proxy gets an http error back, retry another node -- [ ] endpoint for health checks. if no synced servers, give a 502 error -- [ ] interval for http subscriptions should be based on block time. -- [ ] todo: include private rpcs with regular queries? i don't want to overwhelm them, but they could be good for excess load +- [x] when sending with private relays, brownie's tx.wait can think the transaction was dropped. smarter retry on eth_getTransactionByHash and eth_getTransactionReceipt (maybe only if we sent the transaction ourselves) +- [x] if web3 proxy gets an http error back, retry another node +- [x] endpoint for health checks. if no synced servers, give a 502 error - [ ] refactor so configs can change while running - create the app without applying any config to it - have a blocking future watching the config file and calling app.apply_config() on first load and on change +- [ ] rpc errors propagate too far. one subscription failing ends the app. isolate the providers more (might already be fixed) ## V1 +- [ ] interval for http subscriptions should be based on block time. load from config is easy, but - [ ] some things that are cached locally should probably be in shared redis caches - [ ] stats when forks are resolved (and what chain they were on?) - [ ] incoming rate limiting (by api key) @@ -84,4 +83,5 @@ - [ ] are we using Acquire/Release/AcqRel properly? or do we need other modes? - [ ] subscription id should be per connection, not global - [ ] use https://github.com/ledgerwatch/interfaces to talk to erigon directly instead of through erigon's rpcdaemon (possible example code which uses ledgerwatch/interfaces: https://github.com/akula-bft/akula/tree/master) -- [ ] subscribe to pending transactions and build an intelligent gas estimator \ No newline at end of file +- [ ] subscribe to pending transactions and build an intelligent gas estimator +- [ ] include private rpcs with regular queries? i don't want to overwhelm them, but they could be good for excess load diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index f7b54159..272ff9a0 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -566,6 +566,10 @@ impl ActiveRequestHandle { Self(connection) } + pub fn clone_connection(&self) -> Arc { + self.0.clone() + } + /// Send a web3 request /// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented /// By taking self here, we ensure that this is dropped after the request is complete diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 026f16c9..bae3271f 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -550,7 +550,7 @@ impl Web3Connections { if synced_connections.inner.len() == total_rpcs { // TODO: more metrics - debug!("all head: {}", new_block_hash); + trace!("all head: {}", new_block_hash); } trace!( @@ -576,7 +576,6 @@ impl Web3Connections { } // TODO: only publish if there are x (default 2) nodes synced to this block? - // do the arcswap // TODO: do this before or after processing all the transactions in this block? self.synced_connections.swap(synced_connections); } @@ -589,7 +588,10 @@ impl Web3Connections { /// get the best available rpc server #[instrument(skip_all)] - pub async fn next_upstream_server(&self) -> Result> { + pub async fn next_upstream_server( + &self, + skip: &[Arc], + ) -> Result> { let mut earliest_retry_after = None; let mut synced_rpcs: Vec> = self @@ -597,9 +599,14 @@ impl Web3Connections { .load() .inner .iter() + .filter(|x| !skip.contains(x)) .cloned() .collect(); + if synced_rpcs.is_empty() { + return Err(None); + } + let sort_cache: HashMap, (f32, u32)> = synced_rpcs .iter() .map(|rpc| { @@ -657,8 +664,8 @@ impl Web3Connections { // check rate limits and increment our connection counter match connection.try_request_handle().await { Err(retry_after) => { - earliest_retry_after = earliest_retry_after.min(Some(retry_after)); // this rpc is not available. skip it + earliest_retry_after = earliest_retry_after.min(Some(retry_after)); } Ok(handle) => selected_rpcs.push(handle), } @@ -677,23 +684,44 @@ impl Web3Connections { &self, request: JsonRpcRequest, ) -> anyhow::Result { + let mut skip_rpcs = vec![]; + + // TODO: maximum retries? loop { - match self.next_upstream_server().await { + if skip_rpcs.len() == self.inner.len() { + break; + } + match self.next_upstream_server(&skip_rpcs).await { Ok(active_request_handle) => { + // save the rpc in case we get an error and want to retry on another server + skip_rpcs.push(active_request_handle.clone_connection()); + let response_result = active_request_handle .request(&request.method, &request.params) .await; - match JsonRpcForwardedResponse::from_response_result( + match JsonRpcForwardedResponse::try_from_response_result( response_result, request.id.clone(), ) { Ok(response) => { - if response.error.is_some() { - trace!(?response, "Sending error reply",); - // errors already sent false to the in_flight_tx + if let Some(error) = &response.error { + trace!(?response, "rpc error"); + + // some errors should be retried + if error.code == -32000 + && [ + "header not found", + "header for hash not found", + "node not started", + "RPC timeout", + ] + .contains(&error.message.as_str()) + { + continue; + } } else { - trace!(?response, "Sending reply"); + trace!(?response, "rpc success"); } return Ok(response); @@ -714,8 +742,7 @@ impl Web3Connections { Err(None) => { warn!(?self, "No servers in sync!"); - // TODO: sleep how long? until synced_connections changes or rate limits are available - // TODO: subscribe to head_block_sender + // TODO: subscribe to something on synced connections. maybe it should just be a watch channel sleep(Duration::from_millis(200)).await; continue; @@ -733,13 +760,15 @@ impl Web3Connections { } } } + + Err(anyhow::anyhow!("all retries exhausted")) } + /// be sure there is a timeout on this or it might loop forever pub async fn try_send_all_upstream_servers( &self, request: JsonRpcRequest, ) -> anyhow::Result { - // TODO: timeout on this loop loop { match self.get_upstream_servers().await { Ok(active_request_handles) => { diff --git a/web3-proxy/src/jsonrpc.rs b/web3-proxy/src/jsonrpc.rs index e0d82d49..1942bac1 100644 --- a/web3-proxy/src/jsonrpc.rs +++ b/web3-proxy/src/jsonrpc.rs @@ -181,7 +181,7 @@ impl JsonRpcForwardedResponse { } } - pub fn from_response_result( + pub fn try_from_response_result( result: Result, ProviderError>, id: Box, ) -> anyhow::Result {