add bundler_4337_rpcs

This commit is contained in:
Bryan Stitt 2023-04-14 00:04:35 -07:00
parent d035049c8f
commit f3435bc6e0
3 changed files with 107 additions and 5 deletions

@ -202,6 +202,8 @@ impl DatabaseReplica {
pub struct Web3ProxyApp {
/// Send requests to the best server available
pub balanced_rpcs: Arc<Web3Rpcs>,
/// Send 4337 Abstraction Bundler requests to one of these servers
pub bundler_4337_rpcs: Option<Arc<Web3Rpcs>>,
pub http_client: Option<reqwest::Client>,
/// Send private requests (like eth_sendRawTransaction) to all these servers
pub private_rpcs: Option<Arc<Web3Rpcs>>,
@ -695,11 +697,46 @@ impl Web3ProxyApp {
Some(private_rpcs)
};
let hostname = hostname::get().ok().and_then(|x| x.to_str().map(|x| x.to_string()));
// prepare a Web3Rpcs to hold all our 4337 Abstraction Bundler connections
// only some chains have this, so this is optional
let bundler_4337_rpcs = if top_config.bundler_4337_rpcs.is_none() {
warn!("No bundler_4337_rpcs configured");
None
} else {
// TODO: do something with the spawn handle
let (bundler_4337_rpcs, bundler_4337_rpcs_handle) = Web3Rpcs::spawn(
top_config.app.chain_id,
db_conn.clone(),
http_client.clone(),
// bundler_4337_rpcs don't get subscriptions, so no need for max_block_age or max_block_lag
None,
None,
0,
0,
pending_transactions.clone(),
None,
// subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs
// they also often have low rate limits
// however, they are well connected to miners/validators. so maybe using them as a safety check would be good
// TODO: but maybe we could include privates in the "backup" tier
None,
)
.await
.context("spawning bundler_4337_rpcs")?;
app_handles.push(bundler_4337_rpcs_handle);
Some(bundler_4337_rpcs)
};
let hostname = hostname::get()
.ok()
.and_then(|x| x.to_str().map(|x| x.to_string()));
let app = Self {
config: top_config.app.clone(),
balanced_rpcs,
bundler_4337_rpcs,
http_client,
kafka_producer,
private_rpcs,
@ -778,6 +815,17 @@ impl Web3ProxyApp {
}
}
if let Some(bundler_4337_rpc_configs) = new_top_config.bundler_4337_rpcs {
if let Some(bundler_4337_rpcs) = self.bundler_4337_rpcs.as_ref() {
bundler_4337_rpcs
.apply_server_configs(self, bundler_4337_rpc_configs)
.await?;
} else {
// TODO: maybe we should have bundler_4337_rpcs just be empty instead of being None
todo!("handle toggling bundler_4337_rpcs")
}
}
Ok(())
}
@ -1074,6 +1122,7 @@ impl Web3ProxyApp {
}
// #[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
// TODO: more robust stats and kafka logic! if we use the try operator, they aren't saved!
async fn proxy_cached_request(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
@ -1250,6 +1299,59 @@ impl Web3ProxyApp {
vec![],
));
}
method @ ("debug_bundler_sendBundleNow"
| "debug_bundler_clearState"
| "debug_bundler_dumpMempool") => {
return Ok((
JsonRpcForwardedResponse::from_string(
// TODO: we should probably have some escaping on this. but maybe serde will protect us enough
format!("method unsupported: {}", method),
None,
Some(request_id),
),
vec![],
));
}
_method @ ("eth_sendUserOperation"
| "eth_estimateUserOperationGas"
| "eth_getUserOperationByHash"
| "eth_getUserOperationReceipt"
| "eth_supportedEntryPoints") => match self.bundler_4337_rpcs.as_ref() {
Some(bundler_4337_rpcs) => {
let response = bundler_4337_rpcs
.try_proxy_connection(
authorization,
request,
Some(&request_metadata),
None,
None,
)
.await?;
// TODO: DRY
let rpcs = request_metadata.backend_requests.lock().clone();
if let Some(stat_sender) = self.stat_sender.as_ref() {
let response_stat = ProxyResponseStat::new(
request_method,
authorization.clone(),
request_metadata,
response.num_bytes(),
);
stat_sender
.send_async(response_stat.into())
.await
.context("stat_sender sending bundler_4337 response stat")?;
}
return Ok((response, rpcs));
}
None => {
// TODO: stats!
return Err(anyhow::anyhow!("no bundler_4337_rpcs available").into());
}
},
// some commands can use local data or caches
"eth_accounts" => {
// no stats on this. its cheap
@ -1297,6 +1399,8 @@ impl Web3ProxyApp {
// i think this is always an error response
let rpcs = request_metadata.backend_requests.lock().clone();
// TODO! save stats
return Ok((response, rpcs));
};
@ -1492,7 +1596,6 @@ impl Web3ProxyApp {
serde_json::Value::String(APP_USER_AGENT.to_string())
}
"web3_sha3" => {
// emit stats
// returns Keccak-256 (not the standardized SHA3-256) of the given data.
match &request.params {
Some(serde_json::Value::Array(params)) => {
@ -1558,8 +1661,6 @@ impl Web3ProxyApp {
return Err(FrontendErrorResponse::AccessDenied);
}
// emit stats
// TODO: if no servers synced, wait for them to be synced? probably better to error and let haproxy retry another server
let head_block_num = head_block_num
.or(self.balanced_rpcs.head_block_num())

@ -316,6 +316,7 @@ mod tests {
),
]),
private_rpcs: None,
bundler_4337_rpcs: None,
extra: Default::default(),
};

@ -42,8 +42,8 @@ pub struct CliConfig {
pub struct TopConfig {
pub app: AppConfig,
pub balanced_rpcs: HashMap<String, Web3RpcConfig>,
// TODO: instead of an option, give it a default
pub private_rpcs: Option<HashMap<String, Web3RpcConfig>>,
pub bundler_4337_rpcs: Option<HashMap<String, Web3RpcConfig>>,
/// unknown config options get put here
#[serde(flatten, default = "HashMap::default")]
pub extra: HashMap<String, serde_json::Value>,