remove allowed lag

Bryan Stitt 2023-01-19 03:05:39 -08:00
parent 2d5d115d6f
commit 52a9ba604c
6 changed files with 74 additions and 147 deletions

@@ -307,6 +307,7 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [x] improve rate limiting on websockets
- [x] retry another server if we get a jsonrpc response error about rate limits
- [x] major refactor to only use backup servers when absolutely necessary
- [x] remove allowed lag
- [-] proxy mode for benchmarking all backends
- [-] proxy mode for sending to multiple backends
- [-] let users choose a % of reverts to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly

@@ -190,7 +190,6 @@ pub struct Web3ProxyApp {
head_block_receiver: watch::Receiver<ArcBlock>,
pending_tx_sender: broadcast::Sender<TxStatus>,
pub config: AppConfig,
pub allowed_lag: u64,
pub db_conn: Option<sea_orm::DatabaseConnection>,
pub db_replica: Option<DatabaseReplica>,
/// prometheus metrics
@@ -687,20 +686,8 @@ impl Web3ProxyApp {
.time_to_idle(Duration::from_secs(120))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// TODO: get this out of the toml instead
let allowed_lag = match top_config.app.chain_id {
1 => 60,
137 => 10,
250 => 10,
_ => {
warn!("defaulting allowed lag to 60");
60
}
};
let app = Self {
config: top_config.app,
allowed_lag,
balanced_rpcs,
private_rpcs,
response_cache,
@@ -1432,7 +1419,6 @@ impl Web3ProxyApp {
.balanced_rpcs
.try_proxy_connection(
proxy_mode,
self.allowed_lag,
&authorization,
request,
Some(&request_metadata),
@@ -1459,7 +1445,6 @@ impl Web3ProxyApp {
self.balanced_rpcs
.try_proxy_connection(
proxy_mode,
self.allowed_lag,
&authorization,
request,
Some(&request_metadata),

@@ -223,7 +223,6 @@ impl Web3ConnectionConfig {
pub async fn spawn(
self,
name: String,
allowed_lag: u64,
db_conn: Option<DatabaseConnection>,
redis_pool: Option<redis_rate_limiter::RedisPool>,
chain_id: u64,
@@ -262,7 +261,6 @@ impl Web3ConnectionConfig {
Web3Connection::spawn(
name,
allowed_lag,
self.display_name,
chain_id,
db_conn,

@@ -78,11 +78,6 @@ impl SavedBlock {
pub fn number(&self) -> U64 {
self.block.number.expect("saved blocks must have a number")
}
/// When the block was received, this node was still syncing
pub fn syncing(&self, allowed_lag: u64) -> bool {
self.age > allowed_lag
}
}
impl From<ArcBlock> for SavedBlock {
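
For reference, the deleted `SavedBlock::syncing` helper was a pure age comparison: a node counted as syncing whenever its saved head block was older than `allowed_lag` seconds. A minimal standalone sketch of that check, with the age derived from the block timestamp (illustrative names, not the repo's exact API):

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// a node counts as syncing when its newest block is older than the lag budget
fn is_syncing(block_timestamp_secs: u64, allowed_lag_secs: u64) -> bool {
    let now_secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before the unix epoch")
        .as_secs();
    now_secs.saturating_sub(block_timestamp_secs) > allowed_lag_secs
}
```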
@@ -172,7 +167,7 @@ impl Web3Connections {
// TODO: request_metadata? maybe we should put it in the authorization?
// TODO: don't hard code allowed lag
let response = self
.try_send_best_consensus_head_connection(60, authorization, request, None, None)
.try_send_best_consensus_head_connection(authorization, request, None, None)
.await?;
let block = response.result.context("failed fetching block")?;
@@ -248,7 +243,7 @@ impl Web3Connections {
// TODO: if error, retry?
// TODO: request_metadata or authorization?
let response = self
.try_send_best_consensus_head_connection(60, authorization, request, None, Some(num))
.try_send_best_consensus_head_connection(authorization, request, None, Some(num))
.await?;
let raw_block = response.result.context("no block result")?;

@@ -63,7 +63,6 @@ pub struct Web3Connection {
pub name: String,
pub display_name: Option<String>,
pub db_conn: Option<DatabaseConnection>,
pub(super) allowed_lag: u64,
/// TODO: can we get this from the provider? do we even need it?
pub(super) url: String,
/// Some connections use an http_client. we keep a clone for reconnecting
@@ -101,7 +100,6 @@ impl Web3Connection {
#[allow(clippy::too_many_arguments)]
pub async fn spawn(
name: String,
allowed_lag: u64,
display_name: Option<String>,
chain_id: u64,
db_conn: Option<DatabaseConnection>,
@@ -140,7 +138,6 @@ impl Web3Connection {
let new_connection = Self {
name,
allowed_lag,
db_conn: db_conn.clone(),
display_name,
http_client,
@@ -195,25 +192,7 @@ impl Web3Connection {
return Ok(None);
}
// check if we are synced
let head_block: ArcBlock = self
.wait_for_request_handle(authorization, Duration::from_secs(30), true)
.await?
.request::<_, Option<_>>(
"eth_getBlockByNumber",
&json!(("latest", false)),
// error here are expected, so keep the level low
Level::Warn.into(),
)
.await?
.context("no block during check_block_data_limit!")?;
if SavedBlock::from(head_block).syncing(60) {
// if the node is syncing, we can't check its block data limit
return Ok(None);
}
// TODO: add SavedBlock to self? probably best not to. we might not get marked Ready
// TODO: check eth_syncing. if it is not false, return Ok(None)
let mut limit = None;
@@ -296,27 +275,10 @@ impl Web3Connection {
self.block_data_limit.load(atomic::Ordering::Acquire).into()
}
pub fn syncing(&self, allowed_lag: u64) -> bool {
match self.head_block.read().clone() {
None => true,
Some(x) => x.syncing(allowed_lag),
}
}
pub fn has_block_data(&self, needed_block_num: &U64) -> bool {
let head_block_num = match self.head_block.read().clone() {
None => return false,
Some(x) => {
// TODO: this 60 second limit is causing our polygons to fall behind. change this to number of blocks?
// TODO: sometimes blocks might actually just take longer than 60 seconds
if x.syncing(60) {
// skip syncing nodes. even though they might be able to serve a query,
// latency will be poor and it will get in the way of them syncing further
return false;
}
x.number()
}
Some(x) => x.number(),
};
// this rpc doesn't have that block yet. still syncing
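
The simplified `has_block_data` above now trusts the head block unconditionally and only checks the retained-history window. A plain-integer sketch of that window test (the repo uses ethers' `U64` and the `block_data_limit` field; `u64` keeps this standalone, and the exact window semantics are an assumption from context):

```rust
// a server can answer for `needed` if the block is at or behind its head
// and within the history window it retains (`block_data_limit` blocks)
fn has_block_data(head: u64, block_data_limit: u64, needed: u64) -> bool {
    let oldest = head.saturating_sub(block_data_limit);
    (oldest..=head).contains(&needed)
}
```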
@@ -548,7 +510,7 @@ impl Web3Connection {
let _ = head_block.insert(new_head_block.clone().into());
}
if self.block_data_limit() == U64::zero() && !self.syncing(1) {
if self.block_data_limit() == U64::zero() {
let authorization = Arc::new(Authorization::internal(self.db_conn.clone())?);
if let Err(err) = self.check_block_data_limit(&authorization).await {
warn!(
@@ -596,8 +558,6 @@ impl Web3Connection {
reconnect: bool,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
) -> anyhow::Result<()> {
let allowed_lag = self.allowed_lag;
loop {
let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe());
@@ -629,8 +589,6 @@ impl Web3Connection {
let health_sleep_seconds = 10;
sleep(Duration::from_secs(health_sleep_seconds)).await;
let mut warned = 0;
loop {
// TODO: what if we just happened to have this check line up with another restart?
// TODO: think more about this
@@ -649,38 +607,6 @@ impl Web3Connection {
}
// trace!("health check on {}. unlocked", conn);
if let Some(x) = &*conn.head_block.read() {
// if this block is too old, return an error so we reconnect
let current_lag = x.lag();
if current_lag > allowed_lag {
let level = if warned == 0 {
if conn.backup {
log::Level::Info
} else {
log::Level::Warn
}
} else if warned % 100 == 0 {
log::Level::Debug
} else {
log::Level::Trace
};
log::log!(
level,
"{} is lagged {} secs: {} {}",
conn,
current_lag,
x.number(),
x.hash(),
);
warned += 1;
} else {
// reset warnings now that we are connected
warned = 0;
}
}
sleep(Duration::from_secs(health_sleep_seconds)).await;
}
};
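
The deleted health-check branch rate-limited its own logging: the first lagged check logged loudly (informational for backup servers), every 100th repeat logged at debug, and everything in between at trace, with the counter reset once the node caught up. The level-selection policy, distilled into a standalone sketch:

```rust
use log::Level;

// mirrors the removed branch: `warned` counts consecutive lagged health
// checks and `backup` marks backup servers
fn lag_log_level(warned: u64, backup: bool) -> Level {
    if warned == 0 {
        // first occurrence: loud for primaries, quieter for backups
        if backup { Level::Info } else { Level::Warn }
    } else if warned % 100 == 0 {
        // periodic reminder while the lag persists
        Level::Debug
    } else {
        Level::Trace
    }
}
```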
@@ -1222,7 +1148,6 @@ mod tests {
let x = Web3Connection {
name: "name".to_string(),
allowed_lag: 10,
db_conn: None,
display_name: None,
url: "ws://example.com".to_string(),
@@ -1271,7 +1196,6 @@ mod tests {
// TODO: this is getting long. have a `impl Default`
let x = Web3Connection {
name: "name".to_string(),
allowed_lag: 10,
db_conn: None,
display_name: None,
url: "ws://example.com".to_string(),
@@ -1299,6 +1223,8 @@ mod tests {
assert!(!x.has_block_data(&(head_block.number() + 1000)));
}
/*
// TODO: think about how to bring the concept of a "lagged" node back
#[test]
fn test_lagged_node_not_has_block_data() {
let now: U256 = SystemTime::now()
@@ -1324,7 +1250,6 @@ mod tests {
let x = Web3Connection {
name: "name".to_string(),
allowed_lag: 10,
db_conn: None,
display_name: None,
url: "ws://example.com".to_string(),
@@ -1349,4 +1274,5 @@ mod tests {
assert!(!x.has_block_data(&(head_block.number() + 1)));
assert!(!x.has_block_data(&(head_block.number() + 1000)));
}
*/
}

@@ -89,9 +89,6 @@ impl Web3Connections {
}
};
// TODO: this might be too aggressive. think about this more
let allowed_lag = ((expected_block_time_ms * 3) as f64 / 1000.0).round() as u64;
let http_interval_sender = if http_client.is_some() {
let (sender, receiver) = broadcast::channel(1);
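
The deleted heuristic above derived the lag budget from the chain's expected block time: three blocks' worth of milliseconds, rounded to whole seconds. Worked through with an assumed ~12s mainnet block time:

```rust
fn main() {
    // worked example of the removed formula: allowed lag = 3 expected blocks
    let expected_block_time_ms: u64 = 12_000; // ~mainnet block time (assumption)
    let allowed_lag = ((expected_block_time_ms * 3) as f64 / 1000.0).round() as u64;
    assert_eq!(allowed_lag, 36); // seconds; a ~2s chain like Polygon would get 6
}
```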
@@ -155,7 +152,6 @@ impl Web3Connections {
server_config
.spawn(
server_name,
allowed_lag,
db_conn,
redis_pool,
chain_id,
@@ -408,10 +404,40 @@ impl Web3Connections {
unimplemented!("this shouldn't be possible")
}
/// get the best available rpc server with the consensus head block. it might have blocks after the consensus head
pub async fn best_consensus_head_connection(
&self,
allowed_lag: u64,
authorization: &Arc<Authorization>,
request_metadata: Option<&Arc<RequestMetadata>>,
skip: &[Arc<Web3Connection>],
min_block_needed: Option<&U64>,
) -> anyhow::Result<OpenRequestResult> {
if let Ok(without_backups) = self
._best_consensus_head_connection(
false,
authorization,
request_metadata,
skip,
min_block_needed,
)
.await
{
return Ok(without_backups);
}
self._best_consensus_head_connection(
true,
authorization,
request_metadata,
skip,
min_block_needed,
)
.await
}
/// get the best available rpc server with the consensus head block. it might have blocks after the consensus head
async fn _best_consensus_head_connection(
&self,
allow_backups: bool,
authorization: &Arc<Authorization>,
request_metadata: Option<&Arc<RequestMetadata>>,
skip: &[Arc<Web3Connection>],
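
This wrapper is the commit's replacement for lag tracking: serve from primary servers when possible, and widen the pool to backup servers only when the primaries-only attempt errors. The same two-pass shape as a generic sketch (the helper name is illustrative; the commit writes the pattern inline here and again in `all_synced_connections`):

```rust
// two-pass fallback: try with `allow_backups = false`, and only retry
// with backups enabled if the first attempt returns an error
async fn with_backup_fallback<T, E, F, Fut>(mut attempt: F) -> Result<T, E>
where
    F: FnMut(bool) -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    if let Ok(found) = attempt(false).await {
        return Ok(found);
    }
    attempt(true).await
}
```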
@@ -421,12 +447,13 @@ impl Web3Connections {
(Option<U64>, u64),
Vec<Arc<Web3Connection>>,
> = if let Some(min_block_needed) = min_block_needed {
// need a potentially old block. check all the rpcs
// need a potentially old block. check all the rpcs. prefer the most synced
let mut m = BTreeMap::new();
for x in self
.conns
.values()
.filter(|x| if allow_backups { true } else { !x.backup })
.filter(|x| !skip.contains(x))
.filter(|x| x.has_block_data(min_block_needed))
.cloned()
@@ -448,15 +475,7 @@ impl Web3Connections {
// need latest. filter the synced rpcs
let synced_connections = self.synced_connections.load();
let head_block = match synced_connections.head_block.as_ref() {
None => return Ok(OpenRequestResult::NotReady),
Some(x) => x,
};
// TODO: self.allowed_lag instead of taking as an arg
if head_block.syncing(allowed_lag) {
return Ok(OpenRequestResult::NotReady);
}
// TODO: if head_block is super old. emit an error!
let mut m = BTreeMap::new();
@@ -575,7 +594,7 @@ impl Web3Connections {
None => {
// none of the servers gave us a time to retry at
// TODO: bring this back?
// TODO: bring this back? need to think about how to do this with `allow_backups`
// we could return an error here, but maybe waiting a second will fix the problem
// TODO: configurable max wait? the whole max request time, or just some portion?
// let handle = sorted_rpcs
@@ -605,6 +624,24 @@ impl Web3Connections {
authorization: &Arc<Authorization>,
block_needed: Option<&U64>,
max_count: Option<usize>,
) -> Result<Vec<OpenRequestHandle>, Option<Instant>> {
if let Ok(without_backups) = self
._all_synced_connections(false, authorization, block_needed, max_count)
.await
{
return Ok(without_backups);
}
self._all_synced_connections(true, authorization, block_needed, max_count)
.await
}
async fn _all_synced_connections(
&self,
allow_backups: bool,
authorization: &Arc<Authorization>,
block_needed: Option<&U64>,
max_count: Option<usize>,
) -> Result<Vec<OpenRequestHandle>, Option<Instant>> {
let mut earliest_retry_at = None;
// TODO: with capacity?
@@ -621,12 +658,14 @@ impl Web3Connections {
break;
}
if !allow_backups && connection.backup {
continue;
}
if let Some(block_needed) = block_needed {
if !connection.has_block_data(block_needed) {
continue;
}
} else if connection.syncing(30) {
continue;
}
// check rate limits and increment our connection counter
@@ -663,10 +702,8 @@ impl Web3Connections {
}
/// be sure there is a timeout on this or it might loop forever
/// TODO: do not take allowed_lag here. have it be on the connections struct instead
pub async fn try_send_best_consensus_head_connection(
&self,
allowed_lag: u64,
authorization: &Arc<Authorization>,
request: JsonRpcRequest,
request_metadata: Option<&Arc<RequestMetadata>>,
@@ -682,7 +719,6 @@ impl Web3Connections {
}
match self
.best_consensus_head_connection(
allowed_lag,
authorization,
request_metadata,
&skip_rpcs,
@@ -903,7 +939,6 @@ impl Web3Connections {
pub async fn try_proxy_connection(
&self,
proxy_mode: ProxyMode,
allowed_lag: u64,
authorization: &Arc<Authorization>,
request: JsonRpcRequest,
request_metadata: Option<&Arc<RequestMetadata>>,
@@ -912,7 +947,6 @@ impl Web3Connections {
match proxy_mode {
ProxyMode::Best => {
self.try_send_best_consensus_head_connection(
allowed_lag,
authorization,
request,
request_metadata,
@@ -1014,8 +1048,6 @@ mod tests {
let head_rpc = Web3Connection {
name: "synced".to_string(),
// TODO: what should this be?
allowed_lag: 10,
db_conn: None,
display_name: None,
url: "ws://example.com/synced".to_string(),
@@ -1036,7 +1068,6 @@ mod tests {
let lagged_rpc = Web3Connection {
name: "lagged".to_string(),
allowed_lag: 10,
db_conn: None,
display_name: None,
url: "ws://example.com/lagged".to_string(),
@@ -1129,9 +1160,8 @@ mod tests {
);
// best_synced_backend_connection requires servers to be synced with the head block
// TODO: don't hard code allowed_lag
let x = conns
.best_consensus_head_connection(60, &authorization, None, &[], None)
.best_consensus_head_connection(&authorization, None, &[], None)
.await
.unwrap();
@@ -1186,21 +1216,21 @@ mod tests {
assert!(matches!(
conns
.best_consensus_head_connection(60, &authorization, None, &[], None)
.best_consensus_head_connection(&authorization, None, &[], None)
.await,
Ok(OpenRequestResult::Handle(_))
));
assert!(matches!(
conns
.best_consensus_head_connection(60, &authorization, None, &[], Some(&0.into()))
.best_consensus_head_connection(&authorization, None, &[], Some(&0.into()))
.await,
Ok(OpenRequestResult::Handle(_))
));
assert!(matches!(
conns
.best_consensus_head_connection(60, &authorization, None, &[], Some(&1.into()))
.best_consensus_head_connection(&authorization, None, &[], Some(&1.into()))
.await,
Ok(OpenRequestResult::Handle(_))
));
@@ -1208,7 +1238,7 @@ mod tests {
// future block should not get a handle
assert!(matches!(
conns
.best_consensus_head_connection(60, &authorization, None, &[], Some(&2.into()))
.best_consensus_head_connection(&authorization, None, &[], Some(&2.into()))
.await,
Ok(OpenRequestResult::NotReady)
));
@@ -1241,7 +1271,6 @@ mod tests {
let pruned_rpc = Web3Connection {
name: "pruned".to_string(),
allowed_lag: 10,
db_conn: None,
display_name: None,
url: "ws://example.com/pruned".to_string(),
@@ -1262,7 +1291,6 @@ mod tests {
let archive_rpc = Web3Connection {
name: "archive".to_string(),
allowed_lag: 10,
db_conn: None,
display_name: None,
url: "ws://example.com/archive".to_string(),
@@ -1343,13 +1371,7 @@ mod tests {
// best_synced_backend_connection requires servers to be synced with the head block
let best_head_server = conns
.best_consensus_head_connection(
60,
&authorization,
None,
&[],
Some(&head_block.number()),
)
.best_consensus_head_connection(&authorization, None, &[], Some(&head_block.number()))
.await;
assert!(matches!(
@@ -1358,7 +1380,7 @@ mod tests {
));
let best_archive_server = conns
.best_consensus_head_connection(60, &authorization, None, &[], Some(&1.into()))
.best_consensus_head_connection(&authorization, None, &[], Some(&1.into()))
.await;
match best_archive_server {