stream should count as success. lagged returns the rpc

Bryan Stitt 2023-10-07 15:18:55 -07:00
parent 4dc6f47f9a
commit 5a1a87a314
4 changed files with 74 additions and 15 deletions

@@ -19,6 +19,8 @@ use serde::Serialize;
use std::cmp::{min_by_key, Ordering, Reverse};
use std::sync::{atomic, Arc};
use std::time::Duration;
use tokio::select;
use tokio::task::yield_now;
use tokio::time::{sleep_until, Instant};
use tracing::{debug, enabled, info, trace, warn, Level};
@@ -1041,11 +1043,11 @@ impl RpcsForRequest {
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
continue;
}
Ok(OpenRequestResult::Lagged(..)) => {
Ok(OpenRequestResult::Lagged(x)) => {
trace!("{} is lagged. will not work now", best_rpc);
// this will probably always be the same block, right?
if wait_for_sync.is_none() {
wait_for_sync = Some(best_rpc);
wait_for_sync = Some(x);
}
continue;
}
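The hunk above changes what `wait_for_sync` stores: instead of remembering the lagged rpc itself, it now keeps the future carried by the `Lagged` variant (see the enum change further down). A minimal, hedged sketch of the new shape of that variable, with stand-in types instead of the real web3_proxy ones:

use std::{future::Future, pin::Pin, sync::Arc};

// stand-ins for the real web3_proxy types, for illustration only
struct Web3Rpc;
type Web3ProxyResult<T> = Result<T, ()>;
type SyncFuture = Pin<Box<dyn Future<Output = Web3ProxyResult<Arc<Web3Rpc>>> + Send>>;

// `wait_for_sync` used to hold the lagged rpc itself; after this commit it holds
// the future carried by Lagged, which resolves once that rpc has caught up.
// only the first such future is kept; later ones are likely waiting on the same block.
fn keep_first(wait_for_sync: &mut Option<SyncFuture>, x: SyncFuture) {
    if wait_for_sync.is_none() {
        *wait_for_sync = Some(x);
    }
}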
@@ -1086,14 +1088,56 @@ impl RpcsForRequest {
}
}
// TODO: if block_needed, do something with it here. not sure how best to subscribe
// if we got this far, no rpcs are ready
// clear earliest_retry_at if it is too far in the future to help us
if let Some(retry_at) = earliest_retry_at {
if self.request.expire_instant <= retry_at {
// no point in waiting. it wants us to wait too long
earliest_retry_at = None;
}
}
match (wait_for_sync, earliest_retry_at) {
(None, None) => {
// we have nothing to wait for. uh oh!
break;
}
sleep_until(retry_at).await;
} else {
break;
(None, Some(retry_at)) => {
// try again after rate limits are done
sleep_until(retry_at).await;
}
(Some(wait_for_sync), None) => {
select! {
x = wait_for_sync => {
match x {
Ok(rpc) => {
trace!(%rpc, "rpc ready. it might be used on the next loop");
// TODO: try a handle now?
continue;
},
Err(err) => {
trace!(?err, "problem while waiting for an rpc for a request");
break;
},
}
}
_ = sleep_until(self.request.expire_instant) => {
break;
}
}
}
(Some(wait_for_sync), Some(retry_at)) => {
select! {
x = wait_for_sync => {
todo!()
}
_ = sleep_until(retry_at) => {
yield_now().await;
continue;
}
}
}
}
}
}
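The new match above races three things: the lagged rpc becoming synced, the earliest rate-limit retry time, and the request's expiry. The (Some, Some) arm is still a todo!() in this commit; below is a hedged sketch of how all four arms might fit together, with hypothetical names (`wait_for_something`, `Next`) and stand-in types rather than the real RpcsForRequest internals. The guess for the todo!() arm simply races the sync future against the retry timer and loops either way.

use std::{future::Future, pin::Pin, sync::Arc};
use tokio::time::{sleep_until, Instant};

// stand-ins for the real types, for illustration only
struct Web3Rpc;
type Web3ProxyResult<T> = Result<T, ()>;
type SyncFuture = Pin<Box<dyn Future<Output = Web3ProxyResult<Arc<Web3Rpc>>> + Send>>;

enum Next {
    TryAgain,
    GiveUp,
}

async fn wait_for_something(
    wait_for_sync: Option<SyncFuture>,
    earliest_retry_at: Option<Instant>,
    expire_instant: Instant,
) -> Next {
    match (wait_for_sync, earliest_retry_at) {
        // nothing to wait for: give up
        (None, None) => Next::GiveUp,
        // only rate limits: sleep until the earliest one clears
        (None, Some(retry_at)) => {
            sleep_until(retry_at).await;
            Next::TryAgain
        }
        // only a lagged rpc: wait for it to sync, bounded by the request expiry
        (Some(wait_for_sync), None) => {
            tokio::select! {
                x = wait_for_sync => match x {
                    Ok(_rpc) => Next::TryAgain,
                    Err(_) => Next::GiveUp,
                },
                _ = sleep_until(expire_instant) => Next::GiveUp,
            }
        }
        // both (a todo!() in the commit): assumed guess is to take whichever
        // happens first and then try the outer loop again
        (Some(wait_for_sync), Some(retry_at)) => {
            tokio::select! {
                _ = wait_for_sync => Next::TryAgain,
                _ = sleep_until(retry_at) => {
                    tokio::task::yield_now().await;
                    Next::TryAgain
                }
            }
        }
    }
}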

@@ -30,6 +30,7 @@ use std::sync::atomic::{self, AtomicU32, AtomicU64, AtomicUsize};
use std::{cmp::Ordering, sync::Arc};
use tokio::select;
use tokio::sync::{mpsc, watch, RwLock as AsyncRwLock};
use tokio::task::yield_now;
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
use tracing::{debug, error, info, trace, warn, Level};
use url::Url;
@@ -1024,7 +1025,10 @@ impl Web3Rpc {
}
Ok(OpenRequestResult::Lagged(now_synced_f)) => {
select! {
_ = now_synced_f => {}
_ = now_synced_f => {
// TODO: i'm guessing this is returning immediately
yield_now().await;
}
_ = sleep_until(web3_request.expire_instant) => {
break;
}
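The added yield_now() addresses the TODO above: if the winning select! branch completes without ever awaiting, the enclosing retry loop can spin without giving other tasks a chance to run, so an explicit yield hands control back to the scheduler before looping. A hedged sketch of the pattern in isolation (function name is hypothetical):

use tokio::task::yield_now;
use tokio::time::{sleep_until, Instant};

// Pattern sketch: when the winning select! branch may complete without ever
// awaiting, an explicit yield keeps the enclosing retry loop cooperative.
async fn wait_until_synced_or_expired(
    now_synced: impl std::future::Future<Output = ()>,
    expire_instant: Instant,
) -> bool {
    tokio::select! {
        _ = now_synced => {
            // the sync future may resolve immediately; yield so other tasks get
            // a turn before the caller loops and retries the request
            yield_now().await;
            true
        }
        _ = sleep_until(expire_instant) => false,
    }
}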
@@ -1155,15 +1159,25 @@ impl Web3Rpc {
clone.head_block_sender.as_ref().unwrap().subscribe();
// TODO: if head_block is far behind block_needed, return now
head_block_receiver.borrow_and_update();
loop {
select! {
_ = head_block_receiver.changed() => {
if let Some(head_block) = head_block_receiver.borrow_and_update().clone() {
if head_block.number() >= block_needed {
if let Some(head_block_number) = head_block_receiver.borrow_and_update().as_ref().map(|x| x.number()) {
if head_block_number >= block_needed {
// the block we needed has arrived!
break;
}
// TODO: configurable lag per chain
if head_block_number < block_needed.saturating_sub(5.into()) {
// TODO: more detailed error about this being a far future block
return Err(Web3ProxyError::NoServersSynced);
}
} else {
// TODO: what should we do? this server has no blocks at all. we can wait, but i think exiting now is best
// yield_now().await;
return Err(Web3ProxyError::NoServersSynced);
}
}
_ = sleep_until(expire_instant) => {
@@ -1172,7 +1186,7 @@
}
}
Ok(())
Ok(clone)
};
return Ok(OpenRequestResult::Lagged(Box::pin(synced_f)));
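The future built above watches the rpc's head-block channel and now resolves to the rpc clone itself (hence Ok(clone)), erroring out early if the rpc is more than a few blocks behind the needed block or has no blocks at all. A hedged, self-contained sketch of the same shape, using plain integers for block numbers and a hypothetical `wait_for_block` name:

use std::sync::Arc;
use tokio::sync::watch;
use tokio::time::{sleep_until, Instant};

// stand-ins for the real web3_proxy types
struct Web3Rpc;
#[derive(Debug)]
enum Web3ProxyError { NoServersSynced, Expired }

// Hedged sketch: resolve to the rpc once its head block reaches `block_needed`,
// error out if it is far behind (5 blocks here, hard-coded like the TODO notes)
// or if the deadline passes.
async fn wait_for_block(
    rpc: Arc<Web3Rpc>,
    mut head_block_receiver: watch::Receiver<Option<u64>>,
    block_needed: u64,
    expire_instant: Instant,
) -> Result<Arc<Web3Rpc>, Web3ProxyError> {
    loop {
        tokio::select! {
            _ = head_block_receiver.changed() => {
                if let Some(head) = *head_block_receiver.borrow_and_update() {
                    if head >= block_needed {
                        // the block we needed has arrived!
                        return Ok(rpc);
                    }
                    if head < block_needed.saturating_sub(5) {
                        // far-future block for this rpc; don't keep waiting
                        return Err(Web3ProxyError::NoServersSynced);
                    }
                } else {
                    // this rpc has no blocks at all
                    return Err(Web3ProxyError::NoServersSynced);
                }
            }
            _ = sleep_until(expire_instant) => return Err(Web3ProxyError::Expired),
        }
    }
}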

@@ -28,7 +28,8 @@ pub enum OpenRequestResult {
RetryAt(Instant),
/// The rpc are not synced, but they should be soon.
/// You should wait for the given block number.
Lagged(Pin<Box<dyn Future<Output = Web3ProxyResult<()>> + Send>>),
/// TODO: should this return an OpenRequestHandle? that might recurse
Lagged(Pin<Box<dyn Future<Output = Web3ProxyResult<Arc<Web3Rpc>>> + Send>>),
/// Unable to start a request because no servers are synced or the necessary data has been pruned
NotReady,
}
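The Lagged variant now carries a boxed, type-erased future instead of one yielding (), and the TODO wonders whether returning an OpenRequestHandle directly would recurse. Boxing keeps the enum Sized and lets Web3Rpc build the future from an async block without naming its concrete type. A hedged sketch of the producer side, again with stand-in types:

use std::{future::Future, pin::Pin, sync::Arc};

// stand-ins for the real web3_proxy types, for illustration only
struct Web3Rpc;
type Web3ProxyResult<T> = Result<T, ()>;

enum OpenRequestResult {
    Lagged(Pin<Box<dyn Future<Output = Web3ProxyResult<Arc<Web3Rpc>>> + Send>>),
    NotReady,
}

fn lagged_result(rpc: Arc<Web3Rpc>) -> OpenRequestResult {
    // any async block can be type-erased into the variant; the caller only
    // needs to know it eventually yields the synced rpc (or an error)
    let synced_f = async move {
        // ... wait for this rpc's head block to catch up here ...
        let result: Web3ProxyResult<Arc<Web3Rpc>> = Ok(rpc);
        result
    };
    OpenRequestResult::Lagged(Box::pin(synced_f))
}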
@@ -340,7 +341,7 @@ impl OpenRequestHandle {
Ok(jsonrpc::SingleResponse::Parsed(x)) => {
matches!(&x.payload, Payload::Success { .. })
}
Ok(jsonrpc::SingleResponse::Stream(..)) => false,
Ok(jsonrpc::SingleResponse::Stream(..)) => true,
Err(_) => false,
};
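This is the "stream should count as success" half of the commit: a streaming response has not been parsed into a JSON-RPC payload yet, so there is no error payload to inspect and it is now classified as a success instead of a failure. A hedged sketch of the classification with stand-in response types (the function name is hypothetical):

// stand-in response types for illustration; the real ones live in the jsonrpc module
enum Payload { Success, Error }
struct ParsedResponse { payload: Payload }
enum SingleResponse { Parsed(ParsedResponse), Stream(Vec<u8>) }

// after this commit a stream counts as a success: its body has not been parsed,
// so there is no error payload to inspect yet
fn counts_as_success(response: &Result<SingleResponse, ()>) -> bool {
    match response {
        Ok(SingleResponse::Parsed(x)) => matches!(x.payload, Payload::Success),
        Ok(SingleResponse::Stream(..)) => true,
        Err(_) => false,
    }
}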

@@ -137,7 +137,7 @@ impl StatBuffer {
interval(Duration::from_secs(self.db_save_interval_seconds as u64));
// TODO: this should be a FlushedStats that we add to
let mut total_frontend_requests = 0;
let mut total_requests = 0;
let mut tsdb_frontend_requests = 0;
let mut tsdb_internal_requests = 0;
let mut db_frontend_requests = 0;
@@ -147,7 +147,7 @@ impl StatBuffer {
select! {
stat = stat_receiver.recv() => {
if let Some(stat) = stat {
total_frontend_requests += self._buffer_app_stat(stat).await?;
total_requests += self._buffer_app_stat(stat).await?;
// TODO: if buffers are big, flush now?
} else {
@@ -232,7 +232,7 @@ impl StatBuffer {
db_internal_requests += flushed_stats.relational_internal_requests;
// TODO: if these totals don't match, something is wrong! log something or maybe even return an error
info!(%total_frontend_requests, %tsdb_frontend_requests, %tsdb_internal_requests, %db_frontend_requests, %db_internal_requests, "accounting and stat save loop complete");
info!(%total_requests, %tsdb_frontend_requests, %tsdb_internal_requests, %db_frontend_requests, %db_internal_requests, "accounting and stat save loop complete");
Ok(())
}
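The rename from total_frontend_requests to total_requests reflects that the counter fed by _buffer_app_stat presumably includes internal as well as frontend stats, while the per-sink counters below stay split by kind. The TODO above suggests comparing the totals; a hedged sketch of such a check, where the exact invariant (every buffered request shows up in each enabled sink) is an assumption:

// Hedged sketch of the consistency check hinted at by the TODO: assuming every
// buffered request should show up in each enabled sink, warn when they diverge.
fn warn_if_totals_mismatch(
    total_requests: u64,
    tsdb_frontend: u64,
    tsdb_internal: u64,
    db_frontend: u64,
    db_internal: u64,
) {
    if total_requests != tsdb_frontend + tsdb_internal {
        tracing::warn!(%total_requests, %tsdb_frontend, %tsdb_internal, "tsdb stat totals do not match");
    }
    if total_requests != db_frontend + db_internal {
        tracing::warn!(%total_requests, %db_frontend, %db_internal, "db stat totals do not match");
    }
}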