timeout and server selection fixes

add a short connect timeout separate from the overall request timeout.

also fix a bug where having only 1 server in the rpc list caused a very tight loop that made tokio sad
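
the idea, roughly, as a minimal tokio sketch (not the commit's code; `connect` and `serve` are hypothetical stand-ins for the real rpc selection and proxy calls):

use tokio::time::{timeout_at, Duration, Instant};

// hypothetical stand-ins for "pick an rpc" and "proxy the request to it"
async fn connect() -> Result<u32, &'static str> {
    Ok(1)
}
async fn serve(_rpc: u32) -> Result<String, &'static str> {
    Ok("0x1".into())
}

async fn proxy_with_deadlines() -> Result<String, &'static str> {
    let start = Instant::now();
    // short deadline just for finding a server that can take the request...
    let connect_timeout_at = start + Duration::from_secs(3);
    // ...inside a longer deadline for the whole request (kept under erigon's 5 minute timeout)
    let expire_at = start + Duration::from_secs(295);

    let rpc = timeout_at(connect_timeout_at, connect())
        .await
        .map_err(|_| "no server within the connect timeout")??;

    timeout_at(expire_at, serve(rpc))
        .await
        .map_err(|_| "request expired")?
}

#[tokio::main]
async fn main() {
    println!("{:?}", proxy_with_deadlines().await);
}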
Bryan Stitt 2023-10-09 22:21:39 -07:00
parent 4f6e75e109
commit 78edfee6b9
15 changed files with 257 additions and 172 deletions

2
.env

@ -1 +1,3 @@
DATABASE_URL=mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy
RUST_BACKTRACE=1
RUST_LOG=web3_proxy=debug,info

24
Cargo.lock generated

@ -4295,14 +4295,14 @@ dependencies = [
[[package]]
name = "regex"
version = "1.9.6"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebee201405406dbf528b8b672104ae6d6d63e6d118cb10e4d51abbc7b58044ff"
checksum = "d119d7c7ca818f8a53c300863d4f87566aac09943aef5b355bb83969dae75d87"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata 0.3.9",
"regex-syntax 0.7.5",
"regex-automata 0.4.1",
"regex-syntax 0.8.0",
]
[[package]]
@ -4316,13 +4316,13 @@ dependencies = [
[[package]]
name = "regex-automata"
version = "0.3.9"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59b23e92ee4318893fa3fe3e6fb365258efbfe6ac6ab30f090cdcbb7aa37efa9"
checksum = "465c6fc0621e4abc4187a2bda0937bfd4f722c2730b29562e19689ea796c9a4b"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax 0.7.5",
"regex-syntax 0.8.0",
]
[[package]]
@ -4337,6 +4337,12 @@ version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da"
[[package]]
name = "regex-syntax"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3cbb081b9784b07cceb8824c8583f86db4814d172ab043f3c23f7dc600bf83d"
[[package]]
name = "rend"
version = "0.4.1"
@ -4845,9 +4851,9 @@ dependencies = [
[[package]]
name = "semver"
version = "1.0.19"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad977052201c6de01a8ef2aa3378c4bd23217a056337d1d6da40468d267a4fb0"
checksum = "836fa6a3e1e547f9a2c4040802ec865b5d85f4014efe00555d7090a3dcaa1090"
dependencies = [
"serde",
]

@ -79,7 +79,7 @@ ordered-float = {version = "4.1.0" }
pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async", "sync"] }
parking_lot = { version = "0.12.1", features = ["arc_lock", "nightly"] }
rdkafka = { version = "0.34.0", features = ["tracing"] }
regex = "1.9.6"
regex = "1.10.0"
reqwest = { version = "0.11.22", default-features = false, features = ["json", "default-tls"] }
rust_decimal = { version = "1.32.0" }
sentry = { version = "0.31.7", default-features = false, features = ["anyhow", "backtrace", "contexts", "panic", "reqwest", "native-tls", "serde_json", "tracing"] }

@ -47,7 +47,7 @@ use std::time::Duration;
use tokio::select;
use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore};
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout, Instant};
use tokio::time::{sleep, timeout, timeout_at, Instant};
use tracing::{error, info, trace, warn};
// TODO: make this customizable?
@ -1621,8 +1621,8 @@ impl Web3ProxyApp {
} else if self.jsonrpc_response_failed_cache_keys.contains_key(&cache_key) {
// this is a cache_key that we know won't cache
// NOTICE! We do **NOT** use get, which means the key's hotness is not updated. we don't use time-to-idle here so that's fine. but be careful if that changes
timeout(
web3_request.ttl(),
timeout_at(
web3_request.expire_at(),
self.balanced_rpcs
.try_proxy_connection::<Arc<RawValue>>(
web3_request,
@ -1642,8 +1642,8 @@ impl Web3ProxyApp {
let mut x = match timeout(Duration::from_secs(1), s.acquire_owned()).await {
Err(_) => {
// TODO: should we try to cache this? whatever has the semaphore //should// handle that for us
timeout(
web3_request.ttl(),
timeout_at(
web3_request.expire_at(),
self.balanced_rpcs
.try_proxy_connection::<Arc<RawValue>>(
web3_request,
@ -1661,10 +1661,8 @@ impl Web3ProxyApp {
app
.jsonrpc_response_cache
.try_get_with::<_, Web3ProxyError>(cache_key, async {
let duration = web3_request.ttl().saturating_sub(Duration::from_secs(1));
// TODO: dynamic timeout based on whats left on web3_request
let response_data = timeout(duration, app.balanced_rpcs
let response_data = timeout_at(web3_request.expire_at(), app.balanced_rpcs
.try_proxy_connection::<Arc<RawValue>>(
&web3_request,
)).await;
@ -1738,8 +1736,8 @@ impl Web3ProxyApp {
x
} else {
let mut x = timeout(
web3_request.ttl(),
let mut x = timeout_at(
web3_request.expire_at(),
self.balanced_rpcs
.try_proxy_connection::<Arc<RawValue>>(
web3_request,

@ -293,6 +293,8 @@ pub fn average_block_interval(chain_id: u64) -> Duration {
1101 => Duration::from_secs(7),
// base
8453 => Duration::from_secs(2),
// development
31337 => Duration::from_secs(10),
// arbitrum
42161 => Duration::from_millis(500),
// web3-proxy tests

@ -66,6 +66,12 @@ pub enum Web3ProxyError {
EthersHttpClient(ethers::providers::HttpClientError),
EthersProvider(ethers::prelude::ProviderError),
EthersWsClient(ethers::prelude::WsClientError),
#[display(fmt = "{} < {}", head, requested)]
#[from(ignore)]
FarFutureBlock {
head: U64,
requested: U64,
},
GasEstimateNotU256,
HdrRecord(hdrhistogram::errors::RecordError),
Headers(headers::Error),
@ -382,6 +388,20 @@ impl Web3ProxyError {
)
}
},
Self::FarFutureBlock { head, requested } => {
trace!(?head, ?requested, "FarFutureBlock");
(
StatusCode::OK,
JsonRpcErrorData {
message: "requested block is too far in the future".into(),
code: (-32002).into(),
data: Some(json!({
"head": head,
"requested": requested,
})),
},
)
}
// Self::JsonRpcForwardedError(x) => (StatusCode::OK, x),
Self::GasEstimateNotU256 => {
trace!("GasEstimateNotU256");
@ -656,7 +676,7 @@ impl Web3ProxyError {
JsonRpcErrorData {
message: "mdbx panic".into(),
code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(),
data: Some(serde_json::Value::String(msg.to_string())),
data: Some(json!({"rpc": rpc})),
},
)
}

@ -164,8 +164,6 @@ pub struct Web3Request {
/// We use Instant and not timestamps to avoid problems with leap seconds and similar issues
#[derivative(Default(value = "Instant::now()"))]
pub start_instant: Instant,
#[derivative(Default(value = "Instant::now() + Duration::from_secs(295)"))]
pub expire_instant: Instant,
/// if this is empty, there was a cache_hit
/// otherwise, it is populated with any rpc servers that were used by this request
pub backend_requests: BackendRequests,
@ -193,6 +191,12 @@ pub struct Web3Request {
/// Cancel-safe channel for sending stats to the buffer
pub stat_sender: Option<mpsc::UnboundedSender<AppStat>>,
/// How long to spend waiting for an rpc that can serve this request
pub connect_timeout: Duration,
/// How long to spend waiting for an rpc to respond to this request
/// TODO: this should start once the connection is established
pub expire_timeout: Duration,
}
impl Display for Web3Request {
@ -340,10 +344,6 @@ impl Web3Request {
) -> Web3ProxyResult<Arc<Self>> {
let start_instant = Instant::now();
// TODO: get this default from config, or from user settings
// 5 minutes with a buffer for other things being slow
let expire_instant = start_instant + max_wait.unwrap_or_else(|| Duration::from_secs(295));
// let request: RequestOrMethod = request.into();
// we VERY INTENTIONALLY log to kafka BEFORE calculating the cache key
@ -363,6 +363,9 @@ impl Web3Request {
_ => CacheMode::Never,
};
let connect_timeout = Duration::from_secs(3);
let expire_timeout = max_wait.unwrap_or_else(|| Duration::from_secs(295));
let x = Self {
archive_request: false.into(),
authorization,
@ -370,7 +373,8 @@ impl Web3Request {
cache_mode,
chain_id,
error_response: false.into(),
expire_instant,
connect_timeout,
expire_timeout,
head_block: head_block.clone(),
kafka_debug_logger,
no_servers: 0.into(),
@ -488,6 +492,7 @@ impl Web3Request {
self.inner.id()
}
#[inline]
pub fn max_block_needed(&self) -> Option<U64> {
self.cache_mode.to_block().map(|x| *x.num())
}
@ -500,13 +505,27 @@ impl Web3Request {
}
}
pub fn ttl(&self) -> Duration {
self.expire_instant
.saturating_duration_since(Instant::now())
#[inline]
pub fn connect_timeout_at(&self) -> Instant {
// TODO: get from config
self.start_instant + Duration::from_secs(3)
}
pub fn ttl_expired(&self) -> bool {
self.expire_instant < Instant::now()
#[inline]
pub fn connect_timeout(&self) -> bool {
self.connect_timeout_at() <= Instant::now()
}
#[inline]
pub fn expire_at(&self) -> Instant {
// TODO: get from config
// erigon's timeout is 5 minutes so we want it shorter than that
self.start_instant + Duration::from_secs(295)
}
#[inline]
pub fn expired(&self) -> bool {
self.expire_at() <= Instant::now()
}
pub fn try_send_stat(mut self) -> Web3ProxyResult<()> {

@ -3,7 +3,6 @@ use axum::response::{IntoResponse, Response as AxumResponse};
use axum::Json;
use bytes::{Bytes, BytesMut};
use derive_more::From;
use ethers::providers::ProviderError;
use futures_util::stream::{self, StreamExt};
use futures_util::TryStreamExt;
use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor};
@ -218,7 +217,7 @@ pub struct StreamResponse {
impl StreamResponse {
// TODO: error handing
pub async fn read<T>(self) -> Result<ParsedResponse<T>, ProviderError>
pub async fn read<T>(self) -> Web3ProxyResult<ParsedResponse<T>>
where
T: de::DeserializeOwned,
{
@ -306,7 +305,7 @@ where
}
// TODO: error handling
pub async fn parsed(self) -> Result<ParsedResponse<T>, ProviderError> {
pub async fn parsed(self) -> Web3ProxyResult<ParsedResponse<T>> {
match self {
Self::Parsed(resp) => Ok(resp),
Self::Stream(resp) => resp.read().await,
@ -362,7 +361,7 @@ pub enum Response<T = Arc<RawValue>> {
}
impl Response<Arc<RawValue>> {
pub async fn to_json_string(self) -> Result<String, ProviderError> {
pub async fn to_json_string(self) -> Web3ProxyResult<String> {
let x = match self {
Self::Single(resp) => {
// TODO: handle streaming differently?
@ -663,6 +662,11 @@ impl JsonRpcErrorData {
.expect("should always serialize")
.len()
}
pub fn is_retryable(&self) -> bool {
// TODO: move stuff from request to here
todo!()
}
}
impl From<&'static str> for JsonRpcErrorData {

@ -378,9 +378,8 @@ impl RankedRpcs {
}
}
// TODO: better name for this
// TODO: this should probably be on the rpcs as "can_serve_request"
// TODO: this should probably take the method, too
// TODO: and it should take the method into account, too
pub fn rpc_will_work_now(
&self,
min_block_needed: Option<U64>,
@ -423,6 +422,7 @@ impl RankedRpcs {
}
// TODO: refs for all of these. borrow on a Sender is cheap enough
// TODO: move this to many.rs
impl Web3Rpcs {
pub fn head_block(&self) -> Option<Web3ProxyBlock> {
self.watch_head_block
@ -1004,81 +1004,107 @@ fn best_rpc<'a>(rpc_a: &'a Arc<Web3Rpc>, rpc_b: &'a Arc<Web3Rpc>) -> &'a Arc<Web
}
impl RpcsForRequest {
/// TODO: uncomment the code that makes this wait for rpcs if it thinks they will be ready soon
pub fn to_stream(self) -> impl Stream<Item = OpenRequestHandle> {
// TODO: get error_handler out of the web3_request, probably the authorization
// let error_handler = web3_request.authorization.error_handler;
let error_handler = None;
stream! {
trace!("entered stream");
// TODO: get error_handler out of the web3_request? probably the authorization
// let error_handler = web3_request.authorization.error_handler;
let error_handler = None;
let max_len = self.inner.len() + self.outer.len();
// TODO: do this without having 3 Vecs
let mut filtered = Vec::with_capacity(max_len);
let mut attempted = Vec::with_capacity(max_len);
let mut failed = Vec::with_capacity(max_len);
// todo!("be sure to set server_error if we exit without any rpcs!");
loop {
if self.request.ttl_expired() {
if self.request.connect_timeout() {
break;
} else {
// TODO: think about this more
yield_now().await;
}
let mut earliest_retry_at = None;
let mut wait_for_sync = None;
// first check the inners
// TODO: DRY
for rpcs_iter in [self.inner.iter(), self.outer.iter()] {
for (rpc_a, rpc_b) in rpcs_iter.circular_tuple_windows() {
// TODO: ties within X% to the server with the smallest block_data_limit?
// find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose
// TODO: should next_available be reversed?
// TODO: this is similar to sort_for_load_balancing_on, but at this point we don't want to prefer tiers
// TODO: move this to a helper function just so we can test it
// TODO: should x.next_available be Reverse<_>?
let best_rpc = best_rpc(rpc_a, rpc_b);
// first check the inners, then the outers
for rpcs in [&self.inner, &self.outer] {
match best_rpc
.try_request_handle(&self.request, error_handler)
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
trace!("opened handle: {}", best_rpc);
yield handle;
}
Ok(OpenRequestResult::RetryAt(retry_at)) => {
trace!(
"retry on {} @ {}",
best_rpc,
retry_at.duration_since(Instant::now()).as_secs_f32()
);
attempted.clear();
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
continue;
}
Ok(OpenRequestResult::Lagged(x)) => {
trace!("{} is lagged. will not work now", best_rpc);
// this will probably always be the same block, right?
if wait_for_sync.is_none() {
wait_for_sync = Some(x);
while attempted.len() + failed.len() < rpcs.len() {
filtered.clear();
// TODO: i'd like to do this without the collect, but since we push into `attempted`, having `attempted.contains` causes issues
filtered.extend(rpcs.iter().filter(|x| !(attempted.contains(x) || failed.contains(x))));
// tuple_windows doesn't do anything for single item iters. make the code DRY by just having it compare itself
if filtered.len() == 1 {
filtered.push(filtered[0]);
}
for (rpc_a, rpc_b) in filtered.iter().tuple_windows() {
// TODO: ties within X% to the server with the smallest block_data_limit?
// find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose
// TODO: should next_available be reversed?
// TODO: this is similar to sort_for_load_balancing_on, but at this point we don't want to prefer tiers
// TODO: move this to a helper function just so we can test it
// TODO: should x.next_available be Reverse<_>?
let best_rpc = best_rpc(rpc_a, rpc_b);
attempted.push(best_rpc);
match best_rpc
.try_request_handle(&self.request, error_handler)
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
trace!("opened handle: {}", best_rpc);
yield handle;
}
Ok(OpenRequestResult::RetryAt(retry_at)) => {
trace!(
"retry on {} @ {}",
best_rpc,
retry_at.duration_since(Instant::now()).as_secs_f32()
);
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
continue;
}
Ok(OpenRequestResult::Lagged(x)) => {
trace!("{} is lagged. will not work now", best_rpc);
// this will probably always be the same block, right?
if wait_for_sync.is_none() {
wait_for_sync = Some(x);
}
continue;
}
Ok(OpenRequestResult::Failed) => {
// TODO: log a warning? emit a stat?
trace!("best_rpc not ready: {}", best_rpc);
attempted.pop();
failed.push(best_rpc);
continue;
}
Err(err) => {
trace!("No request handle for {}. err={:?}", best_rpc, err);
attempted.pop();
failed.push(best_rpc);
continue;
}
continue;
}
Ok(OpenRequestResult::NotReady) => {
// TODO: log a warning? emit a stat?
trace!("best_rpc not ready: {}", best_rpc);
continue;
}
Err(err) => {
trace!("No request handle for {}. err={:?}", best_rpc, err);
continue;
}
}
debug_assert!(!attempted.is_empty());
}
}
// if we got this far, no inner or outer rpcs are ready. that's surprising since an inner should have been
// maybe someone requested something silly like a far future block?
// clear earliest_retry_at if it is too far in the future to help us
if let Some(retry_at) = earliest_retry_at {
if self.request.expire_instant <= retry_at {
if self.request.connect_timeout_at() <= retry_at {
// no point in waiting. it wants us to wait too long
earliest_retry_at = None;
}
@ -1090,15 +1116,16 @@ impl RpcsForRequest {
// we have nothing to wait for. uh oh!
break;
}
(_, Some(retry_at)) => {
(None, Some(retry_at)) => {
// try again after rate limits are done
sleep_until(retry_at).await;
if retry_at > Instant::now() {
sleep_until(retry_at).await;
} else {
// TODO: why is this happening? why would we get rate limited to now? it should be like a second at minimum
yield_now().await;
}
}
(Some(wait_for_sync), None) => {
break;
// TODO: think about this more
/*
select! {
x = wait_for_sync => {
match x {
@ -1113,13 +1140,11 @@ impl RpcsForRequest {
},
}
}
_ = sleep_until(self.request.expire_instant) => {
_ = sleep_until(self.request.expire_at()) => {
break;
}
}
*/
}
/*
(Some(wait_for_sync), Some(retry_at)) => {
select! {
x = wait_for_sync => {
@ -1136,14 +1161,16 @@ impl RpcsForRequest {
}
}
_ = sleep_until(retry_at) => {
// if sleep didn't have to wait at all, something seems wrong. have a minimum wait?
yield_now().await;
continue;
}
}
}
*/
}
}
}
// TODO: log that no servers were available. this might not be a server error. the user might have requested something in the far future (common when people mix up chains)
}
}

@ -1,4 +0,0 @@
use super::{many::Web3Rpcs, one::Web3Rpc};
use ethers::providers::{JsonRpcClient, ProviderError};
use serde::{de::DeserializeOwned, Serialize};
use std::fmt::Debug;

@ -408,17 +408,22 @@ impl Web3Rpcs {
Err(x) => return Err(x),
};
if next_try > web3_request.expire_instant {
let next_try = Instant::now().duration_since(next_try).as_secs();
if next_try > web3_request.connect_timeout_at() {
let retry_in = Instant::now().duration_since(next_try).as_secs();
// we don't use Web3ProxyError::RateLimited because that is for the user being rate limited
return Err(Web3ProxyError::StatusCode(
StatusCode::TOO_MANY_REQUESTS,
"backend rpcs are all rate limited!".into(),
Some(json!({"retry_at": next_try})),
Some(json!({"retry_in": retry_in})),
));
}
trace!(?next_try, "retry needed");
// todo!("this must be a bug in our tests. in prod if things are overloaded i could see it happening")
debug_assert!(Instant::now() < next_try);
select! {
_ = sleep_until(next_try) => {
// rpcs didn't change and we have waited too long. break to return an error
@ -587,7 +592,7 @@ impl Web3Rpcs {
// cloudflare gives {"jsonrpc":"2.0","error":{"code":-32043,"message":"Requested data cannot be older than 128 blocks."},"id":1}
Err(JsonRpcErrorData {
message: "Requested data is not available".into(),
code: -32043,
code: -32001,
data: Some(json!({
"request": web3_request
})),

@ -30,7 +30,6 @@ use std::sync::atomic::{self, AtomicU32, AtomicU64, AtomicUsize};
use std::{cmp::Ordering, sync::Arc};
use tokio::select;
use tokio::sync::{mpsc, watch, RwLock as AsyncRwLock};
use tokio::task::yield_now;
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
use tracing::{debug, error, info, trace, warn, Level};
use url::Url;
@ -353,6 +352,8 @@ impl Web3Rpc {
// TODO: binary search between 90k and max?
// TODO: start at 0 or 1?
let mut last = U256::MAX;
// TODO: these should all be U256, not u64
for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] {
let head_block_num = self
.internal_request::<_, U256>(
@ -367,6 +368,13 @@ impl Web3Rpc {
let maybe_archive_block = head_block_num.saturating_sub((block_data_limit).into());
if last == maybe_archive_block {
// we already checked it. exit early
break;
}
last = maybe_archive_block;
trace!(
"checking maybe_archive_block on {}: {}",
self,
@ -992,8 +1000,6 @@ impl Web3Rpc {
web3_request: &Arc<Web3Request>,
error_handler: Option<RequestErrorHandler>,
) -> Web3ProxyResult<OpenRequestHandle> {
let mut head_block_sender = None;
loop {
match self.try_request_handle(web3_request, error_handler).await {
Ok(OpenRequestResult::Handle(handle)) => return Ok(handle),
@ -1011,7 +1017,7 @@ impl Web3Rpc {
debug_assert!(wait > Duration::from_secs(0));
// TODO: have connect_timeout in addition to the full ttl
if retry_at > web3_request.expire_instant {
if retry_at > web3_request.expire_at() {
// break now since we will wait past our maximum wait time
return Err(Web3ProxyError::Timeout(Some(
web3_request.start_instant.elapsed(),
@ -1022,35 +1028,34 @@ impl Web3Rpc {
}
Ok(OpenRequestResult::Lagged(now_synced_f)) => {
select! {
_ = now_synced_f => {
// TODO: i'm guessing this is returning immediately
yield_now().await;
}
_ = sleep_until(web3_request.expire_instant) => {
_ = now_synced_f => {}
_ = sleep_until(web3_request.expire_at()) => {
break;
}
}
}
Ok(OpenRequestResult::NotReady) => {
Ok(OpenRequestResult::Failed) => {
// TODO: when can this happen? log? emit a stat?
trace!("{} has no handle ready", self);
if head_block_sender.is_none() {
head_block_sender = self.head_block_sender.as_ref().map(|x| x.subscribe());
}
// if head_block_sender.is_none() {
// head_block_sender = self.head_block_sender.as_ref().map(|x| x.subscribe());
// }
if let Some(head_block_sender) = &mut head_block_sender {
select! {
_ = head_block_sender.changed() => {
head_block_sender.borrow_and_update();
}
_ = sleep_until(web3_request.expire_instant) => {
break;
}
}
} else {
break;
}
// if let Some(head_block_sender) = &mut head_block_sender {
// select! {
// _ = head_block_sender.changed() => {
// head_block_sender.borrow_and_update();
// }
// _ = sleep_until(web3_request.expire_at()) => {
// break;
// }
// }
// } else {
// break;
// }
break;
}
Err(err) => return Err(err),
}
@ -1140,7 +1145,8 @@ impl Web3Rpc {
// TODO: should this check be optional? we've probably already done it for RpcsForRequest::inner. for now it's fine to duplicate the check
if let Some(block_needed) = web3_request.min_block_needed() {
if !self.has_block_data(block_needed) {
return Ok(OpenRequestResult::NotReady);
trace!(%web3_request, %block_needed, "{} cannot serve this request. Missing min block", self);
return Ok(OpenRequestResult::Failed);
}
}
@ -1148,9 +1154,13 @@ impl Web3Rpc {
// TODO: should this check be optional? we've probably already done it for RpcsForRequest::inner. for now it's fine to duplicate the check
if let Some(block_needed) = web3_request.max_block_needed() {
if !self.has_block_data(block_needed) {
let clone = self.clone();
let expire_instant = web3_request.expire_instant;
trace!(%web3_request, %block_needed, "{} cannot serve this request. Missing max block", self);
let clone = self.clone();
let connect_timeout_at = web3_request.connect_timeout_at();
// create a future that resolves once this rpc can serve this request
// TODO: i don't love this future. think about it more
let synced_f = async move {
let mut head_block_receiver =
clone.head_block_sender.as_ref().unwrap().subscribe();
@ -1163,21 +1173,23 @@ impl Web3Rpc {
_ = head_block_receiver.changed() => {
if let Some(head_block_number) = head_block_receiver.borrow_and_update().as_ref().map(|x| x.number()) {
if head_block_number >= block_needed {
// the block we needed has arrived!
trace!("the block we needed has arrived!");
break;
}
// TODO: configurable lag per chain
if head_block_number < block_needed.saturating_sub(5.into()) {
// TODO: more detailed error about this being a far future block
return Err(Web3ProxyError::NoServersSynced);
// wait up to 2 blocks
// TODO: configurable wait per chain
if head_block_number + U64::from(2) < block_needed {
return Err(Web3ProxyError::FarFutureBlock { head: head_block_number, requested: block_needed });
}
} else {
// TODO: what should we do? this server has no blocks at all. we can wait, but i think exiting now is best
// yield_now().await;
error!("no head block during try_request_handle on {}", clone);
return Err(Web3ProxyError::NoServersSynced);
}
}
_ = sleep_until(expire_instant) => {
_ = sleep_until(connect_timeout_at) => {
error!("connection timeout on {}", clone);
return Err(Web3ProxyError::NoServersSynced);
}
}
@ -1198,7 +1210,7 @@ impl Web3Rpc {
}
RedisRateLimitResult::RetryNever => {
warn!("how did retry never on {} happen?", self);
return Ok(OpenRequestResult::NotReady);
return Ok(OpenRequestResult::Failed);
}
};

@ -31,7 +31,7 @@ pub enum OpenRequestResult {
/// TODO: should this return an OpenRequestHandle? that might recurse
Lagged(Pin<Box<dyn Future<Output = Web3ProxyResult<Arc<Web3Rpc>>> + Send>>),
/// Unable to start a request because no servers are synced or the necessary data has been pruned
NotReady,
Failed,
}
/// Make RPC requests through this handle and drop it when you are done.

@ -1,6 +1,5 @@
use std::num::NonZeroU64;
use std::sync::Arc;
use std::time::Duration;
use tracing::{error, info};
use web3_proxy::app::BILLING_PERIOD_SECONDS;
use web3_proxy::config::TopConfig;
@ -214,7 +213,7 @@ impl MigrateStatsToV2SubCommand {
usd_per_cu: top_config.app.usd_per_cu.unwrap_or_default(),
cache_mode: Default::default(),
start_instant: Instant::now(),
expire_instant: Instant::now() + Duration::from_secs(1),
..Default::default()
};
web3_request.try_send_stat()?;

@ -1,10 +1,7 @@
use serde_json::Value;
use std::{str::FromStr, time::Duration};
use tokio::{
task::yield_now,
time::{sleep, Instant},
};
use tracing::info;
use tokio::{task::yield_now, time::sleep};
use tracing::{info, warn};
use web3_proxy::prelude::ethers::{
prelude::{Block, Transaction, TxHash, H256, U256, U64},
providers::{Http, JsonRpcClient, Quorum, QuorumProvider, WeightedProvider},
@ -39,6 +36,17 @@ async fn it_starts_and_stops() {
let anvil_provider = &a.provider;
let proxy_provider = &x.proxy_provider;
// check the /health page
let proxy_url = x.proxy_provider.url();
let health_response = reqwest::get(format!("{}health", proxy_url)).await;
dbg!(&health_response);
assert_eq!(health_response.unwrap().status(), StatusCode::OK);
// check the /status page
let status_response = reqwest::get(format!("{}status", proxy_url)).await;
dbg!(&status_response);
assert_eq!(status_response.unwrap().status(), StatusCode::OK);
let anvil_result = anvil_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
@ -52,17 +60,6 @@ async fn it_starts_and_stops() {
assert_eq!(anvil_result, proxy_result);
// check the /health page
let proxy_url = x.proxy_provider.url();
let health_response = reqwest::get(format!("{}health", proxy_url)).await;
dbg!(&health_response);
assert_eq!(health_response.unwrap().status(), StatusCode::OK);
// check the /status page
let status_response = reqwest::get(format!("{}status", proxy_url)).await;
dbg!(&status_response);
assert_eq!(status_response.unwrap().status(), StatusCode::OK);
let first_block_num = anvil_result.number.unwrap();
// mine a block
@ -81,13 +78,9 @@ async fn it_starts_and_stops() {
yield_now().await;
let mut proxy_result;
let start = Instant::now();
loop {
if start.elapsed() > Duration::from_secs(1) {
panic!("took too long to sync!");
}
let mut proxy_result = None;
for _ in 0..10 {
proxy_result = proxy_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
@ -99,7 +92,9 @@ async fn it_starts_and_stops() {
}
}
sleep(Duration::from_millis(10)).await;
warn!(?proxy_result, ?second_block_num);
sleep(Duration::from_millis(100)).await;
}
assert_eq!(anvil_result, proxy_result.unwrap());