partial refactor of allowed lag

This commit is contained in:
Bryan Stitt 2023-01-03 08:33:49 -08:00
parent 43dd9628e6
commit 5be5128c93
4 changed files with 55 additions and 28 deletions

View File

@ -184,6 +184,7 @@ pub struct Web3ProxyApp {
head_block_receiver: watch::Receiver<ArcBlock>, head_block_receiver: watch::Receiver<ArcBlock>,
pending_tx_sender: broadcast::Sender<TxStatus>, pending_tx_sender: broadcast::Sender<TxStatus>,
pub config: AppConfig, pub config: AppConfig,
pub allowed_lag: u64,
pub db_conn: Option<sea_orm::DatabaseConnection>, pub db_conn: Option<sea_orm::DatabaseConnection>,
pub db_replica: Option<DatabaseReplica>, pub db_replica: Option<DatabaseReplica>,
/// prometheus metrics /// prometheus metrics
@ -680,8 +681,20 @@ impl Web3ProxyApp {
.time_to_idle(Duration::from_secs(120)) .time_to_idle(Duration::from_secs(120))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// TODO: get this out of the toml instead
let allowed_lag = match top_config.app.chain_id {
1 => 60,
137 => 10,
250 => 10,
_ => {
warn!("defaulting allowed lag to 60");
60
}
};
let app = Self { let app = Self {
config: top_config.app, config: top_config.app,
allowed_lag,
balanced_rpcs, balanced_rpcs,
private_rpcs, private_rpcs,
response_cache, response_cache,
@ -1374,6 +1387,7 @@ impl Web3ProxyApp {
let mut response = self let mut response = self
.balanced_rpcs .balanced_rpcs
.try_send_best_upstream_server( .try_send_best_upstream_server(
self.allowed_lag,
&authorization, &authorization,
request, request,
Some(&request_metadata), Some(&request_metadata),
@ -1397,6 +1411,7 @@ impl Web3ProxyApp {
} else { } else {
self.balanced_rpcs self.balanced_rpcs
.try_send_best_upstream_server( .try_send_best_upstream_server(
self.allowed_lag,
&authorization, &authorization,
request, request,
Some(&request_metadata), Some(&request_metadata),

View File

@ -55,23 +55,16 @@ impl SavedBlock {
} }
pub fn lag(&self) -> u64 { pub fn lag(&self) -> u64 {
// TODO: read this from a global config. different chains should probably have different gaps.
let allowed_lag: u64 = 60;
let now = SystemTime::now() let now = SystemTime::now()
.duration_since(UNIX_EPOCH) .duration_since(UNIX_EPOCH)
.expect("there should always be time"); .expect("there should always be time");
// TODO: get this from config
// TODO: is this safe enough? what if something about the chain is actually lagged? what if its a chain like BTC with 10 minute blocks?
let oldest_allowed = now - Duration::from_secs(allowed_lag);
let block_timestamp = Duration::from_secs(self.block.timestamp.as_u64()); let block_timestamp = Duration::from_secs(self.block.timestamp.as_u64());
if block_timestamp < oldest_allowed { if block_timestamp < now {
// this server is still syncing from too far away to serve requests // this server is still syncing from too far away to serve requests
// u64 is safe because we checked equality above // u64 is safe because we checked equality above
(oldest_allowed - block_timestamp).as_secs() as u64 (now - block_timestamp).as_secs() as u64
} else { } else {
0 0
} }
@ -87,9 +80,8 @@ impl SavedBlock {
} }
/// When the block was received, this node was still syncing /// When the block was received, this node was still syncing
pub fn syncing(&self) -> bool { pub fn syncing(&self, allowed_lag: u64) -> bool {
// TODO: margin should come from a global config self.lag > allowed_lag
self.lag > 60
} }
} }
@ -103,7 +95,7 @@ impl Display for SavedBlock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} ({})", self.number(), self.hash())?; write!(f, "{} ({})", self.number(), self.hash())?;
if self.syncing() { if self.syncing(0) {
write!(f, " (behind by {} seconds)", self.lag)?; write!(f, " (behind by {} seconds)", self.lag)?;
} }
@ -177,8 +169,9 @@ impl Web3Connections {
let request: JsonRpcRequest = serde_json::from_value(request)?; let request: JsonRpcRequest = serde_json::from_value(request)?;
// TODO: request_metadata? maybe we should put it in the authorization? // TODO: request_metadata? maybe we should put it in the authorization?
// TODO: don't hard code allowed lag
let response = self let response = self
.try_send_best_upstream_server(authorization, request, None, None) .try_send_best_upstream_server(60, authorization, request, None, None)
.await?; .await?;
let block = response.result.context("failed fetching block")?; let block = response.result.context("failed fetching block")?;
@ -254,7 +247,7 @@ impl Web3Connections {
// TODO: if error, retry? // TODO: if error, retry?
// TODO: request_metadata or authorization? // TODO: request_metadata or authorization?
let response = self let response = self
.try_send_best_upstream_server(authorization, request, None, Some(num)) .try_send_best_upstream_server(60, authorization, request, None, Some(num))
.await?; .await?;
let raw_block = response.result.context("no block result")?; let raw_block = response.result.context("no block result")?;
@ -324,7 +317,8 @@ impl Web3Connections {
// we don't know if its on the heaviest chain yet // we don't know if its on the heaviest chain yet
self.save_block(&rpc_head_block.block, false).await?; self.save_block(&rpc_head_block.block, false).await?;
if rpc_head_block.syncing() { // TODO: don't default to 60. different chains are different
if rpc_head_block.syncing(60) {
if connection_heads.remove(&rpc.name).is_some() { if connection_heads.remove(&rpc.name).is_some() {
warn!("{} is behind by {} seconds", &rpc.name, rpc_head_block.lag); warn!("{} is behind by {} seconds", &rpc.name, rpc_head_block.lag);
} else { } else {

View File

@ -202,7 +202,7 @@ impl Web3Connection {
.await? .await?
.context("no block during check_block_data_limit!")?; .context("no block during check_block_data_limit!")?;
if SavedBlock::from(head_block).syncing() { if SavedBlock::from(head_block).syncing(60) {
// if the node is syncing, we can't check its block data limit // if the node is syncing, we can't check its block data limit
return Ok(None); return Ok(None);
} }
@ -289,7 +289,7 @@ impl Web3Connection {
pub fn syncing(&self) -> bool { pub fn syncing(&self) -> bool {
match self.head_block.read().clone() { match self.head_block.read().clone() {
None => true, None => true,
Some(x) => x.syncing(), Some(x) => x.syncing(60),
} }
} }
@ -297,7 +297,7 @@ impl Web3Connection {
let head_block_num = match self.head_block.read().clone() { let head_block_num = match self.head_block.read().clone() {
None => return false, None => return false,
Some(x) => { Some(x) => {
if x.syncing() { if x.syncing(60) {
// skip syncing nodes. even though they might be able to serve a query, // skip syncing nodes. even though they might be able to serve a query,
// latency will be poor and it will get in the way of them syncing further // latency will be poor and it will get in the way of them syncing further
return false; return false;

View File

@ -388,6 +388,7 @@ impl Web3Connections {
/// get the best available rpc server /// get the best available rpc server
pub async fn best_synced_backend_connection( pub async fn best_synced_backend_connection(
&self, &self,
allowed_lag: u64,
authorization: &Arc<Authorization>, authorization: &Arc<Authorization>,
request_metadata: Option<&Arc<RequestMetadata>>, request_metadata: Option<&Arc<RequestMetadata>>,
skip: &[Arc<Web3Connection>], skip: &[Arc<Web3Connection>],
@ -422,11 +423,18 @@ impl Web3Connections {
// TODO: double check has_block_data? // TODO: double check has_block_data?
let synced_connections = self.synced_connections.load(); let synced_connections = self.synced_connections.load();
let head_num = match synced_connections.head_block.as_ref() { let head_block = match synced_connections.head_block.as_ref() {
None => return Ok(OpenRequestResult::NotReady), None => return Ok(OpenRequestResult::NotReady),
Some(x) => x.number(), Some(x) => x,
}; };
// TODO: different allowed_lag depending on the chain
if head_block.syncing(allowed_lag) {
return Ok(OpenRequestResult::NotReady);
}
let head_num = head_block.number();
let c: Vec<_> = synced_connections let c: Vec<_> = synced_connections
.conns .conns
.iter() .iter()
@ -607,8 +615,10 @@ impl Web3Connections {
} }
/// be sure there is a timeout on this or it might loop forever /// be sure there is a timeout on this or it might loop forever
/// TODO: do not take allowed_lag here. have it be on the connections struct instead
pub async fn try_send_best_upstream_server( pub async fn try_send_best_upstream_server(
&self, &self,
allowed_lag: u64,
authorization: &Arc<Authorization>, authorization: &Arc<Authorization>,
request: JsonRpcRequest, request: JsonRpcRequest,
request_metadata: Option<&Arc<RequestMetadata>>, request_metadata: Option<&Arc<RequestMetadata>>,
@ -624,6 +634,7 @@ impl Web3Connections {
} }
match self match self
.best_synced_backend_connection( .best_synced_backend_connection(
allowed_lag,
authorization, authorization,
request_metadata, request_metadata,
&skip_rpcs, &skip_rpcs,
@ -1011,8 +1022,9 @@ mod tests {
); );
// best_synced_backend_connection requires servers to be synced with the head block // best_synced_backend_connection requires servers to be synced with the head block
// TODO: don't hard code allowed_lag
let x = conns let x = conns
.best_synced_backend_connection(&authorization, None, &[], None) .best_synced_backend_connection(60, &authorization, None, &[], None)
.await .await
.unwrap(); .unwrap();
@ -1067,21 +1079,21 @@ mod tests {
assert!(matches!( assert!(matches!(
conns conns
.best_synced_backend_connection(&authorization, None, &[], None) .best_synced_backend_connection(60, &authorization, None, &[], None)
.await, .await,
Ok(OpenRequestResult::Handle(_)) Ok(OpenRequestResult::Handle(_))
)); ));
assert!(matches!( assert!(matches!(
conns conns
.best_synced_backend_connection(&authorization, None, &[], Some(&0.into())) .best_synced_backend_connection(60, &authorization, None, &[], Some(&0.into()))
.await, .await,
Ok(OpenRequestResult::Handle(_)) Ok(OpenRequestResult::Handle(_))
)); ));
assert!(matches!( assert!(matches!(
conns conns
.best_synced_backend_connection(&authorization, None, &[], Some(&1.into())) .best_synced_backend_connection(60, &authorization, None, &[], Some(&1.into()))
.await, .await,
Ok(OpenRequestResult::Handle(_)) Ok(OpenRequestResult::Handle(_))
)); ));
@ -1089,7 +1101,7 @@ mod tests {
// future block should not get a handle // future block should not get a handle
assert!(matches!( assert!(matches!(
conns conns
.best_synced_backend_connection(&authorization, None, &[], Some(&2.into())) .best_synced_backend_connection(60, &authorization, None, &[], Some(&2.into()))
.await, .await,
Ok(OpenRequestResult::NotReady) Ok(OpenRequestResult::NotReady)
)); ));
@ -1219,7 +1231,13 @@ mod tests {
// best_synced_backend_connection requires servers to be synced with the head block // best_synced_backend_connection requires servers to be synced with the head block
let best_head_server = conns let best_head_server = conns
.best_synced_backend_connection(&authorization, None, &[], Some(&head_block.number())) .best_synced_backend_connection(
60,
&authorization,
None,
&[],
Some(&head_block.number()),
)
.await; .await;
assert!(matches!( assert!(matches!(
@ -1228,7 +1246,7 @@ mod tests {
)); ));
let best_archive_server = conns let best_archive_server = conns
.best_synced_backend_connection(&authorization, None, &[], Some(&1.into())) .best_synced_backend_connection(60, &authorization, None, &[], Some(&1.into()))
.await; .await;
match best_archive_server { match best_archive_server {