refactor rpc selection
parent aee8ede8ef
commit 9258d36e6a

1 changed file shown below: Cargo.lock (generated)
@@ -6642,6 +6642,7 @@ dependencies = [
"arc-swap",
"argh",
"async-recursion",
"async-stream",
"async-stripe",
"async-trait",
"axum",
@@ -102,6 +102,7 @@ test-log = { version = "0.2.12", default-features = false, features = ["trace"]
bytes = "1.5.0"
futures-util = "0.3.28"
async-recursion = "1.0.5"
async-stream = "0.3.5"

# # TODO: bring this back
# check-if-email-exists = "0.9.0"
@@ -42,14 +42,14 @@ use std::fmt;
use std::net::IpAddr;
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::{atomic, Arc};
use std::sync::atomic::{self, AtomicU16, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::select;
use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore};
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout, Instant};
use tracing::{error, info, trace, warn, Level};
use tracing::{error, info, trace, warn};

// TODO: make this customizable?
// TODO: include GIT_REF in here. i had trouble getting https://docs.rs/vergen/latest/vergen/ to work with a workspace. also .git is in .dockerignore
@@ -1106,28 +1106,31 @@ impl Web3ProxyApp {
// TODO: what if we do 2 per tier? we want to blast the third party rpcs
// TODO: maybe having the third party rpcs in their own Web3Rpcs would be good for this
ProxyMode::Fastest(x) => Some(x * 4),
ProxyMode::Quorum(x, ..) => Some(x),
ProxyMode::Versus => None,
};

// no private rpcs to send to. send to a few public rpcs
// try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here.
self.balanced_rpcs
.try_send_all_synced_connections(
web3_request,
Some(Duration::from_secs(10)),
Some(Level::TRACE.into()),
num_public_rpcs,
)
.await
// // no private rpcs to send to. send to a few public rpcs
// // try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here.
// self.balanced_rpcs
// .try_send_all_synced_connections(
// web3_request,
// Some(Duration::from_secs(10)),
// Some(Level::TRACE.into()),
// num_public_rpcs,
// )
// .await
todo!();
} else {
self.protected_rpcs
.try_send_all_synced_connections(
web3_request,
Some(Duration::from_secs(10)),
Some(Level::TRACE.into()),
Some(3),
)
.await
// self.protected_rpcs
// .try_send_all_synced_connections(
// web3_request,
// Some(Duration::from_secs(10)),
// Some(Level::TRACE.into()),
// Some(3),
// )
// .await
todo!();
}
}
@@ -1154,51 +1157,52 @@ impl Web3ProxyApp {

tries += 1;

let (code, response) = match self._proxy_request_with_caching(&web3_request).await {
Ok(response_data) => {
web3_request.error_response.store(false, Ordering::Relaxed);
web3_request
.user_error_response
.store(false, Ordering::Relaxed);
let (code, response) =
match self._proxy_request_with_caching(web3_request.clone()).await {
Ok(response_data) => {
web3_request.error_response.store(false, Ordering::Relaxed);
web3_request
.user_error_response
.store(false, Ordering::Relaxed);

(StatusCode::OK, response_data)
}
Err(err @ Web3ProxyError::NullJsonRpcResult) => {
web3_request.error_response.store(false, Ordering::Relaxed);
web3_request
.user_error_response
.store(false, Ordering::Relaxed);

err.as_json_response_parts(web3_request.id())
}
Err(Web3ProxyError::JsonRpcResponse(response_data)) => {
web3_request.error_response.store(false, Ordering::Relaxed);
web3_request
.user_error_response
.store(response_data.is_error(), Ordering::Relaxed);

let response = jsonrpc::ParsedResponse::from_response_data(
response_data,
web3_request.id(),
);
(StatusCode::OK, response.into())
}
Err(err) => {
if tries <= max_tries {
// TODO: log the error before retrying
continue;
(StatusCode::OK, response_data)
}
Err(err @ Web3ProxyError::NullJsonRpcResult) => {
web3_request.error_response.store(false, Ordering::Relaxed);
web3_request
.user_error_response
.store(false, Ordering::Relaxed);

// max tries exceeded. return the error
err.as_json_response_parts(web3_request.id())
}
Err(Web3ProxyError::JsonRpcResponse(response_data)) => {
web3_request.error_response.store(false, Ordering::Relaxed);
web3_request
.user_error_response
.store(response_data.is_error(), Ordering::Relaxed);

web3_request.error_response.store(true, Ordering::Relaxed);
web3_request
.user_error_response
.store(false, Ordering::Relaxed);
let response = jsonrpc::ParsedResponse::from_response_data(
response_data,
web3_request.id(),
);
(StatusCode::OK, response.into())
}
Err(err) => {
if tries <= max_tries {
// TODO: log the error before retrying
continue;
}

err.as_json_response_parts(web3_request.id())
}
};
// max tries exceeded. return the error

web3_request.error_response.store(true, Ordering::Relaxed);
web3_request
.user_error_response
.store(false, Ordering::Relaxed);

err.as_json_response_parts(web3_request.id())
}
};

web3_request.add_response(&response);

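The hunk above retries transient errors until a try budget is exhausted, then surfaces the last error. A standalone sketch of that retry shape (a simplified synchronous stand-in, not the commit's code):

fn with_retries<T, E>(max_tries: usize, mut attempt: impl FnMut() -> Result<T, E>) -> Result<T, E> {
    let mut tries = 0;
    loop {
        tries += 1;
        match attempt() {
            Ok(x) => return Ok(x),
            Err(err) if tries <= max_tries => {
                // the diff leaves a TODO here: log the error before retrying
                let _ = err;
                continue;
            }
            Err(err) => return Err(err), // max tries exceeded. return the error
        }
    }
}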
@@ -1215,7 +1219,7 @@ impl Web3ProxyApp {
/// TODO: how can we make this generic?
async fn _proxy_request_with_caching(
self: &Arc<Self>,
web3_request: &Arc<Web3Request>,
web3_request: Arc<Web3Request>,
) -> Web3ProxyResult<jsonrpc::SingleResponse> {
// TODO: serve net_version without querying the backend
// TODO: don't force RawValue
@@ -1334,7 +1338,7 @@ impl Web3ProxyApp {
let mut gas_estimate = self
.balanced_rpcs
.try_proxy_connection::<U256>(
web3_request,
web3_request.clone(),
)
.await?
.parsed()
@@ -1370,7 +1374,7 @@ impl Web3ProxyApp {
let mut result = self
.balanced_rpcs
.try_proxy_connection::<Arc<RawValue>>(
web3_request,
web3_request.clone(),
)
.await;

@@ -1429,7 +1433,7 @@ impl Web3ProxyApp {

let response = self
.try_send_protected(
web3_request,
&web3_request,
).await;

let mut response = response.try_into()?;
@@ -1633,7 +1637,7 @@ impl Web3ProxyApp {
web3_request.ttl(),
self.balanced_rpcs
.try_proxy_connection::<Arc<RawValue>>(
&web3_request,
web3_request,
)
).await??
} else {
@@ -1654,7 +1658,7 @@ impl Web3ProxyApp {
web3_request.ttl(),
self.balanced_rpcs
.try_proxy_connection::<Arc<RawValue>>(
&web3_request,
web3_request.clone(),
)
).await??
}
@@ -1674,7 +1678,7 @@ impl Web3ProxyApp {
// TODO: dynamic timeout based on whats left on web3_request
let response_data = timeout(duration, app.balanced_rpcs
.try_proxy_connection::<Arc<RawValue>>(
&web3_request,
web3_request.clone(),
)).await;

match response_data {
@@ -1750,7 +1754,7 @@ impl Web3ProxyApp {
web3_request.ttl(),
self.balanced_rpcs
.try_proxy_connection::<Arc<RawValue>>(
&web3_request,
web3_request.clone(),
)
).await??;

@@ -40,11 +40,13 @@ use tracing::trace;
/// How to select backend servers for a request
#[derive(Copy, Clone, Debug, Default)]
pub enum ProxyMode {
/// send to the "best" synced server
/// send to the "best" synced server. on error, try the next
#[default]
Best,
/// send to all synced servers and return the fastest non-error response (reverts do not count as errors here)
Fastest(usize),
/// send to k servers and return the best response common between at least n servers
Quorum(usize, usize),
/// send to all servers for benchmarking. return the fastest non-error response
Versus,
/// send all requests and responses to kafka
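These variants drive the fan-out counts chosen in try_send_protected earlier in this diff. A minimal sketch of that mapping (the catch-all arm and the function name are assumptions for illustration, not part of the commit):

fn num_public_rpcs(mode: ProxyMode) -> Option<usize> {
    match mode {
        ProxyMode::Fastest(x) => Some(x * 4), // "blast the third party rpcs"
        ProxyMode::Quorum(x, ..) => Some(x),
        ProxyMode::Versus => None, // None means: send to every synced server
        _ => Some(1), // assumed: Best/Debug target one server at a time
    }
}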
@@ -209,6 +209,7 @@ pub enum Payload<T> {

#[derive(Debug)]
pub struct StreamResponse {
// TODO: phantom T on here?
buffer: Bytes,
response: reqwest::Response,
web3_request: Arc<Web3Request>,
@@ -246,6 +247,7 @@ impl IntoResponse for StreamResponse {

#[derive(Debug)]
pub enum SingleResponse<T = Arc<RawValue>> {
/// TODO: save the size here so we don't have to serialize again
Parsed(ParsedResponse<T>),
Stream(StreamResponse),
}
@@ -1,20 +1,25 @@
use super::blockchain::Web3ProxyBlock;
use super::many::Web3Rpcs;
use super::one::Web3Rpc;
use super::request::OpenRequestHandle;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::Web3Request;
use crate::rpcs::request::OpenRequestResult;
use async_stream::stream;
use base64::engine::general_purpose;
use derive_more::Constructor;
use ethers::prelude::{H256, U64};
use futures_util::Stream;
use hashbrown::{HashMap, HashSet};
use hdrhistogram::serialization::{Serializer, V2DeflateSerializer};
use hdrhistogram::Histogram;
use itertools::{Itertools, MinMaxResult};
use moka::future::Cache;
use serde::Serialize;
use std::cmp::{Ordering, Reverse};
use std::cmp::{min_by_key, Ordering, Reverse};
use std::sync::{atomic, Arc};
use std::time::Duration;
use tokio::time::Instant;
use tokio::time::{sleep_until, Instant};
use tracing::{debug, enabled, info, trace, warn, Level};

#[derive(Clone, Debug, Serialize)]
@@ -81,6 +86,7 @@ impl PartialOrd for RpcRanking {
}
}

/// TODO: i think we can get rid of this in favor of
pub enum ShouldWaitForBlock {
Ready,
// BackupReady,
@@ -107,7 +113,34 @@ pub struct RankedRpcs {
rpc_data: HashMap<Arc<Web3Rpc>, ConsensusRpcData>,
}

pub struct RpcsForRequest {
inner: Vec<Arc<Web3Rpc>>,
outer: Vec<Arc<Web3Rpc>>,
request: Arc<Web3Request>,
}

impl RankedRpcs {
pub fn from_rpcs(rpcs: Vec<Arc<Web3Rpc>>, head_block: Web3ProxyBlock) -> Option<Self> {
// we don't need to sort the rpcs now. we will sort them when a request needs them
// TODO: the shame about this is that we lose just being able to compare 2 random servers

let backups_needed = rpcs.iter().any(|x| x.backup);

let num_synced = rpcs.len();

let rpc_data = Default::default();

let ranked_rpcs = RankedRpcs {
backups_needed,
head_block,
inner: rpcs,
num_synced,
rpc_data,
};

Some(ranked_rpcs)
}

pub fn from_votes(
min_synced_rpcs: usize,
min_sum_soft_limit: u32,
@@ -186,6 +219,42 @@ impl RankedRpcs {
None
}

pub fn for_request(&self, web3_request: Arc<Web3Request>) -> Option<RpcsForRequest> {
if self.num_active_rpcs() == 0 {
return None;
}

// these are bigger than we need, but how much does that matter?
let mut inner = Vec::with_capacity(self.num_active_rpcs());
let mut outer = Vec::with_capacity(self.num_active_rpcs());

// TODO: what if min is set to some future block?
let max_block_needed = web3_request
.max_block_needed()
.or_else(|| web3_request.head_block.as_ref().map(|x| x.number()));
let min_block_needed = web3_request.min_block_needed();

for rpc in &self.inner {
match self.rpc_will_work_eventually(rpc, min_block_needed, max_block_needed) {
ShouldWaitForBlock::NeverReady => continue,
ShouldWaitForBlock::Ready => inner.push(rpc.clone()),
ShouldWaitForBlock::Wait { .. } => outer.push(rpc.clone()),
}
}

let mut rng = nanorand::tls_rng();

// we use shuffle instead of sort. we will compare weights when iterating RankedRpcsForRequest
inner.sort_by_cached_key(|x| x.shuffle_for_load_balancing_on(&mut rng, max_block_needed));
outer.sort_by_cached_key(|x| x.shuffle_for_load_balancing_on(&mut rng, max_block_needed));

Some(RpcsForRequest {
inner,
outer,
request: web3_request,
})
}

pub fn all(&self) -> &[Arc<Web3Rpc>] {
&self.inner
}
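for_request partitions the ranked servers by rpc_will_work_eventually: servers that can serve the request now go into inner, servers expected to catch up go into outer, and hopeless ones are dropped. A standalone sketch of that split, with a simplified Readiness enum standing in for ShouldWaitForBlock (names here are illustrative):

enum Readiness {
    Ready,
    Wait,
    NeverReady,
}

fn partition(candidates: Vec<(String, Readiness)>) -> (Vec<String>, Vec<String>) {
    let mut inner = Vec::with_capacity(candidates.len());
    let mut outer = Vec::with_capacity(candidates.len());
    for (name, readiness) in candidates {
        match readiness {
            Readiness::Ready => inner.push(name), // can serve the request now
            Readiness::Wait => outer.push(name),  // should catch up; keep as fallback
            Readiness::NeverReady => continue,    // pruned or too far behind
        }
    }
    (inner, outer)
}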
@@ -200,27 +269,6 @@ impl RankedRpcs {
self.inner.len()
}

/// will tell you if waiting will eventually should wait for a block
/// TODO: error if backup will be needed to serve the request?
/// TODO: serve now if a backup server has the data?
/// TODO: also include method (or maybe an enum representing the different prune types)
pub fn should_wait_for_block(
&self,
min_block_num: Option<U64>,
max_block_num: Option<U64>,
skip_rpcs: &[Arc<Web3Rpc>],
) -> ShouldWaitForBlock {
for rpc in self.inner.iter() {
match self.rpc_will_work_eventually(rpc, min_block_num, max_block_num, skip_rpcs) {
ShouldWaitForBlock::NeverReady => continue,
x => return x,
}
}

ShouldWaitForBlock::NeverReady
}

/// TODO: change this to take a min and a max
pub fn has_block_data(&self, rpc: &Web3Rpc, block_num: U64) -> bool {
self.rpc_data
.get(rpc)
@@ -228,21 +276,13 @@ impl RankedRpcs {
.unwrap_or(false)
}

// TODO: take method as a param, too. mark nodes with supported methods (maybe do it optimistically? on)
// TODO: move this onto Web3Rpc?
// TODO: this needs min and max block on it
// TODO: take method as a param, too. mark nodes with supported methods (maybe do it optimistically? on error mark them as not supporting it)
pub fn rpc_will_work_eventually(
&self,
rpc: &Arc<Web3Rpc>,
min_block_num: Option<U64>,
max_block_num: Option<U64>,
skip_rpcs: &[Arc<Web3Rpc>],
) -> ShouldWaitForBlock {
if skip_rpcs.contains(rpc) {
// if rpc is skipped, it must have already been determined it is unable to serve the request
return ShouldWaitForBlock::NeverReady;
}

if let Some(min_block_num) = min_block_num {
if !self.has_block_data(rpc, min_block_num) {
trace!(
@@ -289,16 +329,10 @@ impl RankedRpcs {
// TODO: this should probably take the method, too
pub fn rpc_will_work_now(
&self,
skip: &[Arc<Web3Rpc>],
min_block_needed: Option<U64>,
max_block_needed: Option<U64>,
rpc: &Arc<Web3Rpc>,
) -> bool {
if skip.contains(rpc) {
trace!("skipping {}", rpc);
return false;
}

if rpc.backup && !self.backups_needed {
// skip backups unless backups are needed for ranked_rpcs to exist
return false;
@@ -326,13 +360,14 @@ impl RankedRpcs {
}
}

// TODO: this might be a big perf hit. benchmark
if let Some(x) = rpc.hard_limit_until.as_ref() {
if *x.borrow() > Instant::now() {
trace!("{} is rate limited. will not work now", rpc,);
return false;
}
}
// TODO: i think its better to do rate limits later anyways. think more about it though
// // TODO: this might be a big perf hit. benchmark
// if let Some(x) = rpc.hard_limit_until.as_ref() {
// if *x.borrow() > Instant::now() {
// trace!("{} is rate limited. will not work now", rpc,);
// return false;
// }
// }

true
}
@@ -914,3 +949,65 @@ impl ConsensusFinder {
.max()
}
}

impl RpcsForRequest {
pub fn to_stream(self) -> impl Stream<Item = OpenRequestHandle> {
// TODO: get error_handler out of the web3_request, probably the authorization
// let error_handler = web3_request.authorization.error_handler;
let error_handler = None;

stream! {
loop {
let mut earliest_retry_at = None;

for (rpc_a, rpc_b) in self.inner.iter().circular_tuple_windows() {
trace!("{} vs {}", rpc_a, rpc_b);
// TODO: ties within X% to the server with the smallest block_data_limit?
// find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose
let faster_rpc = min_by_key(rpc_a, rpc_b, |x| (Reverse(x.next_available()), x.backup, x.weighted_peak_latency()));
trace!("winner: {}", faster_rpc);

match faster_rpc
.try_request_handle(&self.request, error_handler)
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
trace!("opened handle: {}", faster_rpc);
yield handle;
}
Ok(OpenRequestResult::RetryAt(retry_at)) => {
trace!(
"retry on {} @ {}",
faster_rpc,
retry_at.duration_since(Instant::now()).as_secs_f32()
);

earliest_retry_at = earliest_retry_at.min(Some(retry_at));
continue;
}
Ok(OpenRequestResult::NotReady) => {
// TODO: log a warning? emit a stat?
trace!("best_rpc not ready: {}", faster_rpc);
continue;
}
Err(err) => {
trace!("No request handle for {}. err={:?}", faster_rpc, err);
continue;
}
}
}

// TODO: check self.outer

if let Some(retry_at) = earliest_retry_at {
if self.request.expire_instant <= retry_at {
break;
}
sleep_until(retry_at).await;
} else {
break;
}
}
}
}
}
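Streams produced by the stream! macro are not Unpin, so callers pin them before polling, exactly as request_with_metadata does further down in this diff. A generic, self-contained sketch of that consumption pattern:

use futures_util::{Stream, StreamExt};
use tokio::pin;

// pull the first item from any stream, e.g. the first OpenRequestHandle
// yielded by RpcsForRequest::to_stream()
async fn first_item<S: Stream>(stream: S) -> Option<S::Item> {
    pin!(stream); // required: async-stream streams are !Unpin
    stream.next().await
}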
@@ -1,6 +1,6 @@
//! Load balanced communication with a group of web3 rpc providers
use super::blockchain::{BlocksByHashCache, BlocksByNumberCache, Web3ProxyBlock};
use super::consensus::{RankedRpcs, ShouldWaitForBlock};
use super::consensus::{RankedRpcs, RpcsForRequest, ShouldWaitForBlock};
use super::one::Web3Rpc;
use super::request::{OpenRequestHandle, OpenRequestResult, RequestErrorHandler};
use crate::app::{flatten_handle, Web3ProxyApp, Web3ProxyJoinHandle};
@@ -27,10 +27,10 @@ use std::cmp::min_by_key;
use std::fmt::{self, Display};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::select;
use tokio::sync::{mpsc, watch};
use tokio::task::yield_now;
use tokio::time::{sleep, sleep_until, timeout, Duration, Instant};
use tokio::{pin, select};
use tracing::{debug, error, info, instrument, trace, warn};

/// A collection of web3 connections. Sends requests either the current best server or all servers.
@@ -69,6 +69,34 @@ pub struct Web3Rpcs {
pub(super) pending_txid_firehose_sender: Option<mpsc::Sender<TxHash>>,
}

/// this is a RankedRpcs that should be ready to use
/// there is a small chance of waiting for rate limiting depending on how many backend servers you have
#[derive(From)]
pub enum TryRpcsForRequest {
Some(RpcsForRequest),
RetryAt(Instant),
// WaitForBlock(U64),
None,
}

impl From<Option<Instant>> for TryRpcsForRequest {
fn from(value: Option<Instant>) -> Self {
match value {
None => Self::None,
Some(x) => Self::RetryAt(x),
}
}
}

impl From<Option<RpcsForRequest>> for TryRpcsForRequest {
fn from(value: Option<RpcsForRequest>) -> Self {
match value {
None => Self::None,
Some(x) => Self::Some(x),
}
}
}

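The From impls above let try_rpcs_for_request return Option values directly via .into(). A hypothetical sketch of how a caller branches on the result (TryRpcs and resolve are simplified stand-ins, not part of the commit):

use tokio::time::{sleep_until, Instant};

enum TryRpcs<T> {
    Some(T),
    RetryAt(Instant),
    None,
}

// use the rpcs, wait out a rate limit if the deadline allows, or give up
async fn resolve<T>(attempt: TryRpcs<T>, expire_instant: Instant) -> Option<T> {
    match attempt {
        TryRpcs::Some(rpcs) => Some(rpcs),
        TryRpcs::RetryAt(retry_at) if retry_at < expire_instant => {
            sleep_until(retry_at).await; // rate limits may clear before the deadline
            None // the real caller loops and calls try_rpcs_for_request again
        }
        _ => None, // no servers, or the retry would land after the deadline
    }
}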
impl Web3Rpcs {
/// Spawn durable connections to multiple Web3 providers.
pub async fn spawn(
@@ -368,370 +396,32 @@ impl Web3Rpcs {
Ok(())
}

/// Send the same request to all the handles. Returning the most common success or most common error.
/// TODO: option to return the fastest (non-null, non-error) response and handles for all the others instead?
pub async fn try_send_parallel_requests<R: JsonRpcResultData>(
&self,
active_request_handles: Vec<OpenRequestHandle>,
max_wait: Option<Duration>,
) -> Result<R, Web3ProxyError> {
// TODO: if only 1 active_request_handles, do self.try_send_request?

let max_wait = max_wait.unwrap_or_else(|| Duration::from_secs(300));

// TODO: iter stream
let responses = active_request_handles
.into_iter()
.map(|active_request_handle| async move {
let result: Result<Result<R, Web3ProxyError>, Web3ProxyError> =
timeout(max_wait, async {
match active_request_handle.request().await {
Ok(response) => match response.parsed().await {
Ok(parsed) => parsed.into_result(),
Err(err) => Err(Web3ProxyError::EthersProvider(err)),
},
Err(err) => Err(Web3ProxyError::EthersProvider(err)),
}
})
.await
.map_err(Web3ProxyError::from);

result.flatten()
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<R, Web3ProxyError>>>()
.await;

// TODO: Strings are not great keys, but we can't use RawValue or ProviderError as keys because they don't implement Hash or Eq
let mut count_map: HashMap<String, _> = HashMap::new();
let mut counts: Counter<String> = Counter::new();
let mut any_ok_with_json_result = false;
for partial_response in responses {
if partial_response.is_ok() {
any_ok_with_json_result = true;
}

// TODO: better key!
let s = format!("{:?}", partial_response);

if count_map.get(&s).is_none() {
count_map.insert(s.clone(), partial_response);
}

counts.update([s]);
}

// return the most_common success if any. otherwise return the most_common error
for (most_common, _) in counts.most_common_ordered() {
let most_common = count_map
.remove(&most_common)
.expect("most_common key must exist");

match most_common {
Ok(x) => {
// return the most common success
return Ok(x);
}
Err(err) => {
if any_ok_with_json_result {
// the most common is an error, but there is an Ok in here somewhere. continue the loop to find it
continue;
}

return Err(err);
}
}
}

// TODO: what should we do if we get here? i don't think we will
unimplemented!("this shouldn't be possible")
}

async fn _best_available_rpc(
&self,
web3_request: &Arc<Web3Request>,
error_handler: Option<RequestErrorHandler>,
potential_rpcs: &[Arc<Web3Rpc>],
skip: &mut Vec<Arc<Web3Rpc>>,
) -> OpenRequestResult {
let mut earliest_retry_at = None;

for (rpc_a, rpc_b) in potential_rpcs.iter().circular_tuple_windows() {
trace!("{} vs {}", rpc_a, rpc_b);
// TODO: ties within X% to the server with the smallest block_data_limit
// faster rpc. backups always lose.
let faster_rpc = min_by_key(rpc_a, rpc_b, |x| (x.backup, x.weighted_peak_latency()));
trace!("winner: {}", faster_rpc);

// add to the skip list in case this one fails
skip.push(Arc::clone(faster_rpc));

// just because it has lower latency doesn't mean we are sure to get a connection. there might be rate limits
// TODO: what error_handler?
match faster_rpc
.try_request_handle(web3_request, error_handler)
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
trace!("opened handle: {}", faster_rpc);
return OpenRequestResult::Handle(handle);
}
Ok(OpenRequestResult::RetryAt(retry_at)) => {
trace!(
"retry on {} @ {}",
faster_rpc,
retry_at.duration_since(Instant::now()).as_secs_f32()
);

if earliest_retry_at.is_none() {
earliest_retry_at = Some(retry_at);
} else {
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
}
}
Ok(OpenRequestResult::NotReady) => {
// TODO: log a warning? emit a stat?
trace!("best_rpc not ready: {}", faster_rpc);
}
Err(err) => {
trace!("No request handle for {}. err={:?}", faster_rpc, err)
}
}
}

if let Some(retry_at) = earliest_retry_at {
OpenRequestResult::RetryAt(retry_at)
} else {
OpenRequestResult::NotReady
}
}

/// TODO: skip_rpcs should probably be on the web3_request, too
#[instrument(level = "trace")]
pub async fn wait_for_best_rpc(
&self,
web3_request: &Arc<Web3Request>,
skip_rpcs: &mut Vec<Arc<Web3Rpc>>,
error_handler: Option<RequestErrorHandler>,
) -> Web3ProxyResult<OpenRequestResult> {
let mut earliest_retry_at: Option<Instant> = None;

if self.watch_head_block.is_none() {
// if this group of servers is not watching the head block, we don't know what is "best" based on block height
// TODO: do this without cloning here
let potential_rpcs = self.by_name.read().values().cloned().collect::<Vec<_>>();

let x = self
._best_available_rpc(web3_request, error_handler, &potential_rpcs, skip_rpcs)
.await;

return Ok(x);
}

let mut watch_ranked_rpcs = self.watch_ranked_rpcs.subscribe();

let mut potential_rpcs = Vec::new();

let min_block_needed = web3_request.min_block_needed();
let max_block_needed = web3_request.max_block_needed();

// TODO: max loop count if no max_wait?
loop {
// TODO: need a change so that protected and 4337 rpcs set watch_consensus_rpcs on start
let ranked_rpcs: Option<Arc<RankedRpcs>> =
watch_ranked_rpcs.borrow_and_update().clone();

// first check everything that is synced
// even though we might be querying an old block that an unsynced server can handle,
// it is best to not send queries to a syncing server. that slows down sync and can bloat erigon's disk usage.
if let Some(ranked_rpcs) = ranked_rpcs {
potential_rpcs.extend(
ranked_rpcs
.all()
.iter()
.filter(|rpc| {
// TODO: instrument this?
ranked_rpcs.rpc_will_work_now(
skip_rpcs,
min_block_needed,
max_block_needed,
rpc,
)
})
.cloned(),
);

if potential_rpcs.len() >= self.min_synced_rpcs {
// we have enough potential rpcs. try to load balance
potential_rpcs
.sort_by_cached_key(|x| x.shuffle_for_load_balancing_on(max_block_needed));

match self
._best_available_rpc(
web3_request,
error_handler,
&potential_rpcs,
skip_rpcs,
)
.await
{
OpenRequestResult::Handle(x) => return Ok(OpenRequestResult::Handle(x)),
OpenRequestResult::NotReady => {}
OpenRequestResult::RetryAt(retry_at) => {
if earliest_retry_at.is_none() {
earliest_retry_at = Some(retry_at);
} else {
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
}
}
}
}

match ranked_rpcs.should_wait_for_block(
min_block_needed,
max_block_needed,
skip_rpcs,
) {
ShouldWaitForBlock::NeverReady => break,
ShouldWaitForBlock::Ready => {
if web3_request.ttl_expired() {
break;
}
// TODO: i don't see how we can get here. something feels wrong if this is common.
// maybe from a race? maybe _best_available_rpc returned NotReady just as a node synced
yield_now().await;
}
ShouldWaitForBlock::Wait { .. } => select! {
_ = watch_ranked_rpcs.changed() => {
// no need to borrow_and_update because we do that at the top of the loop
// TODO: wait until watched_ranked_rpcs is on the right block?
trace!("watch ranked rpcs changed");
},
_ = sleep(web3_request.ttl()) => break,
},
}
} else {
trace!("no potential rpcs");
select! {
_ = watch_ranked_rpcs.changed() => {
// no need to borrow_and_update because we do that at the top of the loop
trace!("watch ranked rpcs changed");
},
_ = sleep(web3_request.ttl()) => break,
}
}

// clear for the next loop
potential_rpcs.clear();
}

web3_request.no_servers.fetch_add(1, Ordering::Relaxed);

if let Some(retry_at) = earliest_retry_at {
// TODO: log the server that retry_at came from
warn!(
?skip_rpcs,
retry_in_s=?retry_at.duration_since(Instant::now()).as_secs_f32(),
"no servers in {} ready!",
self,
);

Ok(OpenRequestResult::RetryAt(retry_at))
} else {
warn!(?skip_rpcs, "no servers in {} ready!", self);

Ok(OpenRequestResult::NotReady)
}
}

/// get all rpc servers that are not rate limited
/// this prefers synced servers, but it will return servers even if they aren't fully in sync.
/// This is useful for broadcasting signed transactions.
/// TODO: better type on this that can return an anyhow::Result
/// TODO: redo this to just return all the connections. better to do rate limits at the end
pub async fn all_connections(
&self,
web3_request: &Arc<Web3Request>,
max_count: Option<usize>,
error_level: Option<RequestErrorHandler>,
) -> Result<Vec<OpenRequestHandle>, Option<Instant>> {
let mut earliest_retry_at = None;
/// this does not guarantee you won't be rate limited. we don't increment our counters until you try to send. so you might have to wait to be able to send
/// TODO: should this wait for ranked rpcs? maybe only a fraction of web3_request's time?
pub async fn try_rpcs_for_request(&self, web3_request: Arc<Web3Request>) -> TryRpcsForRequest {
// TODO: by_name might include things that are on a forked
let ranked_rpcs: Arc<RankedRpcs> =
if let Some(ranked_rpcs) = self.watch_ranked_rpcs.borrow().clone() {
ranked_rpcs
} else if let Some(head_block) = web3_request.head_block.clone() {
// if we are here, self isn't watching head blocks but some other Web3Rpcs is. Return all the rpcs
let rpcs = self.by_name.read().values().cloned().collect();

// TODO: filter the rpcs with Ranked.will_work_now
let mut all_rpcs: Vec<_> = self.by_name.read().values().cloned().collect();

let mut max_count = if let Some(max_count) = max_count {
max_count
} else {
all_rpcs.len()
};

trace!("max_count: {}", max_count);

if max_count == 0 {
// TODO: return a future that resolves when we know a head block?
return Err(None);
}

let mut selected_rpcs = Vec::with_capacity(max_count);

// TODO: this sorts them all even though we probably won't need all of them. think about this more
all_rpcs
.sort_by_cached_key(|x| x.sort_for_load_balancing_on(web3_request.max_block_needed()));

trace!("unfiltered all_rpcs: {:#?}", all_rpcs);

for rpc in all_rpcs {
trace!("trying {}", rpc);

// TODO: use a helper function for these
if let Some(block_needed) = web3_request.min_block_needed() {
trace!("min_block_needed: {}", block_needed);
if !rpc.has_block_data(block_needed) {
trace!("{} is missing min_block_needed. skipping", rpc);
continue;
if let Some(x) = RankedRpcs::from_rpcs(rpcs, head_block) {
Arc::new(x)
} else {
// i doubt we will ever get here
// TODO: return a future that resolves once we do have something?
return TryRpcsForRequest::None;
}
}
} else {
// TODO: return a future that resolves once we do have something?
return TryRpcsForRequest::None;
};

if let Some(block_needed) = web3_request.max_block_needed() {
trace!("max_block_needed: {}", block_needed);
if !rpc.has_block_data(block_needed) {
trace!("{} is missing max_block_needed. skipping", rpc);
continue;
}
}

// check rate limits and increment our connection counter
match rpc.try_request_handle(web3_request, error_level).await {
Ok(OpenRequestResult::RetryAt(retry_at)) => {
// this rpc is not available. skip it
trace!("{} is rate limited. skipping", rpc);
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
}
Ok(OpenRequestResult::Handle(handle)) => {
trace!("{} is available", rpc);
selected_rpcs.push(handle);

max_count -= 1;
if max_count == 0 {
break;
}
}
Ok(OpenRequestResult::NotReady) => {
warn!("no request handle for {}", rpc)
}
Err(err) => {
warn!(?err, "error getting request handle for {}", rpc)
}
}
}

if !selected_rpcs.is_empty() {
return Ok(selected_rpcs);
}

// return the earliest retry_after (if no rpcs are synced, this will be None)
Err(earliest_retry_at)
ranked_rpcs.for_request(web3_request).into()
}

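try_rpcs_for_request leans on the same pairwise-selection idiom used by _best_available_rpc and RpcsForRequest::to_stream: walk circular_tuple_windows over an already shuffled list and keep whichever server wins min_by_key. A standalone demo of that idiom with made-up latencies (not the commit's code):

use itertools::Itertools;
use std::cmp::min_by_key;

fn main() {
    let servers = [("a", 80u32), ("b", 20u32), ("c", 50u32)]; // (name, latency ms)
    // adjacent pairs, wrapping around: (a,b), (b,c), (c,a)
    for (x, y) in servers.iter().circular_tuple_windows() {
        let winner = min_by_key(x, y, |s| s.1); // lower latency wins
        println!("{} vs {} -> {}", x.0, y.0, winner.0);
    }
}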
pub async fn internal_request<P: JsonRpcParams, R: JsonRpcResultData>(
@@ -742,12 +432,13 @@ impl Web3Rpcs {
) -> Web3ProxyResult<R> {
let head_block = self.head_block();

// TODO: i think we actually always want balanced_rpcs on this!
let web3_request =
Web3Request::new_internal(method.into(), params, head_block, max_wait).await;

let response = self.request_with_metadata(&web3_request).await?;
let response = self.request_with_metadata(web3_request).await?;

let parsed = response.parsed().await?;

match parsed.payload {
jsonrpc::Payload::Success { result } => Ok(result),
// TODO: confirm this error type is correct
@@ -756,33 +447,42 @@ impl Web3Rpcs {
}

/// Make a request with stat tracking.
/// The first jsonrpc response will be returned.
/// TODO: move this to RankedRpcsForRequest along with a bunch of other similar functions
/// TODO: take an arg for max_tries. take an arg for quorum(size) or serial
pub async fn request_with_metadata<R: JsonRpcResultData>(
&self,
web3_request: &Arc<Web3Request>,
web3_request: Arc<Web3Request>,
) -> Web3ProxyResult<jsonrpc::SingleResponse<R>> {
let mut skip_rpcs = vec![];
let mut method_not_available_response = None;

let mut watch_consensus_rpcs = self.watch_ranked_rpcs.subscribe();
let watch_consensus_rpcs = self.watch_ranked_rpcs.subscribe();

// set error_handler to Save. this might be overridden depending on the web3_request.authorization
// TODO: rename this to make it clear it might be overriden
let error_handler = Some(RequestErrorHandler::Save);

// TODO: collect the most common error
let mut last_jsonrpc_error = None;
let mut last_provider_error = None;

// TODO: the loop here feels somewhat redundant with the loop in best_available_rpc
loop {
if web3_request.ttl_expired() {
break;
// TODO: limit number of tries
match self.try_rpcs_for_request(web3_request.clone()).await {
TryRpcsForRequest::None => return Err(Web3ProxyError::NoServersSynced),
TryRpcsForRequest::RetryAt(retry_at) => {
if retry_at > web3_request.expire_instant {
return Err(Web3ProxyError::RateLimited(
Default::default(),
Some(retry_at),
));
}
}
TryRpcsForRequest::Some(rpcs) => {
let stream = rpcs.to_stream();

match self
.wait_for_best_rpc(web3_request, &mut skip_rpcs, error_handler)
.await?
{
OpenRequestResult::Handle(active_request_handle) => {
// save the rpc in case we get an error and want to retry on another server
// TODO: look at backend_requests instead
pin!(stream);

while let Some(active_request_handle) = stream.next().await {
let rpc = active_request_handle.clone_connection();

web3_request.backend_requests.lock().push(rpc.clone());
@@ -856,7 +556,7 @@ impl Web3Rpcs {
error!(
%error_msg,
"unexpected result limit by {}",
skip_rpcs.last().unwrap(),
"???"
);
continue;
}
@@ -864,7 +564,7 @@ impl Web3Rpcs {
warn!(
%error_msg,
"rate limited by {}",
skip_rpcs.last().unwrap()
"???"
);
continue;
}
@@ -914,62 +614,21 @@ impl Web3Rpcs {
_ => {}
}

// let rpc = skip_rpcs
// .last()
// .expect("there must have been a provider if we got an error");

// TODO: emit a stat. if a server is getting skipped a lot, something is not right

// TODO: if we get a TrySendError, reconnect. wait why do we see a trysenderror on a dual provider? shouldn't it be using reqwest

// TODO! WRONG! ONLY SET RETRY_AT IF THIS IS A SERVER/CONNECTION ERROR. JSONRPC "error" is FINE
// trace!(
// "Backend server error on {}! Retrying {:?} on another. err={:?}",
// rpc,
// request,
// error,
// );
// if let Some(ref hard_limit_until) = rpc.hard_limit_until {
// let retry_at = Instant::now() + Duration::from_secs(1);

// hard_limit_until.send_replace(retry_at);
// }

return Err(error.into());
last_jsonrpc_error = Some(error);
}
}
}
OpenRequestResult::RetryAt(retry_at) => {
// TODO: move this to a helper function
// sleep (TODO: with a lock?) until our rate limits should be available
// TODO: if a server catches up sync while we are waiting, we could stop waiting
warn!(
"All rate limits exceeded. waiting for change in synced servers or {:?}s",
retry_at.duration_since(Instant::now()).as_secs_f32()
);

// TODO: have a separate column for rate limited?
web3_request.no_servers.fetch_add(1, Ordering::Relaxed);

select! {
_ = sleep_until(retry_at) => {
trace!("slept!");
skip_rpcs.pop();
}
_ = watch_consensus_rpcs.changed() => {
// we don't actually care what they are now. we just care that they changed
watch_consensus_rpcs.borrow_and_update();

}
}
}
OpenRequestResult::NotReady => {
web3_request.error_response.store(true, Ordering::Relaxed);
break;
}
}
}

if let Some(err) = last_jsonrpc_error {
return Ok(err.into());
}

if let Some(err) = last_provider_error {
return Err(err.into());
}

if let Some(err) = method_not_available_response {
web3_request.error_response.store(false, Ordering::Relaxed);

@@ -982,61 +641,57 @@ impl Web3Rpcs {
return Err(err.into());
}

if let Some(err) = last_provider_error {
return Err(err.into());
}
// let min_block_needed = web3_request.min_block_needed();
// let max_block_needed = web3_request.max_block_needed();

let min_block_needed = web3_request.min_block_needed();
let max_block_needed = web3_request.max_block_needed();
// let num_conns = self.len();
// let num_skipped = skip_rpcs.len();

let num_conns = self.len();
let num_skipped = skip_rpcs.len();
// let head_block_num = watch_consensus_rpcs
// .borrow_and_update()
// .as_ref()
// .map(|x| x.head_block.number());

let head_block_num = watch_consensus_rpcs
.borrow_and_update()
.as_ref()
.map(|x| x.head_block.number());
// // TODO: error? warn? debug? trace?
// if head_block_num.is_none() {
// error!(
// min=?min_block_needed,
// max=?max_block_needed,
// head=?head_block_num,
// known=num_conns,
// method=%web3_request.request.method(),
// params=?web3_request.request.params(),
// "No servers synced",
// );
// } else if head_block_num > max_block_needed {
// // we have synced past the needed block
// // TODO: log ranked rpcs
// // TODO: only log params in development
// error!(
// min=?min_block_needed,
// max=?max_block_needed,
// head=?head_block_num,
// known=%num_conns,
// method=%web3_request.request.method(),
// params=?web3_request.request.params(),
// "No archive servers synced",
// );
// } else {
// // TODO: only log params in development
// // TODO: log ranked rpcs
// error!(
// min=?min_block_needed,
// max=?max_block_needed,
// head=?head_block_num,
// skipped=%num_skipped,
// known=%num_conns,
// method=%web3_request.request.method(),
// params=?web3_request.request.params(),
// "Requested data is not available",
// );
// }

// TODO: error? warn? debug? trace?
if head_block_num.is_none() {
error!(
min=?min_block_needed,
max=?max_block_needed,
head=?head_block_num,
known=num_conns,
method=%web3_request.request.method(),
params=?web3_request.request.params(),
"No servers synced",
);
} else if head_block_num > max_block_needed {
// we have synced past the needed block
// TODO: log ranked rpcs
// TODO: only log params in development
error!(
min=?min_block_needed,
max=?max_block_needed,
head=?head_block_num,
known=%num_conns,
method=%web3_request.request.method(),
params=?web3_request.request.params(),
"No archive servers synced",
);
} else {
// TODO: only log params in development
// TODO: log ranked rpcs
error!(
min=?min_block_needed,
max=?max_block_needed,
head=?head_block_num,
skipped=%num_skipped,
known=%num_conns,
method=%web3_request.request.method(),
params=?web3_request.request.params(),
"Requested data is not available",
);
}

// TODO: what error code?
// TODO: what error code? what data?
// cloudflare gives {"jsonrpc":"2.0","error":{"code":-32043,"message":"Requested data cannot be older than 128 blocks."},"id":1}
Err(JsonRpcErrorData {
message: "Requested data is not available".into(),
@@ -1046,149 +701,150 @@ impl Web3Rpcs {
.into())
}

/// be sure there is a timeout on this or it might loop forever
#[allow(clippy::too_many_arguments)]
pub async fn try_send_all_synced_connections<R: JsonRpcResultData>(
self: &Arc<Self>,
web3_request: &Arc<Web3Request>,
max_wait: Option<Duration>,
error_level: Option<RequestErrorHandler>,
max_sends: Option<usize>,
) -> Web3ProxyResult<R> {
let mut watch_consensus_rpcs = self.watch_ranked_rpcs.subscribe();
// /// be sure there is a timeout on this or it might loop forever
// #[allow(clippy::too_many_arguments)]
// pub async fn xxx_try_send_all_synced_connections<R: JsonRpcResultData>(
// self: &Arc<Self>,
// web3_request: &Arc<Web3Request>,
// max_wait: Option<Duration>,
// error_level: Option<RequestErrorHandler>,
// max_sends: Option<usize>,
// ) -> Web3ProxyResult<R> {
// let mut watch_consensus_rpcs = self.watch_ranked_rpcs.subscribe();

// todo!() we are inconsistent with max_wait and web3_request.expires_at
let start = Instant::now();
// // todo!() we are inconsistent with max_wait and web3_request.expires_at
// let start = Instant::now();

loop {
if let Some(max_wait) = max_wait {
if start.elapsed() > max_wait {
break;
}
}
// loop {
// if let Some(max_wait) = max_wait {
// if start.elapsed() > max_wait {
// break;
// }
// }

match self
.all_connections(web3_request, max_sends, error_level)
.await
{
Ok(active_request_handles) => {
let mut only_backups_used = true;
// match self
// .all_connections(web3_request, max_sends, error_level)
// .await
// {
// Ok(active_request_handles) => {
// let mut only_backups_used = true;

web3_request
.backend_requests
.lock()
.extend(active_request_handles.iter().map(|x| {
let rpc = x.clone_connection();
// web3_request
// .backend_requests
// .lock()
// .extend(active_request_handles.iter().map(|x| {
// let rpc = x.clone_connection();

if !rpc.backup {
// TODO: even if a backup is included, it is possible the response is still from a primary connection. think about this more
only_backups_used = false;
}
// if !rpc.backup {
// // TODO: even if a backup is included, it is possible the response is still from a primary connection. think about this more
// only_backups_used = false;
// }

rpc
}));
// rpc
// }));

warn!("move this to where we turn RequestMetadata into a Stat");
web3_request
.response_from_backup_rpc
.store(only_backups_used, Ordering::Relaxed);
// warn!("move this to where we turn RequestMetadata into a Stat");
// web3_request
// .response_from_backup_rpc
// .store(only_backups_used, Ordering::Relaxed);

let x = self
.try_send_parallel_requests(active_request_handles, max_wait)
.await?;
// let x = self
// .try_send_parallel_requests(active_request_handles, max_wait)
// .await?;

// TODO: count the number of successes and possibly retry if there weren't enough
// // TODO: count the number of successes and possibly retry if there weren't enough

return Ok(x);
}
Err(None) => {
warn!(
?self,
min_block_needed=?web3_request.min_block_needed(),
max_block_needed=?web3_request.max_block_needed(),
"No servers in sync on! Retrying",
);
// return Ok(x);
// }
// Err(None) => {
// warn!(
// ?self,
// min_block_needed=?web3_request.min_block_needed(),
// max_block_needed=?web3_request.max_block_needed(),
// "No servers in sync on! Retrying",
// );

// TODO: if this times out, i think we drop this
web3_request.no_servers.fetch_add(1, Ordering::Relaxed);
// // TODO: if this times out, i think we drop this
// web3_request.no_servers.fetch_add(1, Ordering::Relaxed);

let max_sleep = if let Some(max_wait) = max_wait {
start + max_wait
} else {
break;
};
// let max_sleep = if let Some(max_wait) = max_wait {
// start + max_wait
// } else {
// break;
// };

select! {
_ = sleep_until(max_sleep) => {
// rpcs didn't change and we have waited too long. break to return an error
warn!(?self, "timeout waiting for try_send_all_synced_connections!");
break;
},
_ = watch_consensus_rpcs.changed() => {
// consensus rpcs changed!
watch_consensus_rpcs.borrow_and_update();
// select! {
// _ = sleep_until(max_sleep) => {
// // rpcs didn't change and we have waited too long. break to return an error
// warn!(?self, "timeout waiting for try_send_all_synced_connections!");
// break;
// },
// _ = watch_consensus_rpcs.changed() => {
// // consensus rpcs changed!
// watch_consensus_rpcs.borrow_and_update();

// continue to try again
continue;
}
}
}
Err(Some(retry_at)) => {
web3_request.no_servers.fetch_add(1, Ordering::Relaxed);
// // continue to try again
// continue;
// }
// }
// }
// Err(Some(retry_at)) => {
// web3_request.no_servers.fetch_add(1, Ordering::Relaxed);

if let Some(max_wait) = max_wait {
if start.elapsed() > max_wait {
warn!(
?self,
"All rate limits exceeded. And sleeping would take too long"
);
break;
}
// if let Some(max_wait) = max_wait {
// if start.elapsed() > max_wait {
// warn!(
// ?self,
// "All rate limits exceeded. And sleeping would take too long"
// );
// break;
// }

warn!("All rate limits exceeded. Sleeping");
// warn!("All rate limits exceeded. Sleeping");

// TODO: only make one of these sleep_untils
// // TODO: only make one of these sleep_untils

let break_at = start + max_wait;
// let break_at = start + max_wait;

if break_at <= retry_at {
select! {
_ = sleep_until(break_at) => {break}
_ = watch_consensus_rpcs.changed() => {
watch_consensus_rpcs.borrow_and_update();
}
}
} else {
select! {
_ = sleep_until(retry_at) => {}
_ = watch_consensus_rpcs.changed() => {
watch_consensus_rpcs.borrow_and_update();
}
}
}
// if break_at <= retry_at {
// select! {
// _ = sleep_until(break_at) => {break}
// _ = watch_consensus_rpcs.changed() => {
// watch_consensus_rpcs.borrow_and_update();
// }
// }
// } else {
// select! {
// _ = sleep_until(retry_at) => {}
// _ = watch_consensus_rpcs.changed() => {
// watch_consensus_rpcs.borrow_and_update();
// }
// }
// }

continue;
} else {
warn!(?self, "all rate limits exceeded");
break;
}
}
}
}
// continue;
// } else {
// warn!(?self, "all rate limits exceeded");
// break;
// }
// }
// }
// }

Err(Web3ProxyError::NoServersSynced)
}
// Err(Web3ProxyError::NoServersSynced)
// }

#[allow(clippy::too_many_arguments)]
pub async fn try_proxy_connection<R: JsonRpcResultData>(
&self,
web3_request: &Arc<Web3Request>,
web3_request: Arc<Web3Request>,
) -> Web3ProxyResult<jsonrpc::SingleResponse<R>> {
let proxy_mode = web3_request.proxy_mode();

match proxy_mode {
ProxyMode::Debug | ProxyMode::Best => self.request_with_metadata(web3_request).await,
ProxyMode::Fastest(_x) => todo!("Fastest"),
ProxyMode::Quorum(_x, _y) => todo!("Quorum"),
ProxyMode::Versus => todo!("Versus"),
}
}
@@ -15,6 +15,7 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency};
use migration::sea_orm::DatabaseConnection;
use nanorand::tls::TlsWyRand;
use nanorand::Rng;
use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter};
use serde::ser::{SerializeStruct, Serializer};
@@ -31,6 +32,7 @@ use tracing::{debug, error, info, trace, warn, Level};
use url::Url;

/// An active connection to a Web3 RPC server like geth or erigon.
/// TODO: smarter Default derive or move the channels around so they aren't part of this at all
#[derive(Default)]
pub struct Web3Rpc {
pub name: String,
@@ -229,7 +231,14 @@ impl Web3Rpc {
Ok((new_connection, handle))
}

pub fn next_available(&self) -> Instant {
let hard_limit_until = *self.hard_limit_until.as_ref().unwrap().borrow();

hard_limit_until.max(Instant::now())
}

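A simplified sketch of next_available's clamp (the real method reads the hard_limit_until watch channel; an Option stands in here). Clamping to now means an unthrottled server compares equal to any other unthrottled server on this field, so it can be used directly in ordering keys:

use tokio::time::Instant;

fn next_available(hard_limit_until: Option<Instant>) -> Instant {
    // a server with no pending rate limit reports "now"
    hard_limit_until.unwrap_or_else(Instant::now).max(Instant::now())
}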
/// sort by...
/// - rate limit (ascending)
/// - backups last
/// - tier (ascending)
/// - block number (descending)
@@ -237,7 +246,7 @@ impl Web3Rpc {
/// TODO: should tier or block number take priority?
/// TODO: should this return a struct that implements sorting traits?
/// TODO: move this to consensus.rs
fn sort_on(&self, max_block: Option<U64>) -> (bool, Reverse<U64>, u32) {
fn sort_on(&self, max_block: Option<U64>) -> (Reverse<Instant>, bool, Reverse<U64>, u32) {
let mut head_block = self
.head_block_sender
.as_ref()
@@ -252,14 +261,22 @@ impl Web3Rpc {

let backup = self.backup;

(!backup, Reverse(head_block), tier)
let rate_limit_until =
(*self.hard_limit_until.as_ref().unwrap().borrow()).max(Instant::now());

(
Reverse(rate_limit_until),
!backup,
Reverse(head_block),
tier,
)
}

/// TODO: move this to consensus.rs
pub fn sort_for_load_balancing_on(
&self,
max_block: Option<U64>,
) -> ((bool, Reverse<U64>, u32), Duration) {
) -> ((Reverse<Instant>, bool, Reverse<U64>, u32), Duration) {
let sort_on = self.sort_on(max_block);

let weighted_peak_latency = self.weighted_peak_latency();
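The widened sort key works because Rust tuples compare lexicographically (earlier fields dominate later ones) and Reverse flips a single field's direction. A standalone demo of the mechanics with made-up values (not the commit's code):

use std::cmp::Reverse;

fn main() {
    let mut keys = vec![
        ("a", (false, Reverse(100u64), 2u32)), // primary, head block 100, tier 2
        ("b", (false, Reverse(101), 1)),       // primary, head block 101, tier 1
        ("c", (true, Reverse(100), 1)),        // backup, head block 100, tier 1
    ];
    keys.sort_by_key(|&(_, k)| k);
    // "b" sorts first (highest block wins via Reverse), then "a", then "c" (true > false)
    for (name, _) in &keys {
        println!("{name}");
    }
}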
@@ -273,14 +290,14 @@ impl Web3Rpc {

/// like sort_for_load_balancing, but shuffles tiers randomly instead of sorting by weighted_peak_latency
/// TODO: move this to consensus.rs
/// TODO: this return type is too complex
pub fn shuffle_for_load_balancing_on(
&self,
rng: &mut TlsWyRand,
max_block: Option<U64>,
) -> ((bool, Reverse<U64>, u32), u8) {
) -> ((Reverse<Instant>, bool, Reverse<U64>, u32), u8) {
let sort_on = self.sort_on(max_block);

let mut rng = nanorand::tls_rng();

let r = rng.generate::<u8>();

(sort_on, r)
|
@ -173,6 +173,7 @@ impl OpenRequestHandle {
|
||||
/// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
|
||||
/// depending on how things are locked, you might need to pass the provider in
|
||||
/// we take self to ensure this function only runs once
|
||||
/// TODO: abandon ProviderError
|
||||
pub async fn request<R: JsonRpcResultData + serde::Serialize>(
|
||||
self,
|
||||
) -> Result<jsonrpc::SingleResponse<R>, ProviderError> {
|
||||
@@ -212,12 +213,15 @@ impl OpenRequestHandle {
(&self.rpc.http_url, &self.rpc.http_client)
{
let params: serde_json::Value = serde_json::to_value(params)?;

// TODO: why recreate this? we should be able to just use the one from the user
let request = jsonrpc::JsonRpcRequest::new(
self.web3_request.id().into(),
method.to_string(),
params,
)
.expect("request creation cannot fail");

match client.post(url.clone()).json(&request).send().await {
// TODO: threshold from configs
Ok(response) => {