improve responses when blocks are not available

Bryan Stitt 2023-01-24 22:45:20 -08:00
parent 694e552b5d
commit cffc60e7f6
7 changed files with 131 additions and 55 deletions

@@ -347,7 +347,7 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [ ] `stat delay` script
- query database for newest stat
- [ ] period_datetime should always be :00. right now it depends on start time
- [ ] two servers running will confuse rpc_accounting!
- [ ] we have our hard rate limiter set up with a period of 60. but most providers have a period of 1
- [ ] two servers running will confuse rpc_accounting!
- it won't happen often with users because they should be sticky to one proxy, but unauthenticated users will definitely hit this
- one option: we need the insert to be an upsert, but how do we merge histograms? (see the sketch after this list)
- [ ] don't use systemtime. use chrono
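A note on the histogram question above: an upsert can merge plain counters by addition, but two histograms only merge cleanly if they share bucket boundaries. A minimal sketch of that merge step, with hypothetical names and the shared-buckets assumption made explicit:

// Hypothetical sketch: merging two pre-aggregated histograms with
// identical bucket boundaries, as the upsert's merge step would need.
// If the two proxies used different boundaries, this merge is invalid.
fn merge_histograms(acc: &mut [u64], other: &[u64]) {
    assert_eq!(acc.len(), other.len(), "bucket layouts must match");
    for (a, b) in acc.iter_mut().zip(other) {
        *a += b; // counts in matching buckets simply add
    }
}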

@@ -1326,8 +1326,8 @@ impl Web3ProxyApp {
}
"eth_subscribe" => {
return Ok((
JsonRpcForwardedResponse::from_string(
format!("notifications not supported. eth_subscribe is only available over a websocket"),
JsonRpcForwardedResponse::from_str(
"notifications not supported. eth_subscribe is only available over a websocket",
Some(-32601),
Some(request_id),
),
@@ -1336,8 +1336,8 @@ impl Web3ProxyApp {
}
"eth_unsubscribe" => {
return Ok((
JsonRpcForwardedResponse::from_string(
format!("notifications not supported. eth_unsubscribe is only available over a websocket"),
JsonRpcForwardedResponse::from_str(
"notifications not supported. eth_unsubscribe is only available over a websocket",
Some(-32601),
Some(request_id),
),
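The two hunks above swap from_string(format!(...)) for from_str(...), dropping a pointless allocation for what is a fixed message. The repo's actual constructor isn't shown in this diff; below is a self-contained sketch of the shape such a helper plausibly has. All type and field names here are assumptions, not the real definitions:

use serde::Serialize;
use serde_json::value::RawValue;

// Assumed shapes; the real types live in web3_proxy's jsonrpc module.
#[derive(Serialize)]
struct JsonRpcErrorData {
    code: i64,
    message: String,
}

#[derive(Serialize)]
struct JsonRpcForwardedResponse {
    jsonrpc: &'static str,
    id: Box<RawValue>,
    error: JsonRpcErrorData,
}

impl JsonRpcForwardedResponse {
    // Taking &str instead of String lets callers pass a literal without
    // the format!/String allocation that from_string required.
    fn from_str(message: &str, code: Option<i64>, id: Option<Box<RawValue>>) -> Self {
        Self {
            jsonrpc: "2.0",
            id: id.unwrap_or_else(|| RawValue::from_string("null".into()).unwrap()),
            error: JsonRpcErrorData {
                code: code.unwrap_or(-32000),
                message: message.to_string(),
            },
        }
    }
}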

@@ -123,6 +123,7 @@ fn main() -> anyhow::Result<()> {
"web3_proxy=trace",
"web3_proxy_cli=trace",
"web3_proxy::rpcs::blockchain=info",
"web3_proxy::rpcs::request=debug",
]
}
_ => {

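Aside: these strings are tracing filter directives; the new line turns request internals up to debug while the chattier blockchain module stays at info. A standalone sketch of how such a list can drive a subscriber, assuming tracing-subscriber's EnvFilter rather than whatever main.rs actually does with the array:

use tracing_subscriber::EnvFilter;

fn main() {
    // Hypothetical sketch: comma-join the per-target directives into one
    // EnvFilter. The most specific target wins, so rpcs::request is capped
    // at debug and rpcs::blockchain at info while the rest of web3_proxy
    // stays at trace.
    let directives = [
        "web3_proxy=trace",
        "web3_proxy_cli=trace",
        "web3_proxy::rpcs::blockchain=info",
        "web3_proxy::rpcs::request=debug",
    ]
    .join(",");

    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::new(directives))
        .init();
}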
@@ -149,7 +149,7 @@ impl Web3Connections {
// TODO: if error, retry?
let block: ArcBlock = match rpc {
Some(rpc) => rpc
.wait_for_request_handle(authorization, Duration::from_secs(30), false)
.wait_for_request_handle(authorization, Some(Duration::from_secs(30)), false)
.await?
.request::<_, Option<_>>(
"eth_getBlockByHash",
@@ -253,11 +253,16 @@ impl Web3Connections {
// TODO: if error, retry?
// TODO: request_metadata or authorization?
// we don't actually set min_block_needed here because all nodes have all blocks
let response = self
.try_send_best_consensus_head_connection(authorization, request, None, Some(num))
.try_send_best_consensus_head_connection(authorization, request, None, None)
.await?;
let raw_block = response.result.context("no block result")?;
if let Some(err) = response.error {
debug!("could not find canonical block {}: {:?}", num, err);
}
let raw_block = response.result.context("no cannonical block result")?;
let block: ArcBlock = serde_json::from_str(raw_block.get())?;
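The added if-let reflects that a JSON-RPC response carries either a result or an error, so a missing result usually means the error field explains why. A minimal standalone sketch of that check; the response shape here is an assumption, not this repo's type:

use anyhow::Context;
use serde::Deserialize;
use serde_json::value::RawValue;

// Assumed response shape: exactly one of result/error is expected.
#[derive(Deserialize)]
struct JsonRpcResponse {
    result: Option<Box<RawValue>>,
    error: Option<serde_json::Value>,
}

fn block_result(response: JsonRpcResponse, num: u64) -> anyhow::Result<Box<RawValue>> {
    if let Some(err) = response.error {
        // log the upstream error so the context below is debuggable
        tracing::debug!("could not find canonical block {}: {:?}", num, err);
    }
    response.result.context("no canonical block result")
}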

@@ -213,7 +213,7 @@ impl Web3Connection {
// TODO: start at 0 or 1?
for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] {
let handle = self
.wait_for_request_handle(authorization, Duration::from_secs(30), true)
.wait_for_request_handle(authorization, None, true)
.await?;
let head_block_num_future = handle.request::<Option<()>, U256>(
@@ -239,7 +239,7 @@ impl Web3Connection {
// TODO: wait for the handle BEFORE we check the current block number. it might be delayed too!
// TODO: what should the request be?
let handle = self
.wait_for_request_handle(authorization, Duration::from_secs(30), true)
.wait_for_request_handle(authorization, None, true)
.await?;
let archive_result: Result<Bytes, _> = handle
@@ -436,7 +436,7 @@ impl Web3Connection {
// TODO: what should the timeout be? should there be a request timeout?
// trace!("waiting on chain id for {}", self);
let found_chain_id: Result<U64, _> = self
.wait_for_request_handle(&authorization, Duration::from_secs(30), true)
.wait_for_request_handle(&authorization, None, true)
.await?
.request(
"eth_chainId",
@@ -720,7 +720,7 @@ impl Web3Connection {
loop {
// TODO: what should the max_wait be?
match self
.wait_for_request_handle(&authorization, Duration::from_secs(30), false)
.wait_for_request_handle(&authorization, None, false)
.await
{
Ok(active_request_handle) => {
@@ -806,7 +806,7 @@ impl Web3Connection {
Web3Provider::Ws(provider) => {
// todo: move subscribe_blocks onto the request handle?
let active_request_handle = self
.wait_for_request_handle(&authorization, Duration::from_secs(30), false)
.wait_for_request_handle(&authorization, None, false)
.await;
let mut stream = provider.subscribe_blocks().await?;
drop(active_request_handle);
@@ -816,7 +816,7 @@ impl Web3Connection {
// all it does is print "new block" for the same block as current block
// TODO: how does this get wrapped in an arc? does ethers handle that?
let block: Result<Option<ArcBlock>, _> = self
.wait_for_request_handle(&authorization, Duration::from_secs(30), false)
.wait_for_request_handle(&authorization, None, false)
.await?
.request(
"eth_getBlockByNumber",
@@ -917,8 +917,8 @@ impl Web3Connection {
Web3Provider::Ws(provider) => {
// TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
let active_request_handle = self
.wait_for_request_handle(&authorization, Duration::from_secs(30), false)
.await;
.wait_for_request_handle(&authorization, None, false)
.await?;
let mut stream = provider.subscribe_pending_txs().await?;
@@ -955,10 +955,10 @@ impl Web3Connection {
pub async fn wait_for_request_handle(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
max_wait: Duration,
max_wait: Option<Duration>,
allow_not_ready: bool,
) -> anyhow::Result<OpenRequestHandle> {
let max_wait = Instant::now() + max_wait;
let max_wait = max_wait.map(|x| Instant::now() + x);
loop {
match self
@@ -968,24 +968,34 @@ impl Web3Connection {
Ok(OpenRequestResult::Handle(handle)) => return Ok(handle),
Ok(OpenRequestResult::RetryAt(retry_at)) => {
// TODO: emit a stat?
trace!("{} waiting for request handle until {:?}", self, retry_at);
let wait = retry_at.duration_since(Instant::now());
if retry_at > max_wait {
// break now since we will wait past our maximum wait time
// TODO: don't use anyhow. use specific error type
return Err(anyhow::anyhow!("timeout waiting for request handle"));
trace!(
"waiting {} millis for request handle on {}",
wait.as_millis(),
self
);
if let Some(max_wait) = max_wait {
if retry_at > max_wait {
// break now since we will wait past our maximum wait time
// TODO: don't use anyhow. use specific error type
return Err(anyhow::anyhow!("timeout waiting for request handle"));
}
}
sleep_until(retry_at).await;
}
Ok(OpenRequestResult::NotReady) => {
Ok(OpenRequestResult::NotReady(_)) => {
// TODO: when can this happen? log? emit a stat?
trace!("{} has no handle ready", self);
let now = Instant::now();
if let Some(max_wait) = max_wait {
let now = Instant::now();
if now > max_wait {
return Err(anyhow::anyhow!("unable to retry for request handle"));
if now > max_wait {
return Err(anyhow::anyhow!("unable to retry for request handle"));
}
}
// TODO: sleep how long? maybe just error?
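In short, the signature change makes the timeout opt-in: Some(d) becomes a hard deadline and None waits indefinitely. The core of the pattern, isolated; the helper names are mine, not the repo's:

use std::time::Duration;
use tokio::time::Instant;

// Convert the optional wait into an optional deadline exactly once.
fn make_deadline(max_wait: Option<Duration>) -> Option<Instant> {
    max_wait.map(|d| Instant::now() + d)
}

// None means "no deadline", so a retry time can never be past it.
fn past_deadline(retry_at: Instant, deadline: Option<Instant>) -> bool {
    deadline.map_or(false, |d| retry_at > d)
}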
@@ -1013,7 +1023,8 @@ impl Web3Connection {
.await
.is_none()
{
return Ok(OpenRequestResult::NotReady);
trace!("{} is not ready", self);
return Ok(OpenRequestResult::NotReady(self.backup));
}
if let Some(hard_limit_until) = self.hard_limit_until.as_ref() {
@@ -1029,25 +1040,33 @@ impl Web3Connection {
// check rate limits
if let Some(ratelimiter) = self.hard_limit.as_ref() {
// TODO: how should we know if we should set expire or not?
match ratelimiter.throttle().await? {
match ratelimiter
.throttle()
.await
.context(format!("attempting to throttle {}", self))?
{
RedisRateLimitResult::Allowed(_) => {
// trace!("rate limit succeeded")
}
RedisRateLimitResult::RetryAt(retry_at, _) => {
// rate limit failed
// save the smallest retry_after. if nothing succeeds, return an Err with retry_after in it
// TODO: use tracing better
// TODO: i'm seeing "Exhausted rate limit on moralis: 0ns". How is it getting 0?
warn!("Exhausted rate limit on {}. Retry at {:?}", self, retry_at);
// rate limit gave us a wait time
if !self.backup {
let when = retry_at.duration_since(Instant::now());
warn!(
"Exhausted rate limit on {}. Retry in {}ms",
self,
when.as_millis()
);
}
if let Some(hard_limit_until) = self.hard_limit_until.as_ref() {
hard_limit_until.send(retry_at.clone())?;
hard_limit_until.send_replace(retry_at.clone());
}
return Ok(OpenRequestResult::RetryAt(retry_at));
}
RedisRateLimitResult::RetryNever => {
return Ok(OpenRequestResult::NotReady);
return Ok(OpenRequestResult::NotReady(self.backup));
}
}
};
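The send → send_replace swap is load-bearing: tokio::sync::watch::Sender::send returns an error once every receiver has been dropped, so the old ? could turn a routine rate-limit update into a hard failure. send_replace stores the value unconditionally. A standalone illustration:

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(0u64);
    drop(rx); // no receivers remain

    // send() refuses to store a value that nobody can observe...
    assert!(tx.send(1).is_err());

    // ...while send_replace() stores it unconditionally and hands back
    // the previous value.
    let old = tx.send_replace(2);
    assert_eq!(old, 0);
    assert_eq!(*tx.borrow(), 2);
}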

@@ -428,7 +428,10 @@ impl Web3Connections {
)
.await
{
return Ok(without_backups);
// TODO: this might use backups too eagerly. but even when we allow backups, we still prioritize our own
if matches!(without_backups, OpenRequestResult::Handle(_)) {
return Ok(without_backups);
}
}
self._best_consensus_head_connection(
@@ -460,7 +463,7 @@ impl Web3Connections {
head_block.number()
} else {
// TODO: optionally wait for a head block >= min_block_needed
return Ok(OpenRequestResult::NotReady);
return Ok(OpenRequestResult::NotReady(allow_backups));
};
let min_block_needed = min_block_needed.unwrap_or(&head_block_num);
@@ -504,7 +507,7 @@ impl Web3Connections {
}
cmp::Ordering::Greater => {
// TODO? if the blocks is close and wait_for_sync and allow_backups, wait for change on a watch_consensus_connections_receiver().subscribe()
return Ok(OpenRequestResult::NotReady);
return Ok(OpenRequestResult::NotReady(allow_backups));
}
}
@@ -595,7 +598,7 @@ impl Web3Connections {
Ok(OpenRequestResult::RetryAt(retry_at)) => {
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
}
Ok(OpenRequestResult::NotReady) => {
Ok(OpenRequestResult::NotReady(_)) => {
// TODO: log a warning? emit a stat?
}
Err(err) => {
@@ -625,7 +628,7 @@ impl Web3Connections {
// TODO: should we log here?
Ok(OpenRequestResult::NotReady)
Ok(OpenRequestResult::NotReady(allow_backups))
}
Some(earliest_retry_at) => {
warn!("no servers on {:?}! {:?}", self, earliest_retry_at);
@@ -719,7 +722,7 @@ impl Web3Connections {
max_count -= 1;
selected_rpcs.push(handle)
}
Ok(OpenRequestResult::NotReady) => {
Ok(OpenRequestResult::NotReady(_)) => {
warn!("no request handle for {}", connection)
}
Err(err) => {
@@ -911,17 +914,51 @@ impl Web3Connections {
}
}
}
OpenRequestResult::NotReady => {
OpenRequestResult::NotReady(backups_included) => {
if let Some(request_metadata) = request_metadata {
request_metadata.no_servers.fetch_add(1, Ordering::Release);
}
trace!("No servers ready. Waiting up to 1 second for change in synced servers");
// todo!(
// "check if we are requesting an old block and no archive servers are synced"
// );
if let Some(min_block_needed) = min_block_needed {
let mut theres_a_chance = false;
for potential_conn in self.conns.values() {
if skip_rpcs.contains(potential_conn) {
continue;
}
// TODO: should we instead check if has_block_data but with the current head block?
if potential_conn.has_block_data(min_block_needed) {
trace!("chance for {} on {}", min_block_needed, potential_conn);
theres_a_chance = true;
break;
}
skip_rpcs.push(potential_conn.clone());
}
if !theres_a_chance {
debug!("no chance of finding data in block #{}", min_block_needed);
break;
}
}
if backups_included {
// if NotReady and we tried backups, there's no chance
warn!("No servers ready even after checking backups");
break;
}
debug!("No servers ready. Waiting up to 1 second for change in synced servers");
// TODO: exponential backoff?
tokio::select! {
_ = sleep(Duration::from_secs(1)) => {
skip_rpcs.pop();
// do NOT pop the last rpc off skip here
}
_ = watch_consensus_connections.changed() => {
watch_consensus_connections.borrow_and_update();
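The select! above races a one-second backoff against a consensus change. Isolated, the wait looks like this sketch; the channel payload type is a stand-in:

use std::time::Duration;
use tokio::sync::watch;
use tokio::time::sleep;

// Wait up to one second, but wake immediately if the consensus set
// changes; borrow_and_update() reads the newest value and marks it seen.
async fn wait_for_consensus_change(rx: &mut watch::Receiver<u64>) {
    tokio::select! {
        _ = sleep(Duration::from_secs(1)) => {}
        _ = rx.changed() => {
            let _latest = rx.borrow_and_update();
        }
    }
}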
@@ -944,17 +981,30 @@ impl Web3Connections {
}
let num_conns = self.conns.len();
let num_skipped = skip_rpcs.len();
if skip_rpcs.is_empty() {
if num_skipped == 0 {
error!("No servers synced ({} known)", num_conns);
Err(anyhow::anyhow!("No servers synced ({} known)", num_conns))
return Ok(JsonRpcForwardedResponse::from_str(
"No servers synced",
Some(-32000),
Some(request.id),
));
} else {
Err(anyhow::anyhow!(
"{}/{} servers erred",
skip_rpcs.len(),
num_conns
))
// TODO: warn? debug? trace?
warn!(
"Requested data was not available on {}/{} servers",
num_skipped, num_conns
);
// TODO: what error code?
// cloudflare gives {"jsonrpc":"2.0","error":{"code":-32043,"message":"Requested data cannot be older than 128 blocks."},"id":1}
return Ok(JsonRpcForwardedResponse::from_str(
"Requested data is not available",
Some(-32043),
Some(request.id),
));
}
}
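The -32043 code intentionally mirrors the Cloudflare response quoted in the comment above. For reference, the wire shape this new path produces, sketched with serde_json rather than the repo's own types:

use serde_json::json;

fn main() {
    // What the client sees when no server can serve the requested block.
    let body = json!({
        "jsonrpc": "2.0",
        "error": {
            "code": -32043,
            "message": "Requested data is not available",
        },
        "id": 1,
    });
    println!("{}", body);
}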
@@ -1287,7 +1337,7 @@ mod tests {
dbg!(&x);
assert!(matches!(x, OpenRequestResult::NotReady));
assert!(matches!(x, OpenRequestResult::NotReady(true)));
// add lagged blocks to the conns. both servers should be allowed
lagged_block.block = conns.save_block(lagged_block.block, true).await.unwrap();
@@ -1360,7 +1410,7 @@ mod tests {
conns
.best_consensus_head_connection(&authorization, None, &[], Some(&2.into()))
.await,
Ok(OpenRequestResult::NotReady)
Ok(OpenRequestResult::NotReady(true))
));
}

@@ -27,7 +27,8 @@ pub enum OpenRequestResult {
/// Unable to start a request. Retry at the given time.
RetryAt(Instant),
/// Unable to start a request because the server is not synced
NotReady,
/// contains "true" if backup servers were attempted
NotReady(bool),
}
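With the payload, a caller can tell whether retrying with backups enabled could still help; a hypothetical consumer:

// Hypothetical consumer of the widened variant.
fn describe(result: &OpenRequestResult) -> &'static str {
    match result {
        OpenRequestResult::Handle(_) => "handle ready",
        OpenRequestResult::RetryAt(_) => "rate limited; retry later",
        OpenRequestResult::NotReady(true) => "not ready, even with backups attempted",
        OpenRequestResult::NotReady(false) => "not ready, but backups were not tried",
    }
}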
/// Make RPC requests through this handle and drop it when you are done.