Improve max wait and timeouts (#137)

* pass max wait with more functions

* move rpc_head_str higher up to use it in all the logs

* refresh consensus at double the block time

* new pricing that also includes archive check

* lint

* don't clone method

* put caches into one list

they serialize with their name, so it will still be easy to tell them apart

* more consistent waiting. still waits more than I want right now

* clean up default max waits

* more timeouts
Bryan Stitt 2023-06-20 14:22:14 -07:00 committed by GitHub
parent fa55e5ce9b
commit 3c2f95de10
11 changed files with 545 additions and 402 deletions

@ -1071,6 +1071,7 @@ impl Web3ProxyApp {
Some(request_metadata),
None,
None,
Some(Duration::from_secs(30)),
Some(Level::Trace.into()),
None,
true,
@ -1101,6 +1102,7 @@ impl Web3ProxyApp {
Some(request_metadata),
None,
None,
Some(Duration::from_secs(30)),
Some(Level::Trace.into()),
num_public_rpcs,
true,
@ -1271,6 +1273,7 @@ impl Web3ProxyApp {
method,
&params,
Some(request_metadata),
Some(Duration::from_secs(30)),
None,
None,
)
@ -1311,6 +1314,7 @@ impl Web3ProxyApp {
method,
&params,
Some(request_metadata),
Some(Duration::from_secs(30)),
None,
None,
)
@ -1343,6 +1347,7 @@ impl Web3ProxyApp {
method,
&params,
Some(request_metadata),
Some(Duration::from_secs(30)),
None,
None,
)
@ -1366,6 +1371,7 @@ impl Web3ProxyApp {
method,
&params,
Some(request_metadata),
Some(Duration::from_secs(30)),
Some(&U64::one()),
None,
)
@ -1610,7 +1616,7 @@ impl Web3ProxyApp {
let request_block = self
.balanced_rpcs
.block(&authorization, &request_block_hash, None)
.block(&authorization, &request_block_hash, None, None)
.await?
.block;
@ -1640,7 +1646,7 @@ impl Web3ProxyApp {
let from_block = self
.balanced_rpcs
.block(&authorization, &from_block_hash, None)
.block(&authorization, &from_block_hash, None, None)
.await?
.block;
@ -1651,7 +1657,7 @@ impl Web3ProxyApp {
let to_block = self
.balanced_rpcs
.block(&authorization, &to_block_hash, None)
.block(&authorization, &to_block_hash, None, None)
.await?
.block;
@ -1666,7 +1672,7 @@ impl Web3ProxyApp {
};
// TODO: different timeouts for different user tiers. get the duration out of the request_metadata
let duration = Duration::from_secs(240);
let max_wait = Duration::from_secs(240);
if let Some(cache_key) = cache_key {
let from_block_num = cache_key.from_block_num();
@ -1679,16 +1685,16 @@ impl Web3ProxyApp {
.jsonrpc_response_cache
.try_get_with::<_, Web3ProxyError>(cache_key.hash(), async {
let response_data = timeout(
duration,
max_wait + Duration::from_millis(10),
self.balanced_rpcs
.try_proxy_connection::<_, Arc<RawValue>>(
method,
&params,
Some(request_metadata),
Some(max_wait),
from_block_num.as_ref(),
to_block_num.as_ref(),
)
)
))
.await?;
if !cache_jsonrpc_errors && let Err(err) = response_data {
@ -1705,12 +1711,13 @@ impl Web3ProxyApp {
}).await?
} else {
let x = timeout(
duration,
max_wait + Duration::from_millis(10),
self.balanced_rpcs
.try_proxy_connection::<_, Arc<RawValue>>(
method,
&params,
Some(request_metadata),
Some(max_wait),
None,
None,
)

@ -76,7 +76,7 @@ pub async fn clean_block_number(
serde_json::from_value(block_hash).context("decoding blockHash")?;
let block = rpcs
.block(authorization, &block_hash, None)
.block(authorization, &block_hash, None, None)
.await
.context("fetching block number from hash")?;

@ -8,6 +8,7 @@
use log::warn;
use migration::sea_orm::prelude::Decimal;
use std::str::FromStr;
pub struct ComputeUnit(Decimal);
@ -121,12 +122,16 @@ impl ComputeUnit {
/// Compute cost per request
/// All methods cost the same
/// The number of bytes is based on the request's input and output sizes
pub fn cost(&self, cache_hit: bool, usd_per_cu: Decimal) -> Decimal {
pub fn cost(&self, archive_request: bool, cache_hit: bool, usd_per_cu: Decimal) -> Decimal {
let mut cost = self.0 * usd_per_cu;
if archive_request {
cost *= Decimal::from_str("2.5").unwrap();
}
// cache hits get a 25% discount
if cache_hit {
cost /= Decimal::from(2)
cost *= Decimal::from_str("0.75").unwrap()
}
cost
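
A quick worked example of the new multipliers (the numbers are illustrative, not the project's actual rates): an archive request costs 2.5x the base price, and a cache hit now keeps 75% of the cost, i.e. a 25% discount instead of the old 50%.

// Worked example with made-up numbers; it only mirrors the multiplier logic above.
fn main() {
    let base = 100.0 * 0.000_000_4; // 100 compute units at $0.0000004/CU = $0.00004
    let archive = base * 2.5; // archive requests cost 2.5x -> $0.0001
    let archive_cached = archive * 0.75; // cache hit keeps 75% of that -> $0.000075
    assert!((archive_cached - 0.000_075).abs() < 1e-12);
}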

@ -17,7 +17,7 @@ use axum::{
};
use http::{header::AUTHORIZATION, StatusCode};
use listenfd::ListenFd;
use log::{info};
use log::info;
use moka::future::{Cache, CacheBuilder};
use std::net::SocketAddr;
use std::sync::Arc;

@ -138,17 +138,19 @@ async fn _status(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, Bytes) {
// TODO: the hostname is probably not going to change. only get once at the start?
let body = json!({
"balanced_rpcs": app.balanced_rpcs,
"bearer_token_semaphores": MokaCacheSerializer(&app.bearer_token_semaphores),
"bundler_4337_rpcs": app.bundler_4337_rpcs,
"caches": [
MokaCacheSerializer(&app.bearer_token_semaphores),
MokaCacheSerializer(&app.ip_semaphores),
MokaCacheSerializer(&app.jsonrpc_response_cache),
MokaCacheSerializer(&app.rpc_secret_key_cache),
MokaCacheSerializer(&app.user_balance_cache),
MokaCacheSerializer(&app.user_semaphores),
],
"chain_id": app.config.chain_id,
"hostname": app.hostname,
"ip_semaphores": MokaCacheSerializer(&app.ip_semaphores),
"jsonrpc_response_cache": MokaCacheSerializer(&app.jsonrpc_response_cache),
"payment_factory_address": app.config.deposit_factory_contract,
"private_rpcs": app.private_rpcs,
"rpc_secret_key_cache": MokaCacheSerializer(&app.rpc_secret_key_cache),
"user_balance_cache": MokaCacheSerializer(&app.user_balance_cache),
"user_semaphores": MokaCacheSerializer(&app.user_semaphores),
"version": APP_USER_AGENT,
});
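
To make the new "caches" grouping concrete, the /status body now looks roughly like the sketch below. The fields inside each cache entry are assumptions for illustration; the point is that every cache serializes with its own name, so collapsing them into one list loses nothing.

// Illustrative only: approximate shape of the /status payload after this change.
fn main() {
    let example = serde_json::json!({
        "caches": [
            { "name": "bearer_token_semaphores", "entry_count": 3 },
            { "name": "jsonrpc_response_cache", "entry_count": 12345 },
            { "name": "user_balance_cache", "entry_count": 42 },
        ],
        "chain_id": 1,
        "hostname": "proxy-01",
    });

    println!("{}", serde_json::to_string_pretty(&example).unwrap());
}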

@ -3,20 +3,21 @@ use super::consensus::ConsensusFinder;
use super::many::Web3Rpcs;
use super::one::Web3Rpc;
use super::transactions::TxStatus;
use crate::config::BlockAndRpc;
use crate::config::{average_block_interval, BlockAndRpc};
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::Authorization;
use derive_more::From;
use ethers::prelude::{Block, TxHash, H256, U64};
use log::{debug, trace, warn};
use log::{debug, error, trace, warn};
use moka::future::Cache;
use serde::ser::SerializeStruct;
use serde::Serialize;
use serde_json::json;
use std::hash::Hash;
use std::time::Duration;
use std::{cmp::Ordering, fmt::Display, sync::Arc};
use std::{fmt::Display, sync::Arc};
use tokio::sync::broadcast;
use tokio::time::timeout;
// TODO: type for Hydrated Blocks with their full transactions?
pub type ArcBlock = Arc<Block<TxHash>>;
@ -269,6 +270,7 @@ impl Web3Rpcs {
authorization: &Arc<Authorization>,
hash: &H256,
rpc: Option<&Arc<Web3Rpc>>,
max_wait: Option<Duration>,
) -> Web3ProxyResult<Web3ProxyBlock> {
// first, try to get the hash from our cache
// the cache is set last, so if its here, its everywhere
@ -299,13 +301,19 @@ impl Web3Rpcs {
&get_block_params,
authorization,
None,
max_wait,
)
.await?
} else {
// ask any rpc
// TODO: retry if "Requested data is not available"
// TODO: request_with_metadata instead of internal_request
self.internal_request::<_, Option<ArcBlock>>("eth_getBlockByHash", &get_block_params)
.await?
self.internal_request::<_, Option<ArcBlock>>(
"eth_getBlockByHash",
&get_block_params,
max_wait,
)
.await?
};
match block {
@ -375,8 +383,8 @@ impl Web3Rpcs {
// deref to not keep the lock open
if let Some(block_hash) = self.blocks_by_number.get(num) {
// TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set
// TODO: pass authorization through here?
let block = self.block(authorization, &block_hash, None).await?;
// TODO: configurable max wait and rpc
let block = self.block(authorization, &block_hash, None, None).await?;
return Ok((block, block_depth));
}
@ -384,7 +392,7 @@ impl Web3Rpcs {
// block number not in cache. we need to ask an rpc for it
// TODO: this error is too broad
let response = self
.internal_request::<_, Option<ArcBlock>>("eth_getBlockByNumber", &(*num, false))
.internal_request::<_, Option<ArcBlock>>("eth_getBlockByNumber", &(*num, false), None)
.await?
.ok_or(Web3ProxyError::NoBlocksKnown)?;
@ -404,240 +412,68 @@ impl Web3Rpcs {
// Geth's subscriptions have the same potential for skipping blocks.
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> Web3ProxyResult<()> {
let mut connection_heads =
let mut consensus_finder =
ConsensusFinder::new(Some(self.max_head_block_age), Some(self.max_head_block_lag));
// TODO: what timeout on block receiver? we want to keep consensus_finder fresh so that server tiers are correct
let double_block_time = average_block_interval(self.chain_id).mul_f32(2.0);
loop {
match block_receiver.recv_async().await {
Ok((new_block, rpc)) => {
match timeout(double_block_time, block_receiver.recv_async()).await {
Ok(Ok((new_block, rpc))) => {
let rpc_name = rpc.name.clone();
if let Err(err) = self
.process_block_from_rpc(
// TODO: what timeout on this?
match timeout(
Duration::from_secs(2),
consensus_finder.process_block_from_rpc(
self,
authorization,
&mut connection_heads,
new_block,
rpc,
&pending_tx_sender,
)
.await
),
)
.await
{
warn!(
"error while processing block from rpc {}: {:#?}",
rpc_name, err
);
Ok(Ok(_)) => {}
Ok(Err(err)) => {
error!(
"error while processing block from rpc {}: {:#?}",
rpc_name, err
);
}
Err(timeout) => {
error!(
"timeout while processing block from {}: {:#?}",
rpc_name, timeout
);
}
}
}
Err(err) => {
warn!("block_receiver exited! {:#?}", err);
Ok(Err(err)) => {
// TODO: panic is probably too much, but getting here is definitely not good
error!("block_receiver on {} exited! {:#?}", self, err);
return Err(err.into());
}
}
}
}
/// `connection_heads` is a mapping of rpc_names to head block hashes.
/// self.blockchain_map is a mapping of hashes to the complete ArcBlock.
/// TODO: return something?
pub(crate) async fn process_block_from_rpc(
&self,
authorization: &Arc<Authorization>,
consensus_finder: &mut ConsensusFinder,
new_block: Option<Web3ProxyBlock>,
rpc: Arc<Web3Rpc>,
_pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
) -> Web3ProxyResult<()> {
// TODO: how should we handle an error here?
if !consensus_finder
.update_rpc(new_block.clone(), rpc.clone(), self)
.await
.web3_context("failed to update rpc")?
{
// nothing changed. no need to scan for a new consensus head
return Ok(());
}
let new_consensus_rpcs = match consensus_finder
.find_consensus_connections(authorization, self)
.await
{
Err(err) => {
return Err(err).web3_context("error while finding consensus head block!");
}
Ok(None) => {
return Err(Web3ProxyError::NoConsensusHeadBlock);
}
Ok(Some(x)) => x,
};
trace!("new_synced_connections: {:#?}", new_consensus_rpcs);
let watch_consensus_head_sender = self.watch_consensus_head_sender.as_ref().unwrap();
let consensus_tier = new_consensus_rpcs.tier;
// TODO: think more about the default for total_tiers
let total_tiers = consensus_finder.worst_tier().unwrap_or_default();
let backups_needed = new_consensus_rpcs.backups_needed;
let consensus_head_block = new_consensus_rpcs.head_block.clone();
let num_consensus_rpcs = new_consensus_rpcs.num_consensus_rpcs();
let num_active_rpcs = consensus_finder.len();
let total_rpcs = self.len();
let new_consensus_rpcs = Arc::new(new_consensus_rpcs);
let old_consensus_head_connections = self
.watch_consensus_rpcs_sender
.send_replace(Some(new_consensus_rpcs.clone()));
let backups_voted_str = if backups_needed { "B " } else { "" };
match old_consensus_head_connections.as_ref() {
None => {
debug!(
"first {}/{} {}{}/{}/{} block={}, rpc={}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
rpc,
);
if backups_needed {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
// this should already be cached
let consensus_head_block = self.try_cache_block(consensus_head_block, true).await?;
watch_consensus_head_sender
.send(Some(consensus_head_block))
.or(Err(Web3ProxyError::WatchSendError))
.web3_context(
"watch_consensus_head_sender failed sending first consensus_head_block",
)?;
}
Some(old_consensus_connections) => {
let old_head_block = &old_consensus_connections.head_block;
// TODO: do this log item better
let rpc_head_str = new_block
.map(|x| x.to_string())
.unwrap_or_else(|| "None".to_string());
match consensus_head_block.number().cmp(old_head_block.number()) {
Ordering::Equal => {
// multiple blocks with the same fork!
if consensus_head_block.hash() == old_head_block.hash() {
// no change in hash. no need to use watch_consensus_head_sender
// TODO: trace level if rpc is backup
debug!(
"con {}/{} {}{}/{}/{} con={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
rpc,
rpc_head_str,
)
} else {
// hash changed
debug!(
"unc {}/{} {}{}/{}/{} con={} old={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
old_head_block,
rpc,
rpc_head_str,
);
let consensus_head_block = self
.try_cache_block(consensus_head_block, true)
.await
.web3_context("save consensus_head_block as heaviest chain")?;
watch_consensus_head_sender
.send(Some(consensus_head_block))
.or(Err(Web3ProxyError::WatchSendError))
.web3_context("watch_consensus_head_sender failed sending uncled consensus_head_block")?;
Err(_) => {
// TODO: what timeout on this?
match timeout(
Duration::from_secs(2),
consensus_finder.refresh(self, authorization, None, None),
)
.await
{
Ok(Ok(_)) => {}
Ok(Err(err)) => {
error!("error while refreshing consensus finder: {:#?}", err);
}
}
Ordering::Less => {
// this is unlikely but possible
// TODO: better log that includes all the votes
warn!(
"chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
old_head_block,
rpc,
rpc_head_str,
);
if backups_needed {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
Err(timeout) => {
error!("timeout while refreshing consensus finder: {:#?}", timeout);
}
// TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems like a good idea
let consensus_head_block = self
.try_cache_block(consensus_head_block, true)
.await
.web3_context(
"save_block sending consensus_head_block as heaviest chain",
)?;
watch_consensus_head_sender
.send(Some(consensus_head_block))
.or(Err(Web3ProxyError::WatchSendError))
.web3_context("watch_consensus_head_sender failed sending rollback consensus_head_block")?;
}
Ordering::Greater => {
debug!(
"new {}/{} {}{}/{}/{} con={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
rpc,
rpc_head_str,
);
if backups_needed {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
let consensus_head_block =
self.try_cache_block(consensus_head_block, true).await?;
watch_consensus_head_sender.send(Some(consensus_head_block))
.or(Err(Web3ProxyError::WatchSendError))
.web3_context("watch_consensus_head_sender failed sending new consensus_head_block")?;
}
}
}
}
Ok(())
}
}
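
Because the hunk above interleaves the old and new code, the shape of the reworked receive loop is easier to see in isolation. A minimal sketch, assuming a flume channel for incoming heads (payload type simplified) and an average_block_interval helper that maps a chain id to a rough block time; the intervals below are illustrative:

use std::time::Duration;
use tokio::time::timeout;

// Assumed helper: rough block interval per chain (values are illustrative).
fn average_block_interval(chain_id: u64) -> Duration {
    match chain_id {
        1 => Duration::from_secs(12),
        137 => Duration::from_secs(2),
        _ => Duration::from_secs(10),
    }
}

// Never wait on the block channel for more than twice the expected block time,
// so the consensus finder can be refreshed even when no rpc is sending heads.
async fn watch_heads(chain_id: u64, block_receiver: flume::Receiver<u64>) {
    let double_block_time = average_block_interval(chain_id).mul_f32(2.0);

    loop {
        match timeout(double_block_time, block_receiver.recv_async()).await {
            Ok(Ok(_new_head)) => {
                // a head arrived in time; process it (bounded by its own short timeout)
            }
            Ok(Err(_recv_err)) => break, // all senders dropped; the subscription is over
            Err(_elapsed) => {
                // quiet for 2x the block time; refresh consensus anyway
            }
        }
    }
}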

@ -1,7 +1,8 @@
use super::blockchain::Web3ProxyBlock;
use super::many::Web3Rpcs;
use super::one::Web3Rpc;
use crate::errors::{Web3ProxyErrorContext, Web3ProxyResult};
use super::transactions::TxStatus;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::Authorization;
use base64::engine::general_purpose;
use derive_more::Constructor;
@ -10,7 +11,7 @@ use hashbrown::{HashMap, HashSet};
use hdrhistogram::serialization::{Serializer, V2DeflateSerializer};
use hdrhistogram::Histogram;
use itertools::{Itertools, MinMaxResult};
use log::{log_enabled, trace, warn, Level};
use log::{debug, log_enabled, trace, warn, Level};
use moka::future::Cache;
use serde::Serialize;
use std::cmp::{Ordering, Reverse};
@ -18,6 +19,7 @@ use std::collections::BTreeMap;
use std::fmt;
use std::sync::{atomic, Arc};
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::time::Instant;
#[derive(Clone, Serialize)]
@ -369,6 +371,229 @@ impl ConsensusFinder {
self.rpc_heads.is_empty()
}
/// `connection_heads` is a mapping of rpc_names to head block hashes.
/// self.blockchain_map is a mapping of hashes to the complete ArcBlock.
/// TODO: return something?
/// TODO: move this onto ConsensusFinder
pub(super) async fn refresh(
&mut self,
web3_rpcs: &Web3Rpcs,
authorization: &Arc<Authorization>,
rpc: Option<&Arc<Web3Rpc>>,
new_block: Option<Web3ProxyBlock>,
) -> Web3ProxyResult<()> {
let new_consensus_rpcs = match self
.find_consensus_connections(authorization, web3_rpcs)
.await
{
Err(err) => {
return Err(err).web3_context("error while finding consensus head block!");
}
Ok(None) => {
return Err(Web3ProxyError::NoConsensusHeadBlock);
}
Ok(Some(x)) => x,
};
trace!("new_synced_connections: {:#?}", new_consensus_rpcs);
let watch_consensus_head_sender = web3_rpcs.watch_consensus_head_sender.as_ref().unwrap();
let consensus_tier = new_consensus_rpcs.tier;
// TODO: think more about the default for total_tiers
let total_tiers = self.worst_tier().unwrap_or_default();
let backups_needed = new_consensus_rpcs.backups_needed;
let consensus_head_block = new_consensus_rpcs.head_block.clone();
let num_consensus_rpcs = new_consensus_rpcs.num_consensus_rpcs();
let num_active_rpcs = self.len();
let total_rpcs = web3_rpcs.len();
let new_consensus_rpcs = Arc::new(new_consensus_rpcs);
let old_consensus_head_connections = web3_rpcs
.watch_consensus_rpcs_sender
.send_replace(Some(new_consensus_rpcs.clone()));
let backups_voted_str = if backups_needed { "B " } else { "" };
let rpc_head_str = if let Some(rpc) = rpc.as_ref() {
format!(
"{}@{}",
rpc,
new_block
.map(|x| x.to_string())
.unwrap_or_else(|| "None".to_string()),
)
} else {
"None".to_string()
};
match old_consensus_head_connections.as_ref() {
None => {
debug!(
"first {}/{} {}{}/{}/{} block={}, rpc={}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
rpc_head_str,
);
if backups_needed {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
// this should already be cached
let consensus_head_block = web3_rpcs
.try_cache_block(consensus_head_block, true)
.await?;
watch_consensus_head_sender
.send(Some(consensus_head_block))
.or(Err(Web3ProxyError::WatchSendError))
.web3_context(
"watch_consensus_head_sender failed sending first consensus_head_block",
)?;
}
Some(old_consensus_connections) => {
let old_head_block = &old_consensus_connections.head_block;
match consensus_head_block.number().cmp(old_head_block.number()) {
Ordering::Equal => {
// multiple blocks with the same fork!
if consensus_head_block.hash() == old_head_block.hash() {
// no change in hash. no need to use watch_consensus_head_sender
// TODO: trace level if rpc is backup
debug!(
"con {}/{} {}{}/{}/{} con={} rpc={}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
rpc_head_str,
)
} else {
// hash changed
debug!(
"unc {}/{} {}{}/{}/{} con={} old={} rpc={}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
old_head_block,
rpc_head_str,
);
let consensus_head_block = web3_rpcs
.try_cache_block(consensus_head_block, true)
.await
.web3_context("save consensus_head_block as heaviest chain")?;
watch_consensus_head_sender
.send(Some(consensus_head_block))
.or(Err(Web3ProxyError::WatchSendError))
.web3_context("watch_consensus_head_sender failed sending uncled consensus_head_block")?;
}
}
Ordering::Less => {
// this is unlikely but possible
// TODO: better log that includes all the votes
warn!(
"chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
old_head_block,
rpc_head_str,
);
if backups_needed {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
// TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems like a good idea
let consensus_head_block = web3_rpcs
.try_cache_block(consensus_head_block, true)
.await
.web3_context(
"save_block sending consensus_head_block as heaviest chain",
)?;
watch_consensus_head_sender
.send(Some(consensus_head_block))
.or(Err(Web3ProxyError::WatchSendError))
.web3_context("watch_consensus_head_sender failed sending rollback consensus_head_block")?;
}
Ordering::Greater => {
debug!(
"new {}/{} {}{}/{}/{} con={} rpc={}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
rpc_head_str,
);
if backups_needed {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
let consensus_head_block = web3_rpcs
.try_cache_block(consensus_head_block, true)
.await?;
watch_consensus_head_sender.send(Some(consensus_head_block))
.or(Err(Web3ProxyError::WatchSendError))
.web3_context("watch_consensus_head_sender failed sending new consensus_head_block")?;
}
}
}
}
Ok(())
}
pub(super) async fn process_block_from_rpc(
&mut self,
web3_rpcs: &Web3Rpcs,
authorization: &Arc<Authorization>,
new_block: Option<Web3ProxyBlock>,
rpc: Arc<Web3Rpc>,
_pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
) -> Web3ProxyResult<()> {
// TODO: how should we handle an error here?
if !self
.update_rpc(new_block.clone(), rpc.clone(), web3_rpcs)
.await
.web3_context("failed to update rpc")?
{
// nothing changed. no need to scan for a new consensus head
return Ok(());
}
self.refresh(web3_rpcs, authorization, Some(&rpc), new_block)
.await
}
fn remove(&mut self, rpc: &Arc<Web3Rpc>) -> Option<Web3ProxyBlock> {
self.rpc_heads.remove(rpc)
}
@ -590,14 +815,17 @@ impl ConsensusFinder {
backup_entry.0.insert(rpc);
backup_entry.1 += rpc.soft_limit;
// we used to specify rpc on this, but it shouldn't be necessary
let parent_hash = block_to_check.parent_hash();
match web3_rpcs
.block(authorization, block_to_check.parent_hash(), Some(rpc))
.block(authorization, parent_hash, None, None)
.await
{
Ok(parent_block) => block_to_check = parent_block,
Err(err) => {
warn!(
"Problem fetching parent block of {:?} during consensus finding: {:#?}",
debug!(
"Problem fetching {:?} (parent of {:?}) during consensus finding: {:#?}",
parent_hash,
block_to_check.hash(),
err
);

@ -40,6 +40,7 @@ use tokio::time::{sleep, sleep_until, Duration, Instant};
pub struct Web3Rpcs {
/// TODO: this should be a Cow
pub(crate) name: String,
pub(crate) chain_id: u64,
/// if watch_consensus_head_sender is some, Web3Rpc inside self will send blocks here when they get them
pub(crate) block_sender: flume::Sender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
/// any requests will be forwarded to one (or more) of these connections
@ -126,6 +127,7 @@ impl Web3Rpcs {
blocks_by_hash,
blocks_by_number,
by_name,
chain_id,
max_head_block_age,
max_head_block_lag,
min_synced_rpcs: min_head_rpcs,
@ -515,6 +517,8 @@ impl Web3Rpcs {
max_wait: Option<Duration>,
error_handler: Option<RequestErrorHandler>,
) -> Web3ProxyResult<OpenRequestResult> {
let start = Instant::now();
let mut earliest_retry_at: Option<Instant> = None;
// TODO: pass db_conn to the "default" authorization for revert logging
@ -561,9 +565,6 @@ impl Web3Rpcs {
}
}
} else {
let stop_trying_at =
Instant::now() + max_wait.unwrap_or_else(|| Duration::from_secs(10));
let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe();
let mut potential_rpcs = Vec::with_capacity(self.len());
@ -703,23 +704,31 @@ impl Web3Rpcs {
let waiting_for = min_block_needed.max(max_block_needed);
match consensus_rpcs.should_wait_for_block(waiting_for, skip_rpcs) {
ShouldWaitForBlock::NeverReady => break,
ShouldWaitForBlock::Ready => {
if Instant::now() > stop_trying_at {
break;
if let Some(max_wait) = max_wait {
match consensus_rpcs.should_wait_for_block(waiting_for, skip_rpcs) {
ShouldWaitForBlock::NeverReady => break,
ShouldWaitForBlock::Ready => {
if start.elapsed() > max_wait {
break;
}
}
ShouldWaitForBlock::Wait { .. } => select! {
_ = watch_consensus_rpcs.changed() => {
// no need to borrow_and_update because we do that at the top of the loop
},
_ = sleep_until(start + max_wait) => break,
},
}
ShouldWaitForBlock::Wait { .. } => select! {
_ = watch_consensus_rpcs.changed() => {},
_ = sleep_until(stop_trying_at) => break,
}
} else if let Some(max_wait) = max_wait {
select! {
_ = watch_consensus_rpcs.changed() => {
// no need to borrow_and_update because we do that at the top of the loop
},
_ = sleep_until(start + max_wait) => break,
}
} else {
select! {
_ = watch_consensus_rpcs.changed() => {},
_ = sleep_until(stop_trying_at) => break,
}
break;
}
}
}
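
The waiting logic above turns max_wait into an explicit budget instead of a hard-coded 10 seconds. A stripped-down sketch of the pattern, with the watch payload type simplified; the real code races the consensus watch channel against a deadline measured from when the search started:

use std::time::Duration;
use tokio::sync::watch;
use tokio::time::{sleep_until, Instant};

/// Returns true if consensus changed before the deadline (caller should re-check),
/// false if the budget ran out or there was no budget at all.
async fn wait_for_consensus_or_deadline(
    start: Instant,
    max_wait: Option<Duration>,
    watch_consensus_rpcs: &mut watch::Receiver<usize>,
) -> bool {
    match max_wait {
        Some(max_wait) => tokio::select! {
            _ = watch_consensus_rpcs.changed() => true, // new consensus set; try again
            _ = sleep_until(start + max_wait) => false, // deadline hit; stop trying
        },
        // with no max_wait there is nothing worth waiting for; give up immediately
        None => false,
    }
}
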
@ -769,6 +778,11 @@ impl Web3Rpcs {
trace!("max_count: {}", max_count);
if max_count == 0 {
// TODO: return a future that resolves when we know a head block?
return Err(None);
}
let mut selected_rpcs = Vec::with_capacity(max_count);
let mut tried = HashSet::new();
@ -779,6 +793,7 @@ impl Web3Rpcs {
if let Some(synced_rpcs) = synced_rpcs.as_ref() {
synced_rpcs.head_rpcs.clone()
} else {
// TODO: make this an Option instead of making an empty vec?
vec![]
}
};
@ -800,10 +815,6 @@ impl Web3Rpcs {
.unwrap_or_default();
for rpc in itertools::chain(synced_rpcs, all_rpcs) {
if max_count == 0 {
break;
}
if tried.contains(&rpc) {
continue;
}
@ -813,10 +824,11 @@ impl Web3Rpcs {
tried.insert(rpc.clone());
if !allow_backups && rpc.backup {
warn!("{} is a backup. skipping", rpc);
trace!("{} is a backup. skipping", rpc);
continue;
}
// TODO: this has_block_data check is in a few places now. move it onto the rpc
if let Some(block_needed) = min_block_needed {
if !rpc.has_block_data(block_needed) {
trace!("{} is missing min_block_needed. skipping", rpc);
@ -840,8 +852,12 @@ impl Web3Rpcs {
}
Ok(OpenRequestResult::Handle(handle)) => {
trace!("{} is available", rpc);
selected_rpcs.push(handle);
max_count -= 1;
selected_rpcs.push(handle)
if max_count == 0 {
break;
}
}
Ok(OpenRequestResult::NotReady) => {
warn!("no request handle for {}", rpc)
@ -864,9 +880,10 @@ impl Web3Rpcs {
&self,
method: &str,
params: &P,
max_wait: Option<Duration>,
) -> Web3ProxyResult<R> {
// TODO: no request_metadata means we won't have stats on this internal request.
self.request_with_metadata(method, params, None, None, None)
self.request_with_metadata(method, params, None, max_wait, None, None)
.await
}
@ -876,6 +893,7 @@ impl Web3Rpcs {
method: &str,
params: &P,
request_metadata: Option<&Arc<RequestMetadata>>,
max_wait: Option<Duration>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
) -> Web3ProxyResult<R> {
@ -886,20 +904,24 @@ impl Web3Rpcs {
let start = Instant::now();
// TODO: get from config or arguments
let max_wait = Duration::from_secs(10);
// set error_handler to Save. this might be overridden depending on the request_metadata.authorization
let error_handler = Some(RequestErrorHandler::Save);
// TODO: the loop here feels somewhat redundant with the loop in best_available_rpc
while start.elapsed() < max_wait {
loop {
if let Some(max_wait) = max_wait {
if start.elapsed() > max_wait {
break;
}
}
match self
.wait_for_best_rpc(
request_metadata,
&mut skip_rpcs,
min_block_needed,
max_block_needed,
None,
max_wait,
error_handler,
)
.await?
@ -1135,18 +1157,22 @@ impl Web3Rpcs {
request_metadata: Option<&Arc<RequestMetadata>>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
max_wait: Option<Duration>,
error_level: Option<RequestErrorHandler>,
max_sends: Option<usize>,
include_backups: bool,
) -> Web3ProxyResult<Box<RawValue>> {
let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe();
// TODO: get from config or function arguments
let max_wait = Duration::from_secs(5);
let start = Instant::now();
let wait_until = Instant::now() + max_wait;
loop {
if let Some(max_wait) = max_wait {
if start.elapsed() > max_wait {
break;
}
}
while Instant::now() < wait_until {
match self
.all_connections(
request_metadata,
@ -1197,34 +1223,54 @@ impl Web3Rpcs {
request_metadata.no_servers.fetch_add(1, Ordering::AcqRel);
}
let max_sleep = if let Some(max_wait) = max_wait {
start + max_wait
} else {
break;
};
tokio::select! {
_ = sleep_until(wait_until) => break,
_ = sleep_until(max_sleep) => {
// rpcs didn't change and we have waited too long. break to return an error
warn!("timeout waiting for try_send_all_synced_connections!");
break;
},
_ = watch_consensus_rpcs.changed() => {
// consensus rpcs changed!
watch_consensus_rpcs.borrow_and_update();
// continue to try again
continue;
}
}
continue;
}
Err(Some(retry_at)) => {
if let Some(request_metadata) = &request_metadata {
request_metadata.no_servers.fetch_add(1, Ordering::AcqRel);
}
if retry_at > wait_until {
warn!("All rate limits exceeded. And sleeping would take too long");
if let Some(max_wait) = max_wait {
if start.elapsed() > max_wait {
warn!("All rate limits exceeded. And sleeping would take too long");
break;
}
warn!("All rate limits exceeded. Sleeping");
// TODO: only make one of these sleep_untils
tokio::select! {
_ = sleep_until(start + max_wait) => {break}
_ = sleep_until(retry_at) => {}
_ = watch_consensus_rpcs.changed() => {
watch_consensus_rpcs.borrow_and_update();
}
}
continue;
} else {
warn!("All rate limits exceeded.");
break;
}
warn!("All rate limits exceeded. Sleeping");
tokio::select! {
_ = sleep_until(retry_at) => {}
_ = watch_consensus_rpcs.changed() => {
watch_consensus_rpcs.borrow_and_update();
}
}
continue;
}
}
}
@ -1237,6 +1283,7 @@ impl Web3Rpcs {
method: &str,
params: &P,
request_metadata: Option<&Arc<RequestMetadata>>,
max_wait: Option<Duration>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
) -> Web3ProxyResult<R> {
@ -1248,6 +1295,7 @@ impl Web3Rpcs {
method,
params,
request_metadata,
max_wait,
min_block_needed,
max_block_needed,
)
@ -1518,10 +1566,13 @@ mod tests {
let (watch_consensus_rpcs_sender, _watch_consensus_rpcs_receiver) = watch::channel(None);
let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None);
let chain_id = 1;
// TODO: make a Web3Rpcs::new
let rpcs = Web3Rpcs {
block_sender: block_sender.clone(),
by_name: RwLock::new(rpcs_by_name),
chain_id,
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
@ -1548,25 +1599,16 @@ mod tests {
let mut consensus_finder = ConsensusFinder::new(None, None);
// process None so that
rpcs.process_block_from_rpc(
&authorization,
&mut consensus_finder,
None,
lagged_rpc.clone(),
&None,
)
.await
.expect("its lagged, but it should still be seen as consensus if its the first to report");
rpcs.process_block_from_rpc(
&authorization,
&mut consensus_finder,
None,
head_rpc.clone(),
&None,
)
.await
.unwrap();
consensus_finder
.process_block_from_rpc(&rpcs, &authorization, None, lagged_rpc.clone(), &None)
.await
.expect(
"its lagged, but it should still be seen as consensus if its the first to report",
);
consensus_finder
.process_block_from_rpc(&rpcs, &authorization, None, head_rpc.clone(), &None)
.await
.unwrap();
// no head block because the rpcs haven't communicated through their channels
assert!(rpcs.head_block_hash().is_none());
@ -1607,16 +1649,17 @@ mod tests {
.await
.unwrap();
// TODO: this is fragile
rpcs.process_block_from_rpc(
&authorization,
&mut consensus_finder,
Some(lagged_block.clone().try_into().unwrap()),
lagged_rpc.clone(),
&None,
)
.await
.unwrap();
// TODO: calling process_block_from_rpc and send_head_block_result separately seems very fragile
consensus_finder
.process_block_from_rpc(
&rpcs,
&authorization,
Some(lagged_block.clone().try_into().unwrap()),
lagged_rpc.clone(),
&None,
)
.await
.unwrap();
head_rpc
.send_head_block_result(
@ -1628,15 +1671,16 @@ mod tests {
.unwrap();
// TODO: this is fragile
rpcs.process_block_from_rpc(
&authorization,
&mut consensus_finder,
Some(lagged_block.clone().try_into().unwrap()),
head_rpc.clone(),
&None,
)
.await
.unwrap();
consensus_finder
.process_block_from_rpc(
&rpcs,
&authorization,
Some(lagged_block.clone().try_into().unwrap()),
head_rpc.clone(),
&None,
)
.await
.unwrap();
// TODO: how do we spawn this and wait for it to process things? subscribe and watch consensus connections?
// rpcs.process_incoming_blocks(&authorization, block_receiver, pending_tx_sender)
@ -1660,15 +1704,16 @@ mod tests {
.unwrap();
// TODO: this is fragile
rpcs.process_block_from_rpc(
&authorization,
&mut consensus_finder,
Some(head_block.clone().try_into().unwrap()),
head_rpc.clone(),
&None,
)
.await
.unwrap();
consensus_finder
.process_block_from_rpc(
&rpcs,
&authorization,
Some(head_block.clone().try_into().unwrap()),
head_rpc.clone(),
&None,
)
.await
.unwrap();
assert_eq!(rpcs.num_synced_rpcs(), 1);
@ -1799,9 +1844,12 @@ mod tests {
let (watch_consensus_rpcs_sender, _) = watch::channel(None);
let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None);
let chain_id = 1;
let rpcs = Web3Rpcs {
block_sender,
by_name: RwLock::new(rpcs_by_name),
chain_id,
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
@ -1827,25 +1875,27 @@ mod tests {
let mut connection_heads = ConsensusFinder::new(None, None);
// min sum soft limit will require tier 2
rpcs.process_block_from_rpc(
&authorization,
&mut connection_heads,
Some(head_block.clone()),
pruned_rpc.clone(),
&None,
)
.await
.unwrap_err();
connection_heads
.process_block_from_rpc(
&rpcs,
&authorization,
Some(head_block.clone()),
pruned_rpc.clone(),
&None,
)
.await
.unwrap_err();
rpcs.process_block_from_rpc(
&authorization,
&mut connection_heads,
Some(head_block.clone()),
archive_rpc.clone(),
&None,
)
.await
.unwrap();
connection_heads
.process_block_from_rpc(
&rpcs,
&authorization,
Some(head_block.clone()),
archive_rpc.clone(),
&None,
)
.await
.unwrap();
assert_eq!(rpcs.num_synced_rpcs(), 2);
@ -1984,10 +2034,13 @@ mod tests {
let (watch_consensus_rpcs_sender, _) = watch::channel(None);
let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None);
let chain_id = 1;
// TODO: make a Web3Rpcs::new
let rpcs = Web3Rpcs {
block_sender,
by_name: RwLock::new(rpcs_by_name),
chain_id,
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
@ -2004,27 +2057,29 @@ mod tests {
let authorization = Arc::new(Authorization::internal(None).unwrap());
let mut connection_heads = ConsensusFinder::new(None, None);
let mut consensus_finder = ConsensusFinder::new(None, None);
rpcs.process_block_from_rpc(
&authorization,
&mut connection_heads,
Some(block_1.clone()),
mock_geth.clone(),
&None,
)
.await
.unwrap();
consensus_finder
.process_block_from_rpc(
&rpcs,
&authorization,
Some(block_1.clone()),
mock_geth.clone(),
&None,
)
.await
.unwrap();
rpcs.process_block_from_rpc(
&authorization,
&mut connection_heads,
Some(block_2.clone()),
mock_erigon_archive.clone(),
&None,
)
.await
.unwrap();
consensus_finder
.process_block_from_rpc(
&rpcs,
&authorization,
Some(block_2.clone()),
mock_erigon_archive.clone(),
&None,
)
.await
.unwrap();
assert_eq!(rpcs.num_synced_rpcs(), 1);

@ -29,7 +29,7 @@ use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicU32, AtomicU64, AtomicUsize};
use std::{cmp::Ordering, sync::Arc};
use tokio::sync::watch;
use tokio::time::{interval, sleep, sleep_until, timeout, Duration, Instant, MissedTickBehavior};
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
use url::Url;
/// An active connection to a Web3 RPC server like geth or erigon.
@ -318,17 +318,16 @@ impl Web3Rpc {
// TODO: binary search between 90k and max?
// TODO: start at 0 or 1?
for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] {
let head_block_num_future = self.internal_request::<_, U256>(
"eth_blockNumber",
&(),
// error here are expected, so keep the level low
Some(Level::Debug.into()),
);
let head_block_num = timeout(Duration::from_secs(5), head_block_num_future)
let head_block_num = self
.internal_request::<_, U256>(
"eth_blockNumber",
&(),
// errors here are expected, so keep the level low
Some(Level::Debug.into()),
Some(Duration::from_secs(5)),
)
.await
.context("timeout fetching eth_blockNumber")?
.context("provider error")?;
.context("head_block_num error during check_block_data_limit")?;
let maybe_archive_block = head_block_num.saturating_sub((block_data_limit).into());
@ -349,6 +348,7 @@ impl Web3Rpc {
)),
// errors here are expected, so keep the level low
Some(Level::Trace.into()),
Some(Duration::from_secs(5)),
)
.await;
@ -434,7 +434,12 @@ impl Web3Rpc {
// TODO: what should the timeout be? should there be a request timeout?
// trace!("waiting on chain id for {}", self);
let found_chain_id: U64 = self
.internal_request("eth_chainId", &(), Some(Level::Trace.into()))
.internal_request(
"eth_chainId",
&(),
Some(Level::Trace.into()),
Some(Duration::from_secs(5)),
)
.await?;
trace!("found_chain_id: {:#?}", found_chain_id);
@ -556,6 +561,7 @@ impl Web3Rpc {
"eth_getTransactionByHash",
&(txid,),
error_handler,
Some(Duration::from_secs(5)),
)
.await?
.context("no transaction")?;
@ -577,6 +583,7 @@ impl Web3Rpc {
"eth_getCode",
&(to, block_number),
error_handler,
Some(Duration::from_secs(5)),
)
.await?;
} else {
@ -821,6 +828,7 @@ impl Web3Rpc {
&("latest", false),
&authorization,
Some(Level::Warn.into()),
Some(Duration::from_secs(5)),
)
.await;
@ -839,7 +847,7 @@ impl Web3Rpc {
.await?;
}
} else if self.http_provider.is_some() {
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
// there is a "watch_blocks" function, but a lot of public nodes (including llamanodes) do not support the necessary rpc endpoints
// TODO: is 1/2 the block time okay?
let mut i = interval(self.block_interval / 2);
i.set_missed_tick_behavior(MissedTickBehavior::Delay);
@ -856,6 +864,7 @@ impl Web3Rpc {
&("latest", false),
&authorization,
Some(Level::Warn.into()),
Some(Duration::from_secs(5)),
)
.await;
@ -1057,10 +1066,11 @@ impl Web3Rpc {
method: &str,
params: &P,
error_handler: Option<RequestErrorHandler>,
max_wait: Option<Duration>,
) -> Web3ProxyResult<R> {
let authorization = Default::default();
self.authorized_request(method, params, &authorization, error_handler)
self.authorized_request(method, params, &authorization, error_handler, max_wait)
.await
}
@ -1070,10 +1080,11 @@ impl Web3Rpc {
params: &P,
authorization: &Arc<Authorization>,
error_handler: Option<RequestErrorHandler>,
max_wait: Option<Duration>,
) -> Web3ProxyResult<R> {
let x = self
.wait_for_request_handle(authorization, None, error_handler)
.wait_for_request_handle(authorization, max_wait, error_handler)
.await?
.request::<P, R>(method, params)
.await?;

@ -27,6 +27,7 @@ use migration::{Expr, LockType, OnConflict};
use num_traits::ToPrimitive;
use parking_lot::Mutex;
use std::borrow::Cow;
use std::mem;
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::atomic::{self, Ordering};
@ -806,26 +807,25 @@ impl TryFrom<RequestMetadata> for RpcQueryStats {
x => x,
};
let method = metadata.method.clone();
let chain_id = metadata.chain_id;
let cu = ComputeUnit::new(&method, metadata.chain_id);
let cu = ComputeUnit::new(&metadata.method, metadata.chain_id);
// TODO: get from config? a helper function? how should we pick this?
let usd_per_cu = match chain_id {
137 => Decimal::from_str("0.000000692307692307"),
_ => Decimal::from_str("0.000000692307692307"),
let usd_per_cu = match metadata.chain_id {
137 => Decimal::from_str("0.000000533333333333333"),
_ => Decimal::from_str("0.000000400000000000000"),
}?;
let cache_hit = !backend_rpcs_used.is_empty();
let compute_unit_cost = cu.cost(cache_hit, usd_per_cu);
let compute_unit_cost = cu.cost(archive_request, cache_hit, usd_per_cu);
let method = mem::take(&mut metadata.method);
let x = Self {
archive_request,
authorization,
backend_rpcs_used,
chain_id,
chain_id: metadata.chain_id,
compute_unit_cost,
error_response,
method,
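
The mem::take above is what backs the "don't clone method" item in the commit message: it moves the String out of the metadata and leaves an empty one behind, so nothing is copied. A standalone illustration with a made-up struct:

use std::mem;

struct ExampleMetadata {
    method: String,
    chain_id: u64,
}

fn main() {
    let mut metadata = ExampleMetadata {
        method: "eth_call".to_string(),
        chain_id: 1,
    };

    // take ownership of the method without cloning; metadata keeps an empty String
    let method = mem::take(&mut metadata.method);

    assert_eq!(method, "eth_call");
    assert!(metadata.method.is_empty());
    assert_eq!(metadata.chain_id, 1);
}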

@ -1,8 +1,7 @@
use std::str::FromStr;
use axum::headers::authorization::Bearer;
use migration::sea_orm::prelude::Uuid;
use serde::Serialize;
use std::str::FromStr;
use ulid::Ulid;
/// Key used for caching the user's login