From 5a1a87a314cab366505534390a2d0ce561498782 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sat, 7 Oct 2023 15:18:55 -0700 Subject: [PATCH] stream should count as success. lagged return the rpc --- web3_proxy/src/rpcs/consensus.rs | 56 +++++++++++++++++++++++++---- web3_proxy/src/rpcs/one.rs | 22 +++++++++--- web3_proxy/src/rpcs/request.rs | 5 +-- web3_proxy/src/stats/stat_buffer.rs | 6 ++-- 4 files changed, 74 insertions(+), 15 deletions(-) diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index c31b62cd..d0b5882d 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -19,6 +19,8 @@ use serde::Serialize; use std::cmp::{min_by_key, Ordering, Reverse}; use std::sync::{atomic, Arc}; use std::time::Duration; +use tokio::select; +use tokio::task::yield_now; use tokio::time::{sleep_until, Instant}; use tracing::{debug, enabled, info, trace, warn, Level}; @@ -1041,11 +1043,11 @@ impl RpcsForRequest { earliest_retry_at = earliest_retry_at.min(Some(retry_at)); continue; } - Ok(OpenRequestResult::Lagged(..)) => { + Ok(OpenRequestResult::Lagged(x)) => { trace!("{} is lagged. will not work now", best_rpc); // this will probably always be the same block, right? if wait_for_sync.is_none() { - wait_for_sync = Some(best_rpc); + wait_for_sync = Some(x); } continue; } @@ -1086,14 +1088,56 @@ impl RpcsForRequest { } } - // TODO: if block_needed, do something with it here. not sure how best to subscribe + // if we got this far, no rpcs are ready + + // clear earliest_retry_at if it is too far in the future to help us if let Some(retry_at) = earliest_retry_at { if self.request.expire_instant <= retry_at { + // no point in waiting. it wants us to wait too long + earliest_retry_at = None; + } + } + + match (wait_for_sync, earliest_retry_at) { + (None, None) => { + // we have nothing to wait for. uh oh! break; } - sleep_until(retry_at).await; - } else { - break; + (None, Some(retry_at)) => { + // try again after rate limits are done + sleep_until(retry_at).await; + } + (Some(wait_for_sync), None) => { + select! { + x = wait_for_sync => { + match x { + Ok(rpc) => { + trace!(%rpc, "rpc ready. it might be used on the next loop"); + // TODO: try a handle now? + continue; + }, + Err(err) => { + trace!(?err, "problem while waiting for an rpc for a request"); + break; + }, + } + } + _ = sleep_until(self.request.expire_instant) => { + break; + } + } + } + (Some(wait_for_sync), Some(retry_at)) => { + select! { + x = wait_for_sync => { + todo!() + } + _ = sleep_until(retry_at) => { + yield_now().await; + continue; + } + } + } } } } diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 61c2b979..284c13db 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -30,6 +30,7 @@ use std::sync::atomic::{self, AtomicU32, AtomicU64, AtomicUsize}; use std::{cmp::Ordering, sync::Arc}; use tokio::select; use tokio::sync::{mpsc, watch, RwLock as AsyncRwLock}; +use tokio::task::yield_now; use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; use tracing::{debug, error, info, trace, warn, Level}; use url::Url; @@ -1024,7 +1025,10 @@ impl Web3Rpc { } Ok(OpenRequestResult::Lagged(now_synced_f)) => { select! { - _ = now_synced_f => {} + _ = now_synced_f => { + // TODO: i'm guessing this is returning immediatly + yield_now().await; + } _ = sleep_until(web3_request.expire_instant) => { break; } @@ -1155,15 +1159,25 @@ impl Web3Rpc { clone.head_block_sender.as_ref().unwrap().subscribe(); // TODO: if head_block is far behind block_needed, retrurn now + head_block_receiver.borrow_and_update(); loop { select! { _ = head_block_receiver.changed() => { - if let Some(head_block) = head_block_receiver.borrow_and_update().clone() { - if head_block.number() >= block_needed { + if let Some(head_block_number) = head_block_receiver.borrow_and_update().as_ref().map(|x| x.number()) { + if head_block_number >= block_needed { // the block we needed has arrived! break; } + // TODO: configurable lag per chain + if head_block_number < block_needed.saturating_sub(5.into()) { + // TODO: more detailed error about this being a far future block + return Err(Web3ProxyError::NoServersSynced); + } + } else { + // TODO: what should we do? this server has no blocks at all. we can wait, but i think exiting now is best + // yield_now().await; + return Err(Web3ProxyError::NoServersSynced); } } _ = sleep_until(expire_instant) => { @@ -1172,7 +1186,7 @@ impl Web3Rpc { } } - Ok(()) + Ok(clone) }; return Ok(OpenRequestResult::Lagged(Box::pin(synced_f))); diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index b5150de6..3853431e 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -28,7 +28,8 @@ pub enum OpenRequestResult { RetryAt(Instant), /// The rpc are not synced, but they should be soon. /// You should wait for the given block number. - Lagged(Pin> + Send>>), + /// TODO: should this return an OpenRequestHandle? that might recurse + Lagged(Pin>> + Send>>), /// Unable to start a request because no servers are synced or the necessary data has been pruned NotReady, } @@ -340,7 +341,7 @@ impl OpenRequestHandle { Ok(jsonrpc::SingleResponse::Parsed(x)) => { matches!(&x.payload, Payload::Success { .. }) } - Ok(jsonrpc::SingleResponse::Stream(..)) => false, + Ok(jsonrpc::SingleResponse::Stream(..)) => true, Err(_) => false, }; diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index b3fd8a87..5d376fd3 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -137,7 +137,7 @@ impl StatBuffer { interval(Duration::from_secs(self.db_save_interval_seconds as u64)); // TODO: this should be a FlushedStats that we add to - let mut total_frontend_requests = 0; + let mut total_requests = 0; let mut tsdb_frontend_requests = 0; let mut tsdb_internal_requests = 0; let mut db_frontend_requests = 0; @@ -147,7 +147,7 @@ impl StatBuffer { select! { stat = stat_receiver.recv() => { if let Some(stat) = stat { - total_frontend_requests += self._buffer_app_stat(stat).await?; + total_requests += self._buffer_app_stat(stat).await?; // TODO: if buffers are big, flush now? } else { @@ -232,7 +232,7 @@ impl StatBuffer { db_internal_requests += flushed_stats.relational_internal_requests; // TODO: if these totals don't match, something is wrong! log something or maybe even return an error - info!(%total_frontend_requests, %tsdb_frontend_requests, %tsdb_internal_requests, %db_frontend_requests, %db_internal_requests, "accounting and stat save loop complete"); + info!(%total_requests, %tsdb_frontend_requests, %tsdb_internal_requests, %db_frontend_requests, %db_internal_requests, "accounting and stat save loop complete"); Ok(()) }