diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 5ccdd5c6..c697f572 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -202,6 +202,8 @@ impl DatabaseReplica { pub struct Web3ProxyApp { /// Send requests to the best server available pub balanced_rpcs: Arc, + /// Send 4337 Abstraction Bundler requests to one of these servers + pub bundler_4337_rpcs: Option>, pub http_client: Option, /// Send private requests (like eth_sendRawTransaction) to all these servers pub private_rpcs: Option>, @@ -695,11 +697,46 @@ impl Web3ProxyApp { Some(private_rpcs) }; - let hostname = hostname::get().ok().and_then(|x| x.to_str().map(|x| x.to_string())); + // prepare a Web3Rpcs to hold all our 4337 Abstraction Bundler connections + // only some chains have this, so this is optional + let bundler_4337_rpcs = if top_config.bundler_4337_rpcs.is_none() { + warn!("No bundler_4337_rpcs configured"); + None + } else { + // TODO: do something with the spawn handle + let (bundler_4337_rpcs, bundler_4337_rpcs_handle) = Web3Rpcs::spawn( + top_config.app.chain_id, + db_conn.clone(), + http_client.clone(), + // bundler_4337_rpcs don't get subscriptions, so no need for max_block_age or max_block_lag + None, + None, + 0, + 0, + pending_transactions.clone(), + None, + // subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs + // they also often have low rate limits + // however, they are well connected to miners/validators. so maybe using them as a safety check would be good + // TODO: but maybe we could include privates in the "backup" tier + None, + ) + .await + .context("spawning bundler_4337_rpcs")?; + + app_handles.push(bundler_4337_rpcs_handle); + + Some(bundler_4337_rpcs) + }; + + let hostname = hostname::get() + .ok() + .and_then(|x| x.to_str().map(|x| x.to_string())); let app = Self { config: top_config.app.clone(), balanced_rpcs, + bundler_4337_rpcs, http_client, kafka_producer, private_rpcs, @@ -778,6 +815,17 @@ impl Web3ProxyApp { } } + if let Some(bundler_4337_rpc_configs) = new_top_config.bundler_4337_rpcs { + if let Some(bundler_4337_rpcs) = self.bundler_4337_rpcs.as_ref() { + bundler_4337_rpcs + .apply_server_configs(self, bundler_4337_rpc_configs) + .await?; + } else { + // TODO: maybe we should have bundler_4337_rpcs just be empty instead of being None + todo!("handle toggling bundler_4337_rpcs") + } + } + Ok(()) } @@ -1074,6 +1122,7 @@ impl Web3ProxyApp { } // #[measure([ErrorCount, HitCount, ResponseTime, Throughput])] + // TODO: more robust stats and kafka logic! if we use the try operator, they aren't saved! async fn proxy_cached_request( self: &Arc, authorization: &Arc, @@ -1250,6 +1299,59 @@ impl Web3ProxyApp { vec![], )); } + method @ ("debug_bundler_sendBundleNow" + | "debug_bundler_clearState" + | "debug_bundler_dumpMempool") => { + return Ok(( + JsonRpcForwardedResponse::from_string( + // TODO: we should probably have some escaping on this. but maybe serde will protect us enough + format!("method unsupported: {}", method), + None, + Some(request_id), + ), + vec![], + )); + } + _method @ ("eth_sendUserOperation" + | "eth_estimateUserOperationGas" + | "eth_getUserOperationByHash" + | "eth_getUserOperationReceipt" + | "eth_supportedEntryPoints") => match self.bundler_4337_rpcs.as_ref() { + Some(bundler_4337_rpcs) => { + let response = bundler_4337_rpcs + .try_proxy_connection( + authorization, + request, + Some(&request_metadata), + None, + None, + ) + .await?; + + // TODO: DRY + let rpcs = request_metadata.backend_requests.lock().clone(); + + if let Some(stat_sender) = self.stat_sender.as_ref() { + let response_stat = ProxyResponseStat::new( + request_method, + authorization.clone(), + request_metadata, + response.num_bytes(), + ); + + stat_sender + .send_async(response_stat.into()) + .await + .context("stat_sender sending bundler_4337 response stat")?; + } + + return Ok((response, rpcs)); + } + None => { + // TODO: stats! + return Err(anyhow::anyhow!("no bundler_4337_rpcs available").into()); + } + }, // some commands can use local data or caches "eth_accounts" => { // no stats on this. its cheap @@ -1297,6 +1399,8 @@ impl Web3ProxyApp { // i think this is always an error response let rpcs = request_metadata.backend_requests.lock().clone(); + // TODO! save stats + return Ok((response, rpcs)); }; @@ -1492,7 +1596,6 @@ impl Web3ProxyApp { serde_json::Value::String(APP_USER_AGENT.to_string()) } "web3_sha3" => { - // emit stats // returns Keccak-256 (not the standardized SHA3-256) of the given data. match &request.params { Some(serde_json::Value::Array(params)) => { @@ -1558,8 +1661,6 @@ impl Web3ProxyApp { return Err(FrontendErrorResponse::AccessDenied); } - // emit stats - // TODO: if no servers synced, wait for them to be synced? probably better to error and let haproxy retry another server let head_block_num = head_block_num .or(self.balanced_rpcs.head_block_num()) diff --git a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs index 3c16dc59..b30ef1ef 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs @@ -316,6 +316,7 @@ mod tests { ), ]), private_rpcs: None, + bundler_4337_rpcs: None, extra: Default::default(), }; diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 5501091c..40ec8828 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -42,8 +42,8 @@ pub struct CliConfig { pub struct TopConfig { pub app: AppConfig, pub balanced_rpcs: HashMap, - // TODO: instead of an option, give it a default pub private_rpcs: Option>, + pub bundler_4337_rpcs: Option>, /// unknown config options get put here #[serde(flatten, default = "HashMap::default")] pub extra: HashMap,