Merge branch 'main' into devel

Bryan Stitt 2023-04-18 20:14:12 -07:00
commit 4889c3e1ce
11 changed files with 343 additions and 172 deletions

Cargo.lock (generated)

@@ -3148,7 +3148,7 @@ dependencies = [
[[package]]
name = "migration"
version = "0.17.0"
version = "0.19.0"
dependencies = [
"sea-orm-migration",
"tokio",

@@ -1,6 +1,6 @@
[package]
name = "migration"
version = "0.17.0"
version = "0.19.0"
edition = "2021"
publish = false

@@ -209,6 +209,8 @@ impl DatabaseReplica {
pub struct Web3ProxyApp {
/// Send requests to the best server available
pub balanced_rpcs: Arc<Web3Rpcs>,
/// Send 4337 Abstraction Bundler requests to one of these servers
pub bundler_4337_rpcs: Option<Arc<Web3Rpcs>>,
pub http_client: Option<reqwest::Client>,
/// application config
/// TODO: this will need a large refactor to handle reloads while running. maybe use a watch::Receiver?
@@ -764,6 +766,34 @@ impl Web3ProxyApp {
Some(private_rpcs)
};
// prepare a Web3Rpcs to hold all our 4337 Abstraction Bundler connections
// only some chains have this, so this is optional
let bundler_4337_rpcs = if top_config.bundler_4337_rpcs.is_none() {
warn!("No bundler_4337_rpcs configured");
None
} else {
// TODO: do something with the spawn handle
let (bundler_4337_rpcs, bundler_4337_rpcs_handle, _) = Web3Rpcs::spawn(
top_config.app.chain_id,
db_conn.clone(),
http_client.clone(),
// bundler_4337_rpcs don't get subscriptions, so no need for max_block_age or max_block_lag
None,
None,
0,
0,
pending_transactions.clone(),
None,
None,
)
.await
.context("spawning bundler_4337_rpcs")?;
app_handles.push(bundler_4337_rpcs_handle);
Some(bundler_4337_rpcs)
};
let hostname = hostname::get()
.ok()
.and_then(|x| x.to_str().map(|x| x.to_string()));
@@ -771,6 +801,7 @@ impl Web3ProxyApp {
let app = Self {
config: top_config.app.clone(),
balanced_rpcs,
bundler_4337_rpcs,
http_client,
kafka_producer,
private_rpcs,
@@ -850,19 +881,33 @@ impl Web3ProxyApp {
// connect to the backends
self.balanced_rpcs
.apply_server_configs(self, new_top_config.balanced_rpcs)
.await?;
.await
.context("updating balanced rpcs")?;
if let Some(private_rpc_configs) = new_top_config.private_rpcs {
if let Some(private_rpcs) = self.private_rpcs.as_ref() {
private_rpcs
.apply_server_configs(self, private_rpc_configs)
.await?;
.await
.context("updating private_rpcs")?;
} else {
// TODO: maybe we should have private_rpcs just be empty instead of being None
todo!("handle toggling private_rpcs")
}
}
if let Some(bundler_4337_rpc_configs) = new_top_config.bundler_4337_rpcs {
if let Some(bundler_4337_rpcs) = self.bundler_4337_rpcs.as_ref() {
bundler_4337_rpcs
.apply_server_configs(self, bundler_4337_rpc_configs)
.await
.context("updating bundler_4337_rpcs")?;
} else {
// TODO: maybe we should have bundler_4337_rpcs just be empty instead of being None
todo!("handle toggling bundler_4337_rpcs")
}
}
Ok(())
}
@@ -1158,6 +1203,7 @@ impl Web3ProxyApp {
}
}
// TODO: more robust stats and kafka logic! if we use the try operator, they aren't saved!
async fn proxy_cached_request(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
@@ -1335,6 +1381,59 @@ impl Web3ProxyApp {
vec![],
));
}
method @ ("debug_bundler_sendBundleNow"
| "debug_bundler_clearState"
| "debug_bundler_dumpMempool") => {
return Ok((
JsonRpcForwardedResponse::from_string(
// TODO: we should probably have some escaping on this. but maybe serde will protect us enough
format!("method unsupported: {}", method),
None,
Some(request_id),
),
vec![],
));
}
method @ ("eth_sendUserOperation"
| "eth_estimateUserOperationGas"
| "eth_getUserOperationByHash"
| "eth_getUserOperationReceipt"
| "eth_supportedEntryPoints") => match self.bundler_4337_rpcs.as_ref() {
Some(bundler_4337_rpcs) => {
let response = bundler_4337_rpcs
.try_proxy_connection(
authorization,
request,
Some(&request_metadata),
None,
None,
)
.await?;
// TODO: DRY
let rpcs = request_metadata.backend_requests.lock().clone();
if let Some(stat_sender) = self.stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
Some(method.to_string()),
authorization.clone(),
request_metadata,
response.num_bytes(),
);
stat_sender
.send_async(response_stat.into())
.await
.map_err(Web3ProxyError::SendAppStatError)?;
}
return Ok((response, rpcs));
}
None => {
// TODO: stats!
return Err(anyhow::anyhow!("no bundler_4337_rpcs available").into());
}
},
// some commands can use local data or caches
"eth_accounts" => {
// no stats on this. it's cheap
@@ -1379,6 +1478,8 @@ impl Web3ProxyApp {
// i think this is always an error response
let rpcs = request_metadata.backend_requests.lock().clone();
// TODO! save stats
return Ok((response, rpcs));
};
@@ -1574,7 +1675,6 @@ impl Web3ProxyApp {
serde_json::Value::String(APP_USER_AGENT.to_string())
}
"web3_sha3" => {
// emit stats
// returns Keccak-256 (not the standardized SHA3-256) of the given data.
match &request.params {
Some(serde_json::Value::Array(params)) => {
@@ -1641,8 +1741,6 @@ impl Web3ProxyApp {
return Err(Web3ProxyError::AccessDenied);
}
// emit stats
// TODO: if no servers synced, wait for them to be synced? probably better to error and let haproxy retry another server
let head_block_num = head_block_num
.or(self.balanced_rpcs.head_block_num())

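The new match arms above send the ERC-4337 account-abstraction methods (eth_sendUserOperation, eth_estimateUserOperationGas, and friends) to the optional bundler_4337_rpcs group, while the debug_bundler_* methods get an explicit "method unsupported" response. For orientation, a minimal sketch of the kind of JSON-RPC request that now gets forwarded to a bundler; every value is a placeholder and the full userOperation object defined by ERC-4337 has more fields than shown here:

```rust
use serde_json::json;

fn main() {
    // params per ERC-4337: [userOperation, entryPointAddress]; all values are placeholders
    let request = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "eth_sendUserOperation",
        "params": [
            {
                "sender": "0x0000000000000000000000000000000000000000",
                "nonce": "0x0",
                "callData": "0x",
                "signature": "0x"
                // ...the remaining ERC-4337 fields are elided here
            },
            "0x0000000000000000000000000000000000000000" // entry point address (placeholder)
        ]
    });

    println!("{}", serde_json::to_string_pretty(&request).unwrap());
}
```

If the chain has no bundler_4337_rpcs configured, the same request now returns the explicit "no bundler_4337_rpcs available" error instead of being sent to a regular node.
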
@@ -401,6 +401,7 @@ mod tests {
),
]),
private_rpcs: None,
bundler_4337_rpcs: None,
extra: Default::default(),
};

@@ -42,12 +42,12 @@ impl RpcAccountingSubCommand {
#[derive(Serialize, FromQueryResult)]
struct SelectResult {
total_frontend_requests: Decimal,
// pub total_backend_retries: Decimal,
// pub total_cache_misses: Decimal,
total_backend_retries: Decimal,
// total_cache_misses: Decimal,
total_cache_hits: Decimal,
total_response_bytes: Decimal,
total_error_responses: Decimal,
// pub total_response_millis: Decimal,
total_response_millis: Decimal,
first_period_datetime: DateTimeUtc,
last_period_datetime: DateTimeUtc,
}
@@ -58,10 +58,10 @@ impl RpcAccountingSubCommand {
rpc_accounting::Column::FrontendRequests.sum(),
"total_frontend_requests",
)
// .column_as(
// rpc_accounting::Column::BackendRequests.sum(),
// "total_backend_retries",
// )
.column_as(
rpc_accounting::Column::BackendRequests.sum(),
"total_backend_retries",
)
// .column_as(
// rpc_accounting::Column::CacheMisses.sum(),
// "total_cache_misses",
@@ -76,10 +76,10 @@ impl RpcAccountingSubCommand {
rpc_accounting::Column::ErrorResponse.sum(),
"total_error_responses",
)
// .column_as(
// rpc_accounting::Column::SumResponseMillis.sum(),
// "total_response_millis",
// )
.column_as(
rpc_accounting::Column::SumResponseMillis.sum(),
"total_response_millis",
)
.column_as(
rpc_accounting::Column::PeriodDatetime.min(),
"first_period_datetime",
@@ -131,25 +131,42 @@ impl RpcAccountingSubCommand {
q = q.filter(condition);
// TODO: make this work without into_json. i think we need to make a struct
let query_response = q
let stats = q
.into_model::<SelectResult>()
.one(db_conn)
.await?
.context("no query result")?;
info!(
"query_response for chain {:?}: {:#}",
self.chain_id,
json!(query_response)
);
if let Some(chain_id) = self.chain_id {
info!("stats for chain {}", chain_id);
} else {
info!("stats for all chains");
}
// let query_seconds: Decimal = query_response
// .last_period_datetime
// .signed_duration_since(query_response.first_period_datetime)
// .num_seconds()
// .into();
// info!("query seconds: {}", query_seconds);
info!("stats: {:#}", json!(&stats));
let query_seconds: Decimal = stats
.last_period_datetime
.signed_duration_since(stats.first_period_datetime)
.num_seconds()
.into();
dbg!(query_seconds);
let avg_request_per_second = (stats.total_frontend_requests / query_seconds).round_dp(2);
dbg!(avg_request_per_second);
let cache_hit_rate = (stats.total_cache_hits / stats.total_frontend_requests
* Decimal::from(100))
.round_dp(2);
dbg!(cache_hit_rate);
let avg_response_millis =
(stats.total_response_millis / stats.total_frontend_requests).round_dp(3);
dbg!(avg_response_millis);
let avg_response_bytes =
(stats.total_response_bytes / stats.total_frontend_requests).round();
dbg!(avg_response_bytes);
Ok(())
}
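The newly selected columns above feed the derived numbers at the end of the subcommand. A small worked example with made-up totals, assuming the rust_decimal crate that backs sea-orm's Decimal:

```rust
use rust_decimal::Decimal;

fn main() {
    // hypothetical totals; none of these numbers come from the repository
    let total_frontend_requests = Decimal::from(1_000_000u64);
    let total_cache_hits = Decimal::from(750_000u64);
    let total_response_millis = Decimal::from(42_000_000u64);
    let total_response_bytes = Decimal::from(2_500_000_000u64);
    let query_seconds = Decimal::from(86_400u64); // one day between first and last period

    dbg!((total_frontend_requests / query_seconds).round_dp(2)); // ~11.57 requests per second
    dbg!((total_cache_hits / total_frontend_requests * Decimal::from(100)).round_dp(2)); // 75% cache hit rate
    dbg!((total_response_millis / total_frontend_requests).round_dp(3)); // 42 ms per response
    dbg!((total_response_bytes / total_frontend_requests).round()); // 2500 bytes per response
}
```
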

@@ -42,8 +42,8 @@ pub struct CliConfig {
pub struct TopConfig {
pub app: AppConfig,
pub balanced_rpcs: HashMap<String, Web3RpcConfig>,
// TODO: instead of an option, give it a default
pub private_rpcs: Option<HashMap<String, Web3RpcConfig>>,
pub bundler_4337_rpcs: Option<HashMap<String, Web3RpcConfig>>,
/// unknown config options get put here
#[serde(flatten, default = "HashMap::default")]
pub extra: HashMap<String, serde_json::Value>,
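With bundler_4337_rpcs as an Option alongside private_rpcs, chains without a bundler can simply omit the section. A sketch of how the optional tables deserialize, using a stand-in struct rather than the real TopConfig and assuming the serde and toml crates; the inner keys are illustrative, not Web3RpcConfig's actual fields:

```rust
use serde::Deserialize;
use std::collections::HashMap;

// stand-in for TopConfig, just to show the optional sections
#[derive(Deserialize)]
struct TopConfigSketch {
    balanced_rpcs: HashMap<String, toml::Value>,
    private_rpcs: Option<HashMap<String, toml::Value>>,
    bundler_4337_rpcs: Option<HashMap<String, toml::Value>>,
}

fn main() {
    let cfg: TopConfigSketch = toml::from_str(
        r#"
        [balanced_rpcs.local]
        url = "ws://127.0.0.1:8546"

        [bundler_4337_rpcs.example_bundler]
        url = "https://bundler.example.invalid"
        "#,
    )
    .expect("valid toml");

    assert!(cfg.private_rpcs.is_none()); // missing section becomes None, no error
    assert!(cfg.bundler_4337_rpcs.is_some());
}
```
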

@@ -164,6 +164,7 @@ pub async fn serve(
//
.route("/health", get(status::health))
.route("/status", get(status::status))
.route("/status/backups_needed", get(status::backups_needed))
//
// User stuff
//

@@ -27,6 +27,30 @@ pub async fn health(
}
}
/// Easy alerting if backup servers are in use.
pub async fn backups_needed(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
let code = {
let consensus_rpcs = app.balanced_rpcs.watch_consensus_rpcs_sender.borrow();
if let Some(consensus_rpcs) = consensus_rpcs.as_ref() {
if consensus_rpcs.backups_needed {
StatusCode::INTERNAL_SERVER_ERROR
} else {
StatusCode::OK
}
} else {
// if no consensus, we still "need backups". we just don't have any. which is worse
StatusCode::INTERNAL_SERVER_ERROR
}
};
if matches!(code, StatusCode::OK) {
(code, "no backups needed. :)")
} else {
(code, "backups needed! :(")
}
}
/// Very basic status page.
///
/// TODO: replace this with proper stats and monitoring

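The new /status/backups_needed endpoint returns 200 when consensus was reached without backup servers and 500 otherwise, so it plugs directly into HTTP-based alerting. A minimal probe sketch, assuming reqwest and tokio and a hypothetical local proxy address:

```rust
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // hypothetical address; point this at the proxy's frontend port
    let resp = reqwest::get("http://127.0.0.1:8544/status/backups_needed").await?;

    if resp.status() == reqwest::StatusCode::OK {
        println!("no backups needed. :)");
    } else {
        // 500 covers both "backups are in the consensus set" and "no consensus at all"
        println!("backups needed! :( (status {})", resp.status());
    }

    Ok(())
}
```
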
@@ -18,12 +18,12 @@ use tokio::time::Instant;
/// Serialize is so we can print it on our debug endpoint
#[derive(Clone, Serialize)]
pub struct ConsensusWeb3Rpcs {
pub(super) tier: u64,
pub(super) head_block: Web3ProxyBlock,
pub(super) best_rpcs: Vec<Arc<Web3Rpc>>,
pub(crate) tier: u64,
pub(crate) head_block: Web3ProxyBlock,
pub(crate) best_rpcs: Vec<Arc<Web3Rpc>>,
// TODO: functions like "compare_backup_vote()"
// pub(super) backups_voted: Option<Web3ProxyBlock>,
pub(super) backups_needed: bool,
pub(crate) backups_needed: bool,
}
impl ConsensusWeb3Rpcs {

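The fields above widen from pub(super) to pub(crate) because the new frontend status handler reads watch_consensus_rpcs_sender and backups_needed from outside the rpcs module tree. A small sketch of the difference, with hypothetical module names:

```rust
mod rpcs {
    pub mod consensus {
        pub struct ConsensusWeb3Rpcs {
            // pub(super) would limit this field to the `rpcs` module and its children;
            // pub(crate) opens it to the whole crate, e.g. the frontend module below
            pub(crate) backups_needed: bool,
        }
    }
}

mod frontend {
    use crate::rpcs::consensus::ConsensusWeb3Rpcs;

    // mirrors the new /status/backups_needed handler, which reads the field directly
    pub fn backups_needed(c: &ConsensusWeb3Rpcs) -> bool {
        c.backups_needed
    }
}

fn main() {
    let c = rpcs::consensus::ConsensusWeb3Rpcs { backups_needed: true };
    assert!(frontend::backups_needed(&c));
}
```
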
@@ -53,7 +53,7 @@ pub struct Web3Rpcs {
/// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
/// TODO: why is watch_consensus_head_sender in an Option, but this one isn't?
/// Geth's subscriptions have the same potential for skipping blocks.
pub(super) watch_consensus_rpcs_sender: watch::Sender<Option<Arc<ConsensusWeb3Rpcs>>>,
pub(crate) watch_consensus_rpcs_sender: watch::Sender<Option<Arc<ConsensusWeb3Rpcs>>>,
/// this head receiver makes it easy to wait until there is a new block
pub(super) watch_consensus_head_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
pub(super) pending_transaction_cache:
@@ -102,6 +102,8 @@ impl Web3Rpcs {
let expected_block_time_ms = match chain_id {
// ethereum
1 => 12_000,
// ethereum-goerli
5 => 12_000,
// polygon
137 => 2_000,
// fantom
@@ -110,7 +112,10 @@ impl Web3Rpcs {
42161 => 500,
// anything else
_ => {
warn!("unexpected chain_id. polling every {} seconds", 10);
warn!(
"unexpected chain_id ({}). polling every {} seconds",
chain_id, 10
);
10_000
}
};
@@ -214,11 +219,14 @@ impl Web3Rpcs {
) -> anyhow::Result<()> {
// safety checks
if rpc_configs.len() < app.config.min_synced_rpcs {
return Err(anyhow::anyhow!(
// TODO: don't count disabled servers!
// TODO: include if this is balanced, private, or 4337
warn!(
"Only {}/{} rpcs! Add more rpcs or reduce min_synced_rpcs.",
rpc_configs.len(),
app.config.min_synced_rpcs
));
);
return Ok(());
}
// safety check on sum soft limit
@@ -502,141 +510,161 @@ impl Web3Rpcs {
max_block_needed: Option<&U64>,
) -> Web3ProxyResult<OpenRequestResult> {
let usable_rpcs_by_tier_and_head_number: BTreeMap<(u64, Option<U64>), Vec<Arc<Web3Rpc>>> = {
let synced_connections = self.watch_consensus_rpcs_sender.borrow().clone();
if self.watch_consensus_head_sender.is_none() {
// pick any server
let mut m = BTreeMap::new();
if synced_connections.is_none() {
return Ok(OpenRequestResult::NotReady);
}
let synced_connections =
synced_connections.expect("synced_connections can't be None here");
let key = (0, None);
let head_block_num = synced_connections.head_block.number();
let head_block_age = synced_connections.head_block.age();
// TODO: double check the logic on this. especially if only min is set
let needed_blocks_comparison = match (min_block_needed, max_block_needed) {
(None, None) => {
// no required block given. treat this like they requested the consensus head block
cmp::Ordering::Equal
}
(None, Some(max_block_needed)) => max_block_needed.cmp(head_block_num),
(Some(min_block_needed), None) => min_block_needed.cmp(head_block_num),
(Some(min_block_needed), Some(max_block_needed)) => {
match min_block_needed.cmp(max_block_needed) {
cmp::Ordering::Less | cmp::Ordering::Equal => {
min_block_needed.cmp(head_block_num)
}
cmp::Ordering::Greater => {
// TODO: force a debug log of the original request to see if our logic is wrong?
// TODO: attach the rpc_key_id so we can find the user to ask if they need help
return Err(Web3ProxyError::InvalidBlockBounds {
min: min_block_needed.as_u64(),
max: max_block_needed.as_u64(),
});
}
for x in self.by_name.read().values() {
if skip.contains(x) {
trace!("skipping: {}", x);
continue;
}
trace!("not skipped!");
m.entry(key).or_insert_with(Vec::new).push(x.clone());
}
};
trace!("needed_blocks_comparison: {:?}", needed_blocks_comparison);
m
} else {
let synced_connections = self.watch_consensus_rpcs_sender.borrow().clone();
// collect "usable_rpcs_by_head_num_and_weight"
// TODO: MAKE SURE None SORTS LAST?
let mut m = BTreeMap::new();
match needed_blocks_comparison {
cmp::Ordering::Less => {
// need an old block. check all the rpcs. ignore rpcs that are still syncing
trace!("old block needed");
let min_block_age =
self.max_block_age.map(|x| head_block_age.saturating_sub(x));
let min_sync_num = self.max_block_lag.map(|x| head_block_num.saturating_sub(x));
// TODO: cache this somehow?
// TODO: maybe have a helper on synced_connections? that way sum_soft_limits/min_synced_rpcs will be DRY
for x in self
.by_name
.read()
.values()
.filter(|x| {
// TODO: move a bunch of this onto a rpc.is_synced function
#[allow(clippy::if_same_then_else)]
if skip.contains(x) {
// we've already tried this server or have some other reason to skip it
false
} else if max_block_needed
.map(|max_block_needed| !x.has_block_data(max_block_needed))
.unwrap_or(false)
{
// server does not have the max block
trace!(
"{} does not have the max block ({:?})",
x,
max_block_needed
);
false
} else {
!min_block_needed
.map(|min_block_needed| !x.has_block_data(min_block_needed))
.unwrap_or(false)
}
})
.cloned()
{
let x_head_block = x.head_block.read().clone();
if let Some(x_head) = x_head_block {
// TODO: should nodes that are ahead of the consensus block have priority? seems better to spread the load
let x_head_num = x_head.number().min(head_block_num);
// TODO: do we really need to check head_num and age?
if let Some(min_sync_num) = min_sync_num.as_ref() {
if x_head_num < min_sync_num {
trace!("rpc is still syncing");
continue;
}
}
if let Some(min_block_age) = min_block_age {
if x_head.age() > min_block_age {
// rpc is still syncing
trace!("server's block is too old");
continue;
}
}
let key = (x.tier, Some(*x_head_num));
m.entry(key).or_insert_with(Vec::new).push(x);
}
}
// TODO: check min_synced_rpcs and min_sum_soft_limits? or maybe better to just try to serve the request?
}
cmp::Ordering::Equal => {
// using the consensus head block. filter the synced rpcs
// the key doesn't matter if we are checking synced connections
// they are all at the same block and it is already sized to what we need
let key = (0, None);
for x in synced_connections.best_rpcs.iter() {
if skip.contains(x) {
trace!("skipping: {}", x);
continue;
}
trace!("not skipped!");
m.entry(key).or_insert_with(Vec::new).push(x.clone());
}
}
cmp::Ordering::Greater => {
// TODO? if the blocks are close, maybe we could wait for change on a watch_consensus_connections_receiver().subscribe()
if synced_connections.is_none() {
return Ok(OpenRequestResult::NotReady);
}
}
let synced_connections =
synced_connections.expect("synced_connections can't be None here");
m
let head_block_num = synced_connections.head_block.number();
let head_block_age = synced_connections.head_block.age();
// TODO: double check the logic on this. especially if only min is set
let needed_blocks_comparison = match (min_block_needed, max_block_needed) {
(None, None) => {
// no required block given. treat this like they requested the consensus head block
cmp::Ordering::Equal
}
(None, Some(max_block_needed)) => max_block_needed.cmp(head_block_num),
(Some(min_block_needed), None) => min_block_needed.cmp(head_block_num),
(Some(min_block_needed), Some(max_block_needed)) => {
match min_block_needed.cmp(max_block_needed) {
cmp::Ordering::Less | cmp::Ordering::Equal => {
min_block_needed.cmp(head_block_num)
}
cmp::Ordering::Greater => {
// TODO: force a debug log of the original request to see if our logic is wrong?
// TODO: attach the rpc_key_id so we can find the user to ask if they need help
return Err(Web3ProxyError::InvalidBlockBounds {
min: min_block_needed.as_u64(),
max: max_block_needed.as_u64(),
});
}
}
}
};
trace!("needed_blocks_comparison: {:?}", needed_blocks_comparison);
// collect "usable_rpcs_by_head_num_and_weight"
// TODO: MAKE SURE None SORTS LAST?
let mut m = BTreeMap::new();
match needed_blocks_comparison {
cmp::Ordering::Less => {
// need an old block. check all the rpcs. ignore rpcs that are still syncing
trace!("old block needed");
let min_block_age =
self.max_block_age.map(|x| head_block_age.saturating_sub(x));
let min_sync_num =
self.max_block_lag.map(|x| head_block_num.saturating_sub(x));
// TODO: cache this somehow?
// TODO: maybe have a helper on synced_connections? that way sum_soft_limits/min_synced_rpcs will be DRY
for x in self
.by_name
.read()
.values()
.filter(|x| {
// TODO: move a bunch of this onto a rpc.is_synced function
#[allow(clippy::if_same_then_else)]
if skip.contains(x) {
// we've already tried this server or have some other reason to skip it
false
} else if max_block_needed
.map(|max_block_needed| !x.has_block_data(max_block_needed))
.unwrap_or(false)
{
// server does not have the max block
trace!(
"{} does not have the max block ({:?})",
x,
max_block_needed
);
false
} else {
!min_block_needed
.map(|min_block_needed| !x.has_block_data(min_block_needed))
.unwrap_or(false)
}
})
.cloned()
{
let x_head_block = x.head_block.read().clone();
if let Some(x_head) = x_head_block {
// TODO: should nodes that are ahead of the consensus block have priority? seems better to spread the load
let x_head_num = x_head.number().min(head_block_num);
// TODO: do we really need to check head_num and age?
if let Some(min_sync_num) = min_sync_num.as_ref() {
if x_head_num < min_sync_num {
trace!("rpc is still syncing");
continue;
}
}
if let Some(min_block_age) = min_block_age {
if x_head.age() > min_block_age {
// rpc is still syncing
trace!("server's block is too old");
continue;
}
}
let key = (x.tier, Some(*x_head_num));
m.entry(key).or_insert_with(Vec::new).push(x);
}
}
// TODO: check min_synced_rpcs and min_sum_soft_limits? or maybe better to just try to serve the request?
}
cmp::Ordering::Equal => {
// using the consensus head block. filter the synced rpcs
// the key doesn't matter if we are checking synced connections
// they are all at the same block and it is already sized to what we need
let key = (0, None);
for x in synced_connections.best_rpcs.iter() {
if skip.contains(x) {
trace!("skipping: {}", x);
continue;
}
trace!("not skipped!");
m.entry(key).or_insert_with(Vec::new).push(x.clone());
}
}
cmp::Ordering::Greater => {
// TODO? if the blocks are close, maybe we could wait for change on a watch_consensus_connections_receiver().subscribe()
return Ok(OpenRequestResult::NotReady);
}
}
m
}
};
trace!(

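One detail worth checking in the refactored selection map above (and flagged by the "MAKE SURE None SORTS LAST?" TODO): for a BTreeMap keyed by (tier, Option<U64>), Rust's derived ordering for Option puts None before every Some, so the no-head-block key sorts first within a tier, not last. A quick sketch:

```rust
use std::collections::BTreeMap;

fn main() {
    // same key shape as usable_rpcs_by_tier_and_head_number: (tier, Option<block number>)
    let mut m: BTreeMap<(u64, Option<u64>), &str> = BTreeMap::new();
    m.insert((0, Some(17_000_000)), "synced rpc");
    m.insert((0, None), "rpc with no head block");
    m.insert((1, Some(17_000_000)), "lower-tier rpc");

    let keys: Vec<_> = m.keys().copied().collect();
    // None sorts *before* Some within the same tier
    assert_eq!(keys[0], (0, None));
    assert_eq!(keys[1], (0, Some(17_000_000)));
    assert_eq!(keys[2], (1, Some(17_000_000)));
}
```
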
@@ -187,7 +187,6 @@ impl Web3Rpc {
};
let tx_id_sender = if config.subscribe_txs {
// TODO: warn if tx_id_sender is None?
tx_id_sender
} else {
None
@@ -557,7 +556,8 @@ impl Web3Rpc {
// trace!("waiting on chain id for {}", self);
let found_chain_id: Result<U64, _> = self
.wait_for_request_handle(&authorization, None, unlocked_provider.clone())
.await?
.await
.context(format!("waiting for request handle on {}", self))?
.request(
"eth_chainId",
&json!(Option::None::<()>),
@@ -580,18 +580,20 @@ impl Web3Rpc {
}
}
Err(e) => {
return Err(anyhow::Error::from(e));
return Err(anyhow::Error::from(e)
.context(format!("unable to parse eth_chainId from {}", self)));
}
}
self.check_block_data_limit(&authorization, unlocked_provider.clone())
.await?;
.await
.context(format!("unable to check_block_data_limit of {}", self))?;
drop(unlocked_provider);
info!("successfully connected to {}", self);
} else if self.provider.read().await.is_none() {
return Err(anyhow!("failed waiting for client"));
return Err(anyhow!("failed waiting for client {}", self));
};
Ok(())
@@ -726,7 +728,7 @@ impl Web3Rpc {
// this does loop. just only when reconnect is enabled
#[allow(clippy::never_loop)]
loop {
debug!("subscription loop started");
trace!("subscription loop started on {}", self);
let mut futures = vec![];
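
The added .context(...) calls in this file wrap the low-level errors so that failures name the rpc they came from. A minimal sketch of the pattern, with a made-up check and rpc url, assuming the anyhow crate:

```rust
use anyhow::{anyhow, Context};

fn check_chain_id(reported: u64, expected: u64) -> anyhow::Result<()> {
    if reported == expected {
        Ok(())
    } else {
        Err(anyhow!("expected chain id {}, found {}", expected, reported))
    }
}

fn main() {
    let err = check_chain_id(5, 1)
        .context("unable to parse eth_chainId from ws://127.0.0.1:8546 (hypothetical rpc)")
        .unwrap_err();

    // the alternate Display format prints the context followed by the underlying cause
    println!("{:#}", err);
}
```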