diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index f1e4375b..9bf53159 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -74,7 +74,7 @@ pub struct Web3ProxyApp { /// Send requests to the best server available pub balanced_rpcs: Arc, /// Send private requests (like eth_sendRawTransaction) to all these servers - pub private_rpcs: Arc, + pub private_rpcs: Option>, response_cache: ResponseCache, // don't drop this or the sender will stop working // TODO: broadcast channel instead? @@ -280,7 +280,7 @@ impl Web3ProxyApp { let private_rpcs = if private_rpcs.is_empty() { // TODO: do None instead of clone? warn!("No private relays configured. Any transactions will be broadcast to the public mempool!"); - balanced_rpcs.clone() + None } else { // TODO: attach context to this error let (private_rpcs, private_handle) = Web3Connections::spawn( @@ -304,7 +304,7 @@ impl Web3ProxyApp { handles.push(private_handle); - private_rpcs + Some(private_rpcs) }; let frontend_rate_limiter = redis_pool.as_ref().map(|redis_pool| { @@ -762,8 +762,9 @@ impl Web3ProxyApp { // broadcast transactions to all private rpcs at once "eth_sendRawTransaction" => { // emit stats - return self - .private_rpcs + let rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs); + + return rpcs .try_send_all_upstream_servers(request, None) .instrument(span) .await; @@ -849,27 +850,11 @@ impl Web3ProxyApp { let mut response = self .response_cache .try_get_with(cache_key, async move { - match method { - "temporarily disabled" => { - // "eth_getTransactionByHash" | "eth_getTransactionReceipt" => { - // TODO: try_send_all serially with retries instead of parallel - self.private_rpcs - .try_send_all_upstream_servers( - request, - Some(&request_block_id.num), - ) - .await - } - _ => { - // TODO: retry some failures automatically! - self.balanced_rpcs - .try_send_best_upstream_server( - request, - Some(&request_block_id.num), - ) - .await - } - } + // TODO: retry some failures automatically! + // TODO: try private_rpcs if all the balanced_rpcs fail! + self.balanced_rpcs + .try_send_best_upstream_server(request, Some(&request_block_id.num)) + .await }) .await .unwrap(); diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index d71b626f..ef7cbf02 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -120,7 +120,7 @@ impl Web3Connections { Some(rpc) => { rpc.wait_for_request_handle() .await? - .request("eth_getBlockByHash", get_block_params) + .request("eth_getBlockByHash", get_block_params, false) .await? } None => { @@ -140,7 +140,8 @@ impl Web3Connections { let block = Arc::new(block); // the block was fetched using eth_getBlockByHash, so it should have all fields - self.save_block(&block, true).await?; + // TODO: fill in heaviest_chain! + self.save_block(&block, false).await?; Ok(block) } @@ -256,6 +257,7 @@ impl Web3Connections { None } else { // we don't know if its on the heaviest chain yet + debug!(?rpc_head_hash, ?rpc_head_num, %rpc.name, "saving"); self.save_block(&rpc_head_block, false).await?; connection_heads.insert(rpc.name.to_owned(), rpc_head_hash); @@ -290,7 +292,7 @@ impl Web3Connections { let conn_head_block = if let Some(x) = self.block_hashes.get(connection_head_hash) { x } else { - // TODO: why does this happen? + // TODO: why does this happen?!?! maybe we should do get_with? warn!(%connection_head_hash, %conn_name, %rpc, "Missing connection_head_block in block_hashes"); continue; }; @@ -401,6 +403,8 @@ impl Web3Connections { // i think "conns" is somehow getting dupes trace!(?heavy_rpcs); + // TODO: if maybe_head_block.time() is old, ignore it + // success! this block has enough soft limit and nodes on it (or on later blocks) let conns: Vec> = heavy_rpcs .into_iter() diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 3135f057..ffaa1a2a 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -114,7 +114,7 @@ impl Web3Connection { let found_chain_id: Result = new_connection .wait_for_request_handle() .await? - .request("eth_chainId", Option::None::<()>) + .request("eth_chainId", Option::None::<()>, false) .await; match found_chain_id { @@ -206,6 +206,7 @@ impl Web3Connection { "0xdead00000000000000000000000000000000beef", maybe_archive_block, ), + true, ) .await; @@ -295,31 +296,40 @@ impl Web3Connection { #[instrument(skip_all)] pub async fn reconnect( self: &Arc, + // websocket doesn't need the http client block_sender: Option<&flume::Sender>, ) -> anyhow::Result<()> { - // TODO: no-op if this called on a http provider - // websocket doesn't need the http client - info!(rpc=%self, "connecting"); - // since this lock is held open over an await, we use tokio's locking // TODO: timeout on this lock. if its slow, something is wrong let mut provider = self.provider.write().await; - // our provider doesn't work anymore - *provider = None; + if provider.is_some() { + if self.http_client.is_some() { + // http clients don't need to do anything for reconnecting + // they *do* need to run this function to setup the first + return Ok(()); + } - // reset sync status - { - let mut head_block_id = self.head_block_id.write(); - *head_block_id = None; - } + info!(rpc=%self, "reconnecting"); - // tell the block subscriber that we don't have any blocks - if let Some(block_sender) = &block_sender { - block_sender - .send_async((None, self.clone())) - .await - .context("block_sender during connect")?; + // disconnect the current provider + *provider = None; + + // reset sync status + { + let mut head_block_id = self.head_block_id.write(); + *head_block_id = None; + } + + // tell the block subscriber that we don't have any blocks + if let Some(block_sender) = &block_sender { + block_sender + .send_async((None, self.clone())) + .await + .context("block_sender during connect")?; + } + } else { + info!(rpc=%self, "connecting"); } // TODO: if this fails, keep retrying! otherwise it crashes and doesn't try again! @@ -381,7 +391,7 @@ impl Web3Connection { let complete_head_block: Block = self .wait_for_request_handle() .await? - .request("eth_getBlockByHash", (new_hash, false)) + .request("eth_getBlockByHash", (new_hash, false), false) .await?; debug_assert!(complete_head_block.total_difficulty.is_some()); @@ -549,7 +559,7 @@ impl Web3Connection { match self.wait_for_request_handle().await { Ok(active_request_handle) => { let block: Result, _> = active_request_handle - .request("eth_getBlockByNumber", ("latest", false)) + .request("eth_getBlockByNumber", ("latest", false), false) .await; match block { @@ -619,7 +629,7 @@ impl Web3Connection { let block: Result, _> = self .wait_for_request_handle() .await? - .request("eth_getBlockByNumber", ("latest", false)) + .request("eth_getBlockByNumber", ("latest", false), false) .await .map(|x| Some(Arc::new(x))); diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index a8e6b7fb..24a8a4be 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -305,7 +305,7 @@ impl Web3Connections { .into_iter() .map(|active_request_handle| async move { let result: Result, _> = - active_request_handle.request(method, params).await; + active_request_handle.request(method, params, false).await; result }) .collect::>() @@ -500,7 +500,7 @@ impl Web3Connections { skip_rpcs.push(active_request_handle.clone_connection()); let response_result = active_request_handle - .request(&request.method, &request.params) + .request(&request.method, &request.params, false) .await; match JsonRpcForwardedResponse::try_from_response_result( diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 7fe56465..f2a7c93b 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -64,12 +64,14 @@ impl OpenRequestHandle { /// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented /// By taking self here, we ensure that this is dropped after the request is complete. /// TODO: we no longer take self because metered doesn't like that + /// TODO: ErrorCount includes too many types of errors, such as transaction reverts #[instrument(skip_all)] #[measure([ErrorCount, HitCount, InFlight, ResponseTime, Throughput])] pub async fn request( &self, method: &str, params: T, + silent_errors: bool, ) -> Result where T: fmt::Debug + serde::Serialize + Send + Sync, @@ -107,7 +109,10 @@ impl OpenRequestHandle { // TODO: i think ethers already has trace logging (and does it much more fancy) if let Err(err) = &response { - warn!(?err, %method, rpc=%self.conn, "bad response!"); + if !silent_errors { + // TODO: this isn't always bad. missing trie node while we are checking initial + warn!(?err, %method, rpc=%self.conn, "bad response!"); + } } else { // TODO: opt-in response inspection to log reverts with their request. put into redis or what? // trace!(rpc=%self.0, %method, ?response); diff --git a/web3_proxy/src/rpcs/transactions.rs b/web3_proxy/src/rpcs/transactions.rs index 481d1d72..2988bc48 100644 --- a/web3_proxy/src/rpcs/transactions.rs +++ b/web3_proxy/src/rpcs/transactions.rs @@ -27,7 +27,7 @@ impl Web3Connections { let tx: Transaction = match rpc.try_request_handle().await { Ok(OpenRequestResult::Handle(handle)) => { handle - .request("eth_getTransactionByHash", (pending_tx_id,)) + .request("eth_getTransactionByHash", (pending_tx_id,), false) .await? } Ok(_) => {