diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 6ca7138a..5a3f7d84 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1079,7 +1079,6 @@ impl App { if self.protected_rpcs.is_empty() { self.balanced_rpcs.request_with_metadata(web3_request).await } else { - todo!("its not empty") self.protected_rpcs .request_with_metadata(web3_request) .await diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index a13ab420..f69e8896 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -466,6 +466,7 @@ impl CacheMode { cache_errors: false, }) } + "eth_sendRawTransaction" => Ok(CacheMode::Never), "net_listening" => Ok(CacheMode::SuccessForever), "net_version" => Ok(CacheMode::SuccessForever), method => match get_block_param_id(method) { @@ -495,7 +496,7 @@ impl CacheMode { match self { Self::SuccessForever => None, Self::Never => None, - Self::Standard { block, .. } => Some(block), + Self::Standard { .. } => None, Self::Range { from_block, .. } => Some(from_block), } } diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index 02d729c6..81ca75c9 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -269,16 +269,7 @@ impl Web3ProxyError { ) } Self::Arc(err) => { - // recurse somehow. Web3ProxyError isn't clone and we can't moe out of it - ( - StatusCode::INTERNAL_SERVER_ERROR, - JsonRpcErrorData { - // TODO: is it safe to expose all of our anyhow strings? - message: "INTERNAL SERVER ERROR".into(), - code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), - data: Some(serde_json::Value::String(err.to_string())), - }, - ) + return err.as_response_parts(); } Self::BadRequest(err) => { trace!(?err, "BAD_REQUEST"); diff --git a/web3_proxy/src/jsonrpc/request_builder.rs b/web3_proxy/src/jsonrpc/request_builder.rs index 2bfb3637..d95d8f0a 100644 --- a/web3_proxy/src/jsonrpc/request_builder.rs +++ b/web3_proxy/src/jsonrpc/request_builder.rs @@ -309,7 +309,7 @@ impl ValidatedRequest { app: Option<&App>, authorization: Arc, chain_id: u64, - head_block: Option, + mut head_block: Option, kafka_debug_logger: Option>, max_wait: Option, permit: Option, @@ -331,6 +331,12 @@ impl ValidatedRequest { kafka_debug_logger.log_debug_request(&request); } + if head_block.is_none() { + if let Some(app) = app { + head_block = app.head_block_receiver().borrow().clone(); + } + } + // now that kafka has logged the user's original params, we can calculate the cache key // TODO: modify CacheMode::new to wait for a future block if one is requested! be sure to update head_block too! @@ -589,7 +595,7 @@ impl Drop for ValidatedRequest { // turn `&mut self` into `self` let x = mem::take(self); - trace!(?x, "request metadata dropped without stat send"); + // trace!(?x, "request metadata dropped without stat send"); let _ = x.try_send_stat(); } } diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 081408b1..4d850141 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -23,7 +23,7 @@ use std::time::Duration; use tokio::select; use tokio::task::yield_now; use tokio::time::{sleep_until, Instant}; -use tracing::{debug, enabled, error, info, trace, warn, Level}; +use tracing::{debug, enabled, error, info, instrument, trace, warn, Level}; #[derive(Constructor, Clone, Copy, Debug, Default, Eq, PartialEq, Serialize)] pub struct RpcRanking { @@ -89,12 +89,13 @@ pub struct RankedRpcs { pub num_synced: usize, pub backups_needed: bool, - inner: Vec>, + pub(crate) inner: Vec>, sort_mode: SortMethod, } // TODO: could these be refs? The owning RankedRpcs lifetime might work. `stream!` might make it complicated +#[derive(Debug)] pub struct RpcsForRequest { inner: Vec>, outer: Vec>, @@ -106,7 +107,8 @@ impl RankedRpcs { // we don't need to sort the rpcs now. we will sort them when a request neds them // TODO: the shame about this is that we lose just being able to compare 2 random servers - let head_block = head_block?; + // TODO: why is head_block not set here?! it should always be set! + let head_block = head_block.unwrap_or_default(); let backups_needed = rpcs.iter().any(|x| x.backup); @@ -219,6 +221,7 @@ impl RankedRpcs { // TODO: max lag was already handled for rpc in self.inner.iter().cloned() { + // if web3_request.head_block.is_some() { if let Some(block_needed) = min_block_needed { if !rpc.has_block_data(block_needed) { outer_for_request.push(rpc); @@ -231,6 +234,7 @@ impl RankedRpcs { continue; } } + // } inner_for_request.push(rpc); } @@ -421,7 +425,7 @@ impl ConsensusFinder { match old_ranked_rpcs.as_ref() { None => { info!( - "first {}/{} {}{}/{}/{} block={}, rpc={}", + "first {}/{} {}{}/{}/{} block={:?}, rpc={}", best_tier, worst_tier, backups_voted_str, @@ -870,6 +874,7 @@ fn best_rpc<'a>(rpc_a: &'a Arc, rpc_b: &'a Arc) -> &'a Arc impl Stream { stream! { trace!("entered stream"); @@ -919,9 +924,8 @@ impl RpcsForRequest { trace!("No request handle for {}. err={:?}", best_rpc, err); } } - - yield_now().await; } + yield_now().await; } // if we got this far, no inner or outer rpcs are ready. thats suprising since an inner should have been ready. maybe it got rate limited diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 76ab9d16..b7d7fa89 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -403,17 +403,18 @@ impl Web3Rpcs { ranked_rpcs } else if self.watch_head_block.is_some() { // if we are here, this set of rpcs is subscribed to newHeads. But we didn't get a RankedRpcs. that means something is wrong + trace!("watch_head_block is some"); return Err(Web3ProxyError::NoServersSynced); } else { + trace!("watch_head_block is none"); + // no RankedRpcs, but also no newHeads subscription. This is probably a set of "protected" rpcs or similar - // TODO: return a future that resolves once we do have something? let rpcs = self.by_name.read().values().cloned().collect(); if let Some(x) = RankedRpcs::from_rpcs(rpcs, web3_request.head_block.clone()) { Arc::new(x) } else { // i doubt we will ever get here - // TODO: return a future that resolves once we do have something? return Err(Web3ProxyError::NoServersSynced); } }; @@ -611,7 +612,13 @@ impl Serialize for Web3Rpcs { // TODO: rename synced_connections to consensus_rpcs if let Some(consensus_rpcs) = consensus_rpcs.as_ref() { - state.serialize_field("synced_connections", consensus_rpcs)?; + let names: Vec<_> = consensus_rpcs + .inner + .iter() + .map(|x| x.name.as_str()) + .collect(); + + state.serialize_field("synced_connections", &names)?; } else { state.serialize_field("synced_connections", &None::<()>)?; } diff --git a/web3_proxy_cli/src/test_utils/app.rs b/web3_proxy_cli/src/test_utils/app.rs index 3294e84a..68787f25 100644 --- a/web3_proxy_cli/src/test_utils/app.rs +++ b/web3_proxy_cli/src/test_utils/app.rs @@ -106,7 +106,14 @@ impl TestApp { }, )]), // influxdb_client: influx.map(|x| x.client), - private_rpcs: Default::default(), + private_rpcs: HashMap::from([( + "anvil_private".to_string(), + Web3RpcConfig { + http_url: Some(anvil.instance.endpoint()), + ws_url: Some(anvil.instance.ws_endpoint()), + ..Default::default() + }, + )]), bundler_4337_rpcs: Default::default(), extra: Default::default(), }; diff --git a/web3_proxy_cli/tests/test_proxy.rs b/web3_proxy_cli/tests/test_proxy.rs index 6acf0d9f..858dbe8b 100644 --- a/web3_proxy_cli/tests/test_proxy.rs +++ b/web3_proxy_cli/tests/test_proxy.rs @@ -120,6 +120,8 @@ async fn it_matches_anvil() { let x = TestApp::spawn(&a, None, None, None).await; + let proxy_provider = Http::from_str(x.proxy_provider.url().as_str()).unwrap(); + let weighted_anvil_provider = WeightedProvider::new(Http::from_str(&a.instance.endpoint()).unwrap()); let weighted_proxy_provider = @@ -201,8 +203,7 @@ async fn it_matches_anvil() { // fund singleton deployer // TODO: send through the quorum provider. it should detect that its already confirmed - let fund_tx_hash: H256 = a - .provider + let fund_tx_hash: H256 = proxy_provider .request("eth_sendRawTransaction", [raw_tx]) .await .unwrap(); @@ -210,7 +211,7 @@ async fn it_matches_anvil() { // deploy singleton deployer // TODO: send through the quorum provider. it should detect that its already confirmed - let deploy_tx: H256 = a.provider.request("eth_sendRawTransaction", ["0xf9016c8085174876e8008303c4d88080b90154608060405234801561001057600080fd5b50610134806100206000396000f3fe6080604052348015600f57600080fd5b506004361060285760003560e01c80634af63f0214602d575b600080fd5b60cf60048036036040811015604157600080fd5b810190602081018135640100000000811115605b57600080fd5b820183602082011115606c57600080fd5b80359060200191846001830284011164010000000083111715608d57600080fd5b91908080601f016020809104026020016040519081016040528093929190818152602001838380828437600092019190915250929550509135925060eb915050565b604080516001600160a01b039092168252519081900360200190f35b6000818351602085016000f5939250505056fea26469706673582212206b44f8a82cb6b156bfcc3dc6aadd6df4eefd204bc928a4397fd15dacf6d5320564736f6c634300060200331b83247000822470"]).await.unwrap(); + let deploy_tx: H256 = proxy_provider.request("eth_sendRawTransaction", ["0xf9016c8085174876e8008303c4d88080b90154608060405234801561001057600080fd5b50610134806100206000396000f3fe6080604052348015600f57600080fd5b506004361060285760003560e01c80634af63f0214602d575b600080fd5b60cf60048036036040811015604157600080fd5b810190602081018135640100000000811115605b57600080fd5b820183602082011115606c57600080fd5b80359060200191846001830284011164010000000083111715608d57600080fd5b91908080601f016020809104026020016040519081016040528093929190818152602001838380828437600092019190915250929550509135925060eb915050565b604080516001600160a01b039092168252519081900360200190f35b6000818351602085016000f5939250505056fea26469706673582212206b44f8a82cb6b156bfcc3dc6aadd6df4eefd204bc928a4397fd15dacf6d5320564736f6c634300060200331b83247000822470"]).await.unwrap(); assert_eq!( deploy_tx, "0x803351deb6d745e91545a6a3e1c0ea3e9a6a02a1a4193b70edfcd2f40f71a01c"