split http and ws provider

Bryan Stitt 2023-05-23 14:40:34 -07:00
parent 05f862f3e4
commit 78f247fc6c
18 changed files with 444 additions and 661 deletions
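Editor's note (not part of the diff): the commit replaces the old all-in-one provider handling on Web3Rpc with two optional, transport-specific clients. A rough sketch of the resulting shape, inferred from the type aliases and field uses in this diff; the real struct has many more fields:

    use ethers::providers::{Http, Provider, Ws};

    pub type EthersHttpProvider = Provider<Http>;
    pub type EthersWsProvider = Provider<Ws>;

    pub struct Web3Rpc {
        /// used for polling blocks and for plain requests
        pub(crate) http_provider: Option<EthersHttpProvider>,
        /// used for subscriptions (e.g. new heads) when a ws_url is configured
        pub(crate) ws_provider: Option<EthersWsProvider>,
        // ...other fields elided...
    }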

Cargo.lock generated

@ -6533,6 +6533,7 @@ dependencies = [
"hex_fmt",
"hostname",
"http",
"hyper",
"influxdb2",
"influxdb2-structmap",
"ipnet",

@ -55,6 +55,7 @@ hdrhistogram = "7.5.2"
hex_fmt = "0.3.0"
hostname = "0.3.1"
http = "0.2.9"
hyper = { version = "0.14.26", features = ["full"] }
influxdb2 = { git = "https://github.com/llamanodes/influxdb2", features = ["rustls"] }
influxdb2-structmap = { git = "https://github.com/llamanodes/influxdb2/"}
ipnet = "2.7.2"

@ -74,7 +74,7 @@ pub static APP_USER_AGENT: &str = concat!(
// aggregate across 1 week
pub const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7;
pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;
pub type Web3ProxyJoinHandle<T> = JoinHandle<Web3ProxyResult<T>>;
/// TODO: move this
#[derive(Clone, Debug, Default, From)]
@ -176,7 +176,7 @@ pub struct Web3ProxyApp {
// TODO: should the key be our RpcSecretKey class instead of Ulid?
pub rpc_secret_key_cache: RpcSecretKeyCache,
/// concurrent/parallel RPC request limits for authenticated users
pub rpc_key_semaphores: Cache<NonZeroU64, Arc<Semaphore>>,
pub user_semaphores: Cache<NonZeroU64, Arc<Semaphore>>,
/// concurrent/parallel request limits for anonymous users
pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>>,
/// concurrent/parallel application request limits for authenticated users
@ -188,7 +188,7 @@ pub struct Web3ProxyApp {
/// flatten a JoinError into an anyhow error
/// Useful when joining multiple futures.
pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T> {
pub async fn flatten_handle<T>(handle: Web3ProxyJoinHandle<T>) -> Web3ProxyResult<T> {
match handle.await {
Ok(Ok(result)) => Ok(result),
Ok(Err(err)) => Err(err),
@ -198,8 +198,8 @@ pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T>
/// return the first error, or Ok if everything worked
pub async fn flatten_handles<T>(
mut handles: FuturesUnordered<AnyhowJoinHandle<T>>,
) -> anyhow::Result<()> {
mut handles: FuturesUnordered<Web3ProxyJoinHandle<T>>,
) -> Web3ProxyResult<()> {
while let Some(x) = handles.next().await {
match x {
Err(e) => return Err(e.into()),
@ -315,9 +315,9 @@ pub struct Web3ProxyAppSpawn {
/// the app. probably clone this to use in other groups of handles
pub app: Arc<Web3ProxyApp>,
/// handles for the balanced and private rpcs
pub app_handles: FuturesUnordered<AnyhowJoinHandle<()>>,
pub app_handles: FuturesUnordered<Web3ProxyJoinHandle<()>>,
/// these are important and must be allowed to finish
pub background_handles: FuturesUnordered<AnyhowJoinHandle<()>>,
pub background_handles: FuturesUnordered<Web3ProxyJoinHandle<()>>,
/// config changes are sent here
pub new_top_config_sender: watch::Sender<TopConfig>,
/// watch this to know when to start the app
@ -359,10 +359,12 @@ impl Web3ProxyApp {
}
// these futures are key parts of the app. if they stop running, the app has encountered an irrecoverable error
let app_handles = FuturesUnordered::new();
// TODO: this is a small enough group, that a vec with try_join_all is probably fine
let app_handles: FuturesUnordered<Web3ProxyJoinHandle<()>> = FuturesUnordered::new();
// we must wait for these to end on their own (and they need to subscribe to shutdown_sender)
let important_background_handles = FuturesUnordered::new();
let important_background_handles: FuturesUnordered<Web3ProxyJoinHandle<()>> =
FuturesUnordered::new();
// connect to the database and make sure the latest migrations have run
let mut db_conn = None::<DatabaseConnection>;
@ -624,12 +626,10 @@ impl Web3ProxyApp {
// TODO: what should tti be for semaphores?
let bearer_token_semaphores = Cache::new(max_users);
let ip_semaphores = Cache::new(max_users);
let registered_user_semaphores = Cache::new(max_users);
let user_semaphores = Cache::new(max_users);
let (balanced_rpcs, balanced_handle, consensus_connections_watcher) = Web3Rpcs::spawn(
top_config.app.chain_id,
db_conn.clone(),
http_client.clone(),
top_config.app.max_block_age,
top_config.app.max_block_lag,
top_config.app.min_synced_rpcs,
@ -654,9 +654,7 @@ impl Web3ProxyApp {
// TODO: Merge
// let (private_rpcs, private_rpcs_handle) = Web3Rpcs::spawn(
let (private_rpcs, private_handle, _) = Web3Rpcs::spawn(
top_config.app.chain_id,
db_conn.clone(),
http_client.clone(),
// private rpcs don't get subscriptions, so no need for max_block_age or max_block_lag
None,
None,
@ -688,9 +686,7 @@ impl Web3ProxyApp {
} else {
// TODO: do something with the spawn handle
let (bundler_4337_rpcs, bundler_4337_rpcs_handle, _) = Web3Rpcs::spawn(
top_config.app.chain_id,
db_conn.clone(),
http_client.clone(),
// bundler_4337_rpcs don't get subscriptions, so no need for max_block_age or max_block_lag
None,
None,
@ -735,7 +731,7 @@ impl Web3ProxyApp {
rpc_secret_key_cache,
bearer_token_semaphores,
ip_semaphores,
rpc_key_semaphores: registered_user_semaphores,
user_semaphores,
stat_sender,
};
@ -752,9 +748,9 @@ impl Web3ProxyApp {
loop {
let new_top_config = new_top_config_receiver.borrow_and_update().to_owned();
app.apply_top_config(new_top_config)
.await
.context("failed applying new top_config")?;
if let Err(err) = app.apply_top_config(new_top_config).await {
error!("unable to apply config! {:?}", err);
};
new_top_config_receiver
.changed()
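For reference (not part of the diff), the reload loop above is the usual tokio `watch` pattern: read the latest config with `borrow_and_update`, apply it, then await `changed()`. A minimal standalone sketch:

    use tokio::sync::watch;

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = watch::channel("config v1".to_string());
        tx.send("config v2".to_string()).unwrap();

        // borrow_and_update returns the latest value and marks it as seen
        let current = rx.borrow_and_update().clone();
        assert_eq!(current, "config v2");

        // changed() then waits for the next send; it returns an error once the
        // sender is dropped, which gives a loop like the one above a way to exit
        drop(tx);
        assert!(rx.changed().await.is_err());
    }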
@ -790,7 +786,7 @@ impl Web3ProxyApp {
.into())
}
pub async fn apply_top_config(&self, new_top_config: TopConfig) -> anyhow::Result<()> {
pub async fn apply_top_config(&self, new_top_config: TopConfig) -> Web3ProxyResult<()> {
// TODO: also update self.config from new_top_config.app
// connect to the backends

@ -1,4 +1,4 @@
use crate::app::AnyhowJoinHandle;
use crate::app::Web3ProxyJoinHandle;
use crate::rpcs::blockchain::{BlocksByHashCache, Web3ProxyBlock};
use crate::rpcs::one::Web3Rpc;
use argh::FromArgs;
@ -9,7 +9,7 @@ use log::warn;
use migration::sea_orm::DatabaseConnection;
use serde::Deserialize;
use std::sync::Arc;
use tokio::sync::broadcast;
use std::time::Duration;
pub type BlockAndRpc = (Option<Web3ProxyBlock>, Arc<Web3Rpc>);
pub type TxHashAndRpc = (TxHash, Arc<Web3Rpc>);
@ -283,23 +283,45 @@ impl Web3RpcConfig {
redis_pool: Option<redis_rate_limiter::RedisPool>,
chain_id: u64,
http_client: Option<reqwest::Client>,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
blocks_by_hash_cache: BlocksByHashCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<TxHashAndRpc>>,
) -> anyhow::Result<(Arc<Web3Rpc>, AnyhowJoinHandle<()>)> {
) -> anyhow::Result<(Arc<Web3Rpc>, Web3ProxyJoinHandle<()>)> {
if !self.extra.is_empty() {
warn!("unknown Web3RpcConfig fields!: {:?}", self.extra.keys());
}
// TODO: get this from config? a helper function? where does this belong?
let block_interval = match chain_id {
// ethereum
1 => Duration::from_secs(12),
// ethereum-goerli
5 => Duration::from_secs(12),
// polygon
137 => Duration::from_secs(2),
// fantom
250 => Duration::from_secs(1),
// arbitrum
42161 => Duration::from_millis(500),
// anything else
_ => {
let default = 10;
warn!(
"unexpected chain_id ({}). polling every {} seconds",
chain_id, default
);
Duration::from_secs(default)
}
};
Web3Rpc::spawn(
self,
name,
chain_id,
db_conn,
http_client,
http_interval_sender,
redis_pool,
block_interval,
blocks_by_hash_cache,
block_sender,
tx_id_sender,
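The per-chain `block_interval` above is handed to `Web3Rpc::spawn` and, for http transports, becomes the ethers polling interval via `connect_http` (see the provider changes later in this diff). A minimal sketch with placeholder values (the URL is hypothetical):

    let block_interval = Duration::from_secs(12); // chain_id 1
    let http_url: Url = "https://rpc.example.invalid".parse()?;
    let provider = connect_http(http_url, None, block_interval)?;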

@ -30,7 +30,7 @@ use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::RedisRateLimitResult;
use std::convert::Infallible;
use std::fmt::Display;
use std::hash::Hash;
use std::hash::{Hash, Hasher};
use std::mem;
use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64, AtomicUsize};
use std::time::Duration;
@ -42,22 +42,12 @@ use ulid::Ulid;
use uuid::Uuid;
/// This lets us use UUID and ULID while we transition to only ULIDs
/// TODO: include the key's description.
#[derive(Copy, Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)]
pub enum RpcSecretKey {
Ulid(Ulid),
Uuid(Uuid),
}
impl Hash for RpcSecretKey {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
match self {
Self::Ulid(x) => state.write_u128(x.0),
Self::Uuid(x) => state.write_u128(x.as_u128()),
}
}
}
/// TODO: should this have IpAddr and Origin or AuthorizationChecks?
#[derive(Debug)]
pub enum RateLimitResult {
@ -99,6 +89,17 @@ pub struct KafkaDebugLogger {
num_responses: AtomicUsize,
}
impl Hash for RpcSecretKey {
fn hash<H: Hasher>(&self, state: &mut H) {
let x = match self {
Self::Ulid(x) => x.0,
Self::Uuid(x) => x.as_u128(),
};
x.hash(state);
}
}
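The relocated `Hash` impl above hashes the wrapped 128 bits, so a key hashes the same whether it was parsed as a ULID or a UUID. A small illustration (not part of the diff) using the `ulid` and `uuid` crates already in this workspace:

    use ulid::Ulid;
    use uuid::Uuid;

    let ulid = Ulid::new();
    let uuid = Uuid::from_u128(ulid.0);
    // both wrap the same u128, so RpcSecretKey::Ulid(ulid) and
    // RpcSecretKey::Uuid(uuid) feed identical bits into the hasher
    assert_eq!(ulid.0, uuid.as_u128());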
impl fmt::Debug for KafkaDebugLogger {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("KafkaDebugLogger")
@ -883,17 +884,13 @@ impl Web3ProxyApp {
if let Some(max_concurrent_requests) = self.config.public_max_concurrent_requests {
let semaphore = self
.ip_semaphores
.get_or_insert_async::<Web3ProxyError>(ip, async move {
.get_or_insert_async::<Infallible>(ip, async move {
// TODO: set max_concurrent_requests dynamically based on load?
let s = Semaphore::new(max_concurrent_requests);
Ok(Arc::new(s))
})
.await?;
// if semaphore.available_permits() == 0 {
// // TODO: concurrent limit hit! emit a stat? less important for anon users
// // TODO: there is probably a race here
// }
.await
.expect("infallible");
let semaphore_permit = semaphore.acquire_owned().await?;
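Aside from the project-specific cache wrapper, the concurrency cap above is plain tokio semaphore usage. A self-contained sketch of the permit flow (the per-IP cache is elided):

    use std::sync::Arc;
    use tokio::sync::Semaphore;

    #[tokio::main]
    async fn main() {
        let max_concurrent_requests = 2;
        let semaphore = Arc::new(Semaphore::new(max_concurrent_requests));

        // each in-flight request holds one owned permit until it drops;
        // once the cap is reached, acquire_owned waits instead of failing
        let _permit = semaphore.clone().acquire_owned().await.unwrap();
        assert_eq!(semaphore.available_permits(), max_concurrent_requests - 1);
    }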
@ -903,8 +900,8 @@ impl Web3ProxyApp {
}
}
/// Limit the number of concurrent requests from the given rpc key.
pub async fn registered_user_semaphore(
/// Limit the number of concurrent requests for a given user across all of their keys
pub async fn user_semaphore(
&self,
authorization_checks: &AuthorizationChecks,
) -> Web3ProxyResult<Option<OwnedSemaphorePermit>> {
@ -915,25 +912,19 @@ impl Web3ProxyApp {
.or(Err(Web3ProxyError::UserIdZero))?;
let semaphore = self
.rpc_key_semaphores
.get_or_insert_async(&user_id, async move {
.user_semaphores
.get_or_insert_async::<Infallible>(&user_id, async move {
let s = Semaphore::new(max_concurrent_requests as usize);
// trace!("new semaphore for user_id {}", user_id);
Ok::<_, Infallible>(Arc::new(s))
Ok(Arc::new(s))
})
.await
.unwrap();
// if semaphore.available_permits() == 0 {
// // TODO: concurrent limit hit! emit a stat? this has a race condition though.
// // TODO: maybe have a stat on how long we wait to acquire the semaphore instead?
// }
.expect("infallible");
let semaphore_permit = semaphore.acquire_owned().await?;
Ok(Some(semaphore_permit))
} else {
// unlimited requests allowed
// unlimited concurrency
Ok(None)
}
}
@ -955,7 +946,7 @@ impl Web3ProxyApp {
Ok(Arc::new(s))
})
.await
.unwrap();
.expect("infallible");
let semaphore_permit = semaphore.acquire_owned().await?;
@ -1043,7 +1034,7 @@ impl Web3ProxyApp {
// they do check origin because we can override rate limits for some origins
let authorization = Authorization::external(
allowed_origin_requests_per_period,
self.db_conn.clone(),
self.db_conn(),
ip,
origin,
proxy_mode,
@ -1098,8 +1089,7 @@ impl Web3ProxyApp {
proxy_mode: ProxyMode,
rpc_secret_key: RpcSecretKey,
) -> Web3ProxyResult<AuthorizationChecks> {
let authorization_checks: Result<_, Web3ProxyError> = self
.rpc_secret_key_cache
self.rpc_secret_key_cache
.get_or_insert_async(&rpc_secret_key, async move {
// trace!(?rpc_secret_key, "user cache miss");
@ -1119,7 +1109,6 @@ impl Web3ProxyApp {
Some(rpc_key_model) => {
// TODO: move these splits into helper functions
// TODO: can we have sea orm handle this for us?
// TODO: don't expect. return an application error
let user_model = user::Entity::find_by_id(rpc_key_model.user_id)
.one(db_replica.conn())
.await?
@ -1129,8 +1118,8 @@ impl Web3ProxyApp {
.filter(balance::Column::UserId.eq(user_model.id))
.one(db_replica.conn())
.await?
.map(|x| x.available_balance)
.unwrap_or_default();
.expect("related balance")
.available_balance;
let user_tier_model =
user_tier::Entity::find_by_id(user_model.user_tier_id)
@ -1220,9 +1209,7 @@ impl Web3ProxyApp {
None => Ok(AuthorizationChecks::default()),
}
})
.await;
authorization_checks
.await
}
/// Authorized the ip/origin/referer/useragent and rate limit and concurrency
@ -1246,9 +1233,7 @@ impl Web3ProxyApp {
// only allow this rpc_key to run a limited amount of concurrent requests
// TODO: rate limit should be BEFORE the semaphore!
let semaphore = self
.registered_user_semaphore(&authorization_checks)
.await?;
let semaphore = self.user_semaphore(&authorization_checks).await?;
let authorization = Authorization::try_new(
authorization_checks,

@ -47,14 +47,15 @@ pub enum Web3ProxyError {
Database(DbErr),
#[display(fmt = "{:#?}, {:#?}", _0, _1)]
EipVerificationFailed(Box<Web3ProxyError>, Box<Web3ProxyError>),
EthersHttpClientError(ethers::prelude::HttpClientError),
EthersProviderError(ethers::prelude::ProviderError),
EthersWsClientError(ethers::prelude::WsClientError),
FlumeRecvError(flume::RecvError),
EthersHttpClient(ethers::prelude::HttpClientError),
EthersProvider(ethers::prelude::ProviderError),
EthersWsClient(ethers::prelude::WsClientError),
FlumeRecv(flume::RecvError),
GasEstimateNotU256,
Headers(headers::Error),
HeaderToString(ToStrError),
InfluxDb2RequestError(influxdb2::RequestError),
Hyper(hyper::Error),
InfluxDb2Request(influxdb2::RequestError),
#[display(fmt = "{} > {}", min, max)]
#[from(ignore)]
InvalidBlockBounds {
@ -64,6 +65,7 @@ pub enum Web3ProxyError {
InvalidHeaderValue(InvalidHeaderValue),
InvalidEip,
InvalidInviteCode,
Io(std::io::Error),
UnknownReferralCode,
InvalidReferer,
InvalidSignatureLength,
@ -88,6 +90,12 @@ pub enum Web3ProxyError {
num_known: usize,
min_head_rpcs: usize,
},
#[display(fmt = "{}/{}", available, needed)]
#[from(ignore)]
NotEnoughSoftLimit {
available: u32,
needed: u32,
},
NotFound,
NotImplemented,
OriginRequired,
@ -136,6 +144,7 @@ pub enum Web3ProxyError {
impl Web3ProxyError {
pub fn into_response_parts(self) -> (StatusCode, JsonRpcResponseData) {
// TODO: include a unique request id in the data
let (code, err): (StatusCode, JsonRpcErrorData) = match self {
Self::AccessDenied => {
// TODO: attach something to this trace. probably don't include much in the message though. don't want to leak creds by accident
@ -223,7 +232,7 @@ impl Web3ProxyError {
},
)
}
Self::EthersHttpClientError(err) => {
Self::EthersHttpClient(err) => {
warn!("EthersHttpClientError err={:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
@ -234,7 +243,7 @@ impl Web3ProxyError {
},
)
}
Self::EthersProviderError(err) => {
Self::EthersProvider(err) => {
warn!("EthersProviderError err={:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
@ -245,7 +254,7 @@ impl Web3ProxyError {
},
)
}
Self::EthersWsClientError(err) => {
Self::EthersWsClient(err) => {
warn!("EthersWsClientError err={:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
@ -256,7 +265,7 @@ impl Web3ProxyError {
},
)
}
Self::FlumeRecvError(err) => {
Self::FlumeRecv(err) => {
warn!("FlumeRecvError err={:#?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
@ -290,7 +299,19 @@ impl Web3ProxyError {
},
)
}
Self::InfluxDb2RequestError(err) => {
Self::Hyper(err) => {
warn!("hyper err={:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcErrorData {
// TODO: is it safe to expose these error strings?
message: Cow::Owned(err.to_string()),
code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(),
data: None,
},
)
}
Self::InfluxDb2Request(err) => {
// TODO: attach a request id to the message and to this error so that if people report problems, we can dig in sentry to find out more
error!("influxdb2 err={:?}", err);
(
@ -371,6 +392,18 @@ impl Web3ProxyError {
},
)
}
Self::Io(err) => {
warn!("std io err={:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcErrorData {
// TODO: is it safe to expose our io error strings?
message: Cow::Owned(err.to_string()),
code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(),
data: None,
},
)
}
Self::UnknownReferralCode => {
debug!("UnknownReferralCode");
(
@ -528,6 +561,20 @@ impl Web3ProxyError {
},
)
}
Self::NotEnoughSoftLimit { available, needed } => {
error!("NotEnoughSoftLimit {}/{}", available, needed);
(
StatusCode::BAD_GATEWAY,
JsonRpcErrorData {
message: Cow::Owned(format!(
"not enough soft limit available {}/{}",
available, needed
)),
code: StatusCode::BAD_GATEWAY.as_u16().into(),
data: None,
},
)
}
Self::NotFound => {
// TODO: emit a stat?
// TODO: instead of an error, show a normal html page for 404?

@ -28,6 +28,8 @@ use tokio::sync::broadcast;
use tower_http::cors::CorsLayer;
use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer;
use self::errors::Web3ProxyResult;
/// simple keys for caching responses
#[derive(Copy, Clone, Hash, PartialEq, Eq, EnumCount, EnumIter)]
pub enum ResponseCacheKey {
@ -49,7 +51,7 @@ pub async fn serve(
proxy_app: Arc<Web3ProxyApp>,
mut shutdown_receiver: broadcast::Receiver<()>,
shutdown_complete_sender: broadcast::Sender<()>,
) -> anyhow::Result<()> {
) -> Web3ProxyResult<()> {
// setup caches for whatever the frontend needs
// no need for max items since it is limited by the enum key
// TODO: latest moka allows for different ttls for different

@ -36,7 +36,7 @@ pub async fn health(
Ok(_health(app).await)
})
.await
.unwrap();
.expect("this cache get is infallible");
Response::builder()
.status(code)
@ -70,7 +70,7 @@ pub async fn backups_needed(
Ok(_backups_needed(app).await)
})
.await
.unwrap();
.expect("this cache get is infallible");
Response::builder()
.status(code)
@ -120,7 +120,7 @@ pub async fn status(
Ok(_status(app).await)
})
.await
.unwrap();
.expect("this cache get is infallible");
Response::builder()
.status(code)

@ -8,13 +8,14 @@ use std::sync::Arc;
use tokio::sync::broadcast;
use crate::app::Web3ProxyApp;
use crate::frontend::errors::Web3ProxyResult;
/// Run a prometheus metrics server on the given port.
pub async fn serve(
app: Arc<Web3ProxyApp>,
port: u16,
mut shutdown_receiver: broadcast::Receiver<()>,
) -> anyhow::Result<()> {
) -> Web3ProxyResult<()> {
// routes should be ordered most to least common
let app = Router::new().route("/", get(root)).layer(Extension(app));

@ -18,7 +18,6 @@ use std::convert::Infallible;
use std::hash::Hash;
use std::{cmp::Ordering, fmt::Display, sync::Arc};
use tokio::sync::broadcast;
use tokio::time::Duration;
// TODO: type for Hydrated Blocks with their full transactions?
pub type ArcBlock = Arc<Block<TxHash>>;
@ -193,7 +192,7 @@ impl Web3Rpcs {
.blocks_by_hash
.get_or_insert_async::<Infallible, _>(&block_hash, async move { Ok(block) })
.await
.unwrap();
.expect("this cache get is infallible");
Ok(block)
}
@ -219,12 +218,11 @@ impl Web3Rpcs {
// TODO: if error, retry?
let block: Web3ProxyBlock = match rpc {
Some(rpc) => rpc
.wait_for_request_handle(authorization, Some(Duration::from_secs(30)))
.await?
.request::<_, Option<ArcBlock>>(
"eth_getBlockByHash",
&json!(get_block_params),
Level::Error.into(),
authorization.clone(),
)
.await?
.and_then(|x| {
@ -366,7 +364,7 @@ impl Web3Rpcs {
// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
// Geth's subscriptions have the same potential for skipping blocks.
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
) -> Web3ProxyResult<()> {
let mut connection_heads = ConsensusFinder::new(self.max_block_age, self.max_block_lag);
loop {

@ -374,7 +374,7 @@ impl ConsensusFinder {
.first_seen
.get_or_insert_async::<Infallible>(block.hash(), async { Ok(Instant::now()) })
.await
.unwrap();
.expect("this cache get is infallible");
// calculate elapsed time before trying to lock
let latency = first_seen.elapsed();

@ -3,7 +3,7 @@ use super::blockchain::{BlocksByHashCache, BlocksByNumberCache, Web3ProxyBlock};
use super::consensus::{ConsensusWeb3Rpcs, ShouldWaitForBlock};
use super::one::Web3Rpc;
use super::request::{OpenRequestHandle, OpenRequestResult, RequestErrorHandler};
use crate::app::{flatten_handle, AnyhowJoinHandle, Web3ProxyApp};
use crate::app::{flatten_handle, Web3ProxyApp, Web3ProxyJoinHandle};
use crate::config::{BlockAndRpc, TxHashAndRpc, Web3RpcConfig};
use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult};
@ -11,7 +11,6 @@ use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::jsonrpc::{JsonRpcErrorData, JsonRpcRequest};
use crate::response_cache::JsonRpcResponseData;
use crate::rpcs::transactions::TxStatus;
use anyhow::Context;
use arc_swap::ArcSwap;
use counter::Counter;
use derive_more::From;
@ -36,7 +35,7 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use thread_fast_rng::rand::seq::SliceRandom;
use tokio::sync::{broadcast, watch};
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
use tokio::time::{sleep, sleep_until, Duration, Instant};
/// A collection of web3 connections. Sends requests either the current best server or all servers.
#[derive(From)]
@ -46,8 +45,6 @@ pub struct Web3Rpcs {
pub(crate) block_sender: flume::Sender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
/// any requests will be forwarded to one (or more) of these connections
pub(crate) by_name: ArcSwap<HashMap<String, Arc<Web3Rpc>>>,
/// notify all http providers to check their blocks at the same time
pub(crate) http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
/// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender`
/// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
/// TODO: why is watch_consensus_head_sender in an Option, but this one isn't?
@ -78,9 +75,7 @@ impl Web3Rpcs {
/// Spawn durable connections to multiple Web3 providers.
#[allow(clippy::too_many_arguments)]
pub async fn spawn(
chain_id: u64,
db_conn: Option<DatabaseConnection>,
http_client: Option<reqwest::Client>,
max_block_age: Option<u64>,
max_block_lag: Option<U64>,
min_head_rpcs: usize,
@ -91,82 +86,18 @@ impl Web3Rpcs {
watch_consensus_head_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
) -> anyhow::Result<(
Arc<Self>,
AnyhowJoinHandle<()>,
Web3ProxyJoinHandle<()>,
watch::Receiver<Option<Arc<ConsensusWeb3Rpcs>>>,
// watch::Receiver<Arc<ConsensusWeb3Rpcs>>,
)> {
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, block_receiver) = flume::unbounded::<BlockAndRpc>();
// TODO: query the rpc to get the actual expected block time, or get from config? maybe have this be part of a health check?
let expected_block_time_ms = match chain_id {
// ethereum
1 => 12_000,
// ethereum-goerli
5 => 12_000,
// polygon
137 => 2_000,
// fantom
250 => 1_000,
// arbitrum
42161 => 500,
// anything else
_ => {
warn!(
"unexpected chain_id ({}). polling every {} seconds",
chain_id, 10
);
10_000
}
};
let http_interval_sender = if http_client.is_some() {
let (sender, _) = broadcast::channel(1);
// TODO: what interval? follow a websocket also? maybe by watching synced connections with a timeout. will need debounce
let mut interval = interval(Duration::from_millis(expected_block_time_ms / 2));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
let sender = Arc::new(sender);
let f = {
let sender = sender.clone();
async move {
loop {
interval.tick().await;
// trace!("http interval ready");
if sender.send(()).is_err() {
// errors are okay. they mean that all receivers have been dropped, or the rpcs just haven't started yet
// TODO: i'm seeing this error a lot more than expected
trace!("no http receivers");
};
}
}
};
// TODO: do something with this handle?
tokio::spawn(f);
Some(sender)
} else {
None
};
// these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes
// TODO: how can we do the weigher for this? we need to know the actual allocated size
// TODO: actual weighter on this
// TODO: time_to_idle instead?
// TODO: limits from config
let blocks_by_hash: BlocksByHashCache =
Arc::new(CacheWithTTL::new_with_capacity(10_000, Duration::from_secs(30 * 60)).await);
// .max_capacity(1024 * 1024 * 1024)
// .weigher(|_k, v: &Web3ProxyBlock| {
// 1 + v.block.transactions.len().try_into().unwrap_or(u32::MAX)
// })
// .time_to_live(Duration::from_secs(30 * 60))
// .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// all block numbers are the same size, so no need for weigher
// TODO: limits from config
@ -185,7 +116,6 @@ impl Web3Rpcs {
blocks_by_hash,
blocks_by_number,
by_name,
http_interval_sender,
max_block_age,
max_block_lag,
min_head_rpcs,
@ -214,7 +144,7 @@ impl Web3Rpcs {
&self,
app: &Web3ProxyApp,
rpc_configs: HashMap<String, Web3RpcConfig>,
) -> anyhow::Result<()> {
) -> Web3ProxyResult<()> {
// safety checks
if rpc_configs.len() < app.config.min_synced_rpcs {
// TODO: don't count disabled servers!
@ -232,15 +162,14 @@ impl Web3Rpcs {
let sum_soft_limit = rpc_configs.values().fold(0, |acc, x| acc + x.soft_limit);
// TODO: require a buffer?
anyhow::ensure!(
sum_soft_limit >= self.min_sum_soft_limit,
"Only {}/{} soft limit! Add more rpcs, increase soft limits, or reduce min_sum_soft_limit.",
sum_soft_limit,
self.min_sum_soft_limit,
);
if sum_soft_limit < self.min_sum_soft_limit {
return Err(Web3ProxyError::NotEnoughSoftLimit {
available: sum_soft_limit,
needed: self.min_sum_soft_limit,
});
}
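With hypothetical numbers, the guard above is just a sum over the configured soft limits (not part of the diff):

    let soft_limits = [100u32, 200, 300];
    let sum_soft_limit: u32 = soft_limits.iter().sum();
    let min_sum_soft_limit = 500;
    // 600 >= 500, so spawning proceeds; otherwise the new
    // Web3ProxyError::NotEnoughSoftLimit { available, needed } is returned
    assert!(sum_soft_limit >= min_sum_soft_limit);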
// turn configs into connections (in parallel)
// TODO: move this into a helper function. then we can use it when configs change (will need a remove function too)
let mut spawn_handles: FuturesUnordered<_> = rpc_configs
.into_iter()
.filter_map(|(server_name, server_config)| {
@ -261,7 +190,6 @@ impl Web3Rpcs {
let pending_tx_id_sender = Some(self.pending_tx_id_sender.clone());
let blocks_by_hash_cache = self.blocks_by_hash.clone();
let http_interval_sender = self.http_interval_sender.clone();
let chain_id = app.config.chain_id;
debug!("spawning {}", server_name);
@ -272,7 +200,6 @@ impl Web3Rpcs {
vredis_pool,
chain_id,
http_client,
http_interval_sender,
blocks_by_hash_cache,
block_sender,
pending_tx_id_sender,
@ -312,7 +239,7 @@ impl Web3Rpcs {
Ok(Err(err)) => {
// if we got an error here, the app can continue on
// TODO: include context about which connection failed
// TODO: will this retry automatically? i don't think so
// TODO: retry automatically
error!("Unable to create connection. err={:?}", err);
}
Err(err) => {
@ -322,6 +249,15 @@ impl Web3Rpcs {
}
}
let num_rpcs = self.by_name.load().len();
if num_rpcs < self.min_head_rpcs {
return Err(Web3ProxyError::NotEnoughRpcs {
num_known: num_rpcs,
min_head_rpcs: self.min_head_rpcs,
});
}
Ok(())
}
@ -349,7 +285,7 @@ impl Web3Rpcs {
authorization: Arc<Authorization>,
block_receiver: flume::Receiver<BlockAndRpc>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
) -> Web3ProxyResult<()> {
let mut futures = vec![];
// setup the transaction funnel
@ -1317,7 +1253,6 @@ mod tests {
#[cfg(test)]
fn new_peak_latency() -> PeakEwmaLatency {
const NANOS_PER_MILLI: f64 = 1_000_000.0;
PeakEwmaLatency::spawn(Duration::from_secs(1), 4, Duration::from_secs(1))
}
@ -1490,7 +1425,6 @@ mod tests {
let rpcs = Web3Rpcs {
block_sender: block_sender.clone(),
by_name: ArcSwap::from_pointee(rpcs_by_name),
http_interval_sender: None,
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
@ -1568,7 +1502,7 @@ mod tests {
.send_head_block_result(
Ok(Some(lagged_block.clone())),
&block_sender,
rpcs.blocks_by_hash.clone(),
&rpcs.blocks_by_hash,
)
.await
.unwrap();
@ -1588,7 +1522,7 @@ mod tests {
.send_head_block_result(
Ok(Some(lagged_block.clone())),
&block_sender,
rpcs.blocks_by_hash.clone(),
&rpcs.blocks_by_hash,
)
.await
.unwrap();
@ -1620,7 +1554,7 @@ mod tests {
.send_head_block_result(
Ok(Some(head_block.clone())),
&block_sender,
rpcs.blocks_by_hash.clone(),
&rpcs.blocks_by_hash,
)
.await
.unwrap();
@ -1741,7 +1675,6 @@ mod tests {
let rpcs = Web3Rpcs {
block_sender,
by_name: ArcSwap::from_pointee(rpcs_by_name),
http_interval_sender: None,
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
@ -1911,7 +1844,6 @@ mod tests {
let rpcs = Web3Rpcs {
block_sender,
by_name: ArcSwap::from_pointee(rpcs_by_name),
http_interval_sender: None,
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,

@ -2,18 +2,18 @@
use super::blockchain::{ArcBlock, BlocksByHashCache, Web3ProxyBlock};
use super::provider::{connect_http, connect_ws, EthersHttpProvider, EthersWsProvider};
use super::request::{OpenRequestHandle, OpenRequestResult};
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::app::{flatten_handle, Web3ProxyJoinHandle};
use crate::config::{BlockAndRpc, Web3RpcConfig};
use crate::frontend::authorization::Authorization;
use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult};
use crate::rpcs::request::RequestErrorHandler;
use anyhow::{anyhow, Context};
use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
use ethers::prelude::{Bytes, Middleware, TxHash, U64};
use ethers::types::{Address, Transaction, U256};
use futures::future::try_join_all;
use futures::StreamExt;
use latency::{EwmaLatency, PeakEwmaLatency};
use log::{debug, error, info, trace, warn, Level};
use log::{debug, info, trace, warn, Level};
use migration::sea_orm::DatabaseConnection;
use ordered_float::OrderedFloat;
use parking_lot::RwLock;
@ -21,16 +21,12 @@ use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter};
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::json;
use std::borrow::Cow;
use std::cmp::min;
use std::convert::Infallible;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicBool, AtomicU64, AtomicUsize};
use std::sync::atomic::{self, AtomicU64, AtomicUsize};
use std::{cmp::Ordering, sync::Arc};
use thread_fast_rng::rand::Rng;
use thread_fast_rng::thread_fast_rng;
use tokio::sync::{broadcast, oneshot, watch, RwLock as AsyncRwLock};
use tokio::sync::watch;
use tokio::time::{sleep, sleep_until, timeout, Duration, Instant};
use url::Url;
@ -88,13 +84,12 @@ impl Web3Rpc {
db_conn: Option<DatabaseConnection>,
// optional because this is only used for http providers. websocket providers don't use it
http_client: Option<reqwest::Client>,
// TODO: rename to http_new_head_interval_sender?
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
redis_pool: Option<RedisPool>,
block_interval: Duration,
block_map: BlocksByHashCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
) -> anyhow::Result<(Arc<Web3Rpc>, AnyhowJoinHandle<()>)> {
) -> anyhow::Result<(Arc<Web3Rpc>, Web3ProxyJoinHandle<()>)> {
let created_at = Instant::now();
let hard_limit = match (config.hard_limit, redis_pool) {
@ -151,8 +146,6 @@ impl Web3Rpc {
}
}
let (disconnect_sender, disconnect_receiver) = watch::channel(false);
let (head_block, _) = watch::channel(None);
// Spawn the task for calculating average peak latency
@ -170,7 +163,7 @@ impl Web3Rpc {
let http_provider = if let Some(http_url) = config.http_url {
let http_url = http_url.parse::<Url>()?;
Some(connect_http(Cow::Owned(http_url), http_client)?)
Some(connect_http(http_url, http_client, block_interval)?)
// TODO: check the provider is on the right chain
} else {
@ -180,20 +173,21 @@ impl Web3Rpc {
let ws_provider = if let Some(ws_url) = config.ws_url {
let ws_url = ws_url.parse::<Url>()?;
Some(connect_ws(Cow::Owned(ws_url), usize::MAX).await?)
Some(connect_ws(ws_url, usize::MAX).await?)
// TODO: check the provider is on the right chain
} else {
None
};
let (disconnect_watch, _) = watch::channel(false);
let new_rpc = Self {
automatic_block_limit,
backup,
block_data_limit,
created_at: Some(created_at),
db_conn: db_conn.clone(),
disconnect_watch: Some(disconnect_sender),
display_name: config.display_name,
hard_limit,
hard_limit_until: Some(hard_limit_until),
@ -203,6 +197,8 @@ impl Web3Rpc {
peak_latency: Some(peak_latency),
soft_limit: config.soft_limit,
tier: config.tier,
ws_provider,
disconnect_watch: Some(disconnect_watch),
..Default::default()
};
@ -221,8 +217,6 @@ impl Web3Rpc {
block_map,
block_sender,
chain_id,
disconnect_receiver,
http_interval_sender,
tx_id_sender,
)
.await
@ -264,13 +258,12 @@ impl Web3Rpc {
// TODO: binary search between 90k and max?
// TODO: start at 0 or 1?
for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] {
let handle = self.wait_for_request_handle(authorization, None).await?;
let head_block_num_future = handle.request::<Option<()>, U256>(
let head_block_num_future = self.request::<Option<()>, U256>(
"eth_blockNumber",
&None,
// errors here are expected, so keep the level low
Level::Debug.into(),
authorization.clone(),
);
let head_block_num = timeout(Duration::from_secs(5), head_block_num_future)
@ -288,9 +281,7 @@ impl Web3Rpc {
// TODO: wait for the handle BEFORE we check the current block number. it might be delayed too!
// TODO: what should the request be?
let handle = self.wait_for_request_handle(authorization, None).await?;
let archive_result: Result<Bytes, _> = handle
let archive_result: Result<Bytes, _> = self
.request(
"eth_getCode",
&json!((
@ -299,6 +290,7 @@ impl Web3Rpc {
)),
// errors here are expected, so keep the level low
Level::Trace.into(),
authorization.clone(),
)
.await;
@ -377,23 +369,20 @@ impl Web3Rpc {
}
/// query the web3 provider to confirm it is on the expected chain with the expected data available
async fn check_provider(
self: &Arc<Self>,
block_sender: Option<&flume::Sender<BlockAndRpc>>,
chain_id: u64,
db_conn: Option<&DatabaseConnection>,
) -> anyhow::Result<()> {
let authorization = Arc::new(Authorization::internal(db_conn.cloned())?);
async fn check_provider(self: &Arc<Self>, chain_id: u64) -> Web3ProxyResult<()> {
let authorization = Arc::new(Authorization::internal(self.db_conn.clone())?);
// check the server's chain_id here
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
// TODO: what should the timeout be? should there be a request timeout?
// trace!("waiting on chain id for {}", self);
let found_chain_id: Result<U64, _> = self
.wait_for_request_handle(&authorization, None)
.await
.context(format!("waiting for request handle on {}", self))?
.request("eth_chainId", &json!(Vec::<()>::new()), Level::Trace.into())
.request(
"eth_chainId",
&json!(Vec::<()>::new()),
Level::Trace.into(),
authorization.clone(),
)
.await;
trace!("found_chain_id: {:#?}", found_chain_id);
@ -406,12 +395,14 @@ impl Web3Rpc {
chain_id,
found_chain_id
)
.context(format!("failed @ {}", self)));
.context(format!("failed @ {}", self))
.into());
}
}
Err(e) => {
return Err(anyhow::Error::from(e)
.context(format!("unable to parse eth_chainId from {}", self)));
.context(format!("unable to parse eth_chainId from {}", self))
.into());
}
}
@ -426,27 +417,25 @@ impl Web3Rpc {
pub(crate) async fn send_head_block_result(
self: &Arc<Self>,
new_head_block: Result<Option<ArcBlock>, ProviderError>,
new_head_block: Web3ProxyResult<Option<ArcBlock>>,
block_sender: &flume::Sender<BlockAndRpc>,
block_map: BlocksByHashCache,
) -> anyhow::Result<()> {
block_map: &BlocksByHashCache,
) -> Web3ProxyResult<()> {
let new_head_block = match new_head_block {
Ok(None) => {
{
let head_block_tx = self.head_block.as_ref().unwrap();
let head_block_tx = self.head_block.as_ref().unwrap();
if head_block_tx.borrow().is_none() {
// we previously sent a None. return early
return Ok(());
}
let age = self.created_at.unwrap().elapsed().as_millis();
debug!("clearing head block on {} ({}ms old)!", self, age);
head_block_tx.send_replace(None);
if head_block_tx.borrow().is_none() {
// we previously sent a None. return early
return Ok(());
}
let age = self.created_at.unwrap().elapsed().as_millis();
debug!("clearing head block on {} ({}ms old)!", self, age);
head_block_tx.send_replace(None);
None
}
Ok(Some(new_head_block)) => {
@ -461,7 +450,8 @@ impl Web3Rpc {
&new_hash,
async move { Ok(new_head_block) },
)
.await?;
.await
.expect("this cache get is infallible");
// save the block so we don't send the same one multiple times
// also save so that archive checks can know how far back to query
@ -504,9 +494,61 @@ impl Web3Rpc {
*self.disconnect_watch.as_ref().unwrap().borrow()
}
async fn healthcheck(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
error_handler: RequestErrorHandler,
) -> Web3ProxyResult<()> {
let head_block = self.head_block.as_ref().unwrap().borrow().clone();
if let Some(head_block) = head_block {
let head_block = head_block.block;
// TODO: if head block is very old and not expected to be syncing, emit warning
let block_number = head_block.number.context("no block number")?;
let to = if let Some(txid) = head_block.transactions.last().cloned() {
let tx = self
.request::<_, Option<Transaction>>(
"eth_getTransactionByHash",
&(txid,),
error_handler,
authorization.clone(),
)
.await?
.context("no transaction")?;
// TODO: what default? something real?
tx.to.unwrap_or_else(|| {
"0xdead00000000000000000000000000000000beef"
.parse::<Address>()
.expect("deafbeef")
})
} else {
"0xdead00000000000000000000000000000000beef"
.parse::<Address>()
.expect("deafbeef")
};
let _code = self
.request::<_, Option<Bytes>>(
"eth_getCode",
&(to, block_number),
error_handler,
authorization.clone(),
)
.await?;
} else {
// TODO: if head block is none for too long, give an error
}
Ok(())
}
/// subscribe to blocks and transactions
/// This should only exit when the program is exiting.
/// TODO: should more of these args be on self?
/// TODO: should more of these args be on self? chain_id for sure
#[allow(clippy::too_many_arguments)]
async fn subscribe(
self: Arc<Self>,
@ -514,220 +556,97 @@ impl Web3Rpc {
block_map: BlocksByHashCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
chain_id: u64,
disconnect_receiver: watch::Receiver<bool>,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
) -> anyhow::Result<()> {
) -> Web3ProxyResult<()> {
let error_handler = if self.backup {
RequestErrorHandler::DebugLevel
} else {
RequestErrorHandler::ErrorLevel
};
todo!();
debug!("starting subscriptions on {}", self);
self.check_provider(chain_id).await?;
/*
let mut futures = vec![];
while false {
let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe());
// health check that runs if there haven't been any recent requests
{
// TODO: move this into a proper function
let authorization = authorization.clone();
let rpc = self.clone();
{
// TODO: move this into a proper function
let authorization = authorization.clone();
let block_sender = block_sender.clone();
let rpc = self.clone();
let (ready_tx, ready_rx) = oneshot::channel();
let f = async move {
// initial sleep to allow for the initial connection
rpc.retrying_connect(
block_sender.as_ref(),
chain_id,
authorization.db_conn.as_ref(),
delay_start,
)
.await?;
// TODO: how often? different depending on the chain?
// TODO: reset this timeout when a new block is seen? we need to keep request_latency updated though
let health_sleep_seconds = 10;
// provider is ready
ready_tx.send(()).unwrap();
// health check loop
let f = async move {
// TODO: benchmark this and lock contention
let mut old_total_requests = 0;
let mut new_total_requests;
// TODO: how often? different depending on the chain?
// TODO: reset this timeout when a new block is seen? we need to keep request_latency updated though
let health_sleep_seconds = 10;
// TODO: errors here should not cause the loop to exit!
while !rpc.should_disconnect() {
new_total_requests = rpc.total_requests.load(atomic::Ordering::Relaxed);
// TODO: benchmark this and lock contention
let mut old_total_requests = 0;
let mut new_total_requests;
// health check loop
loop {
// TODO: do we need this to be abortable?
if rpc.should_disconnect() {
break;
}
sleep(Duration::from_secs(health_sleep_seconds)).await;
trace!("health check on {}", rpc);
// TODO: what if we just happened to have this check line up with another restart?
// TODO: think more about this
if let Some(client) = rpc.ws_provider.read().await.clone() {
// health check as a way of keeping this rpc's request_ewma accurate
// TODO: do something different if this is a backup server?
new_total_requests = rpc.total_requests.load(atomic::Ordering::Acquire);
// TODO: how many requests should we require in order to skip a health check?
if new_total_requests - old_total_requests < 10 {
// TODO: if this fails too many times, reset the connection
// TODO: move this into a function and the chaining should be easier
let head_block = rpc.head_block.as_ref().unwrap().borrow().clone();
if let Some((block_number, txid)) = head_block.and_then(|x| {
let block = x.block;
let block_number = block.number?;
let txid = block.transactions.last().cloned()?;
Some((block_number, txid))
}) {
let to = rpc
.wait_for_query::<_, Option<Transaction>>(
"eth_getTransactionByHash",
&(txid,),
error_handler,
authorization.clone(),
Some(client.clone()),
)
.await
.and_then(|tx| {
let tx = tx.context("no transaction found")?;
// TODO: what default? something real?
let to = tx.to.unwrap_or_else(|| {
"0xdead00000000000000000000000000000000beef"
.parse::<Address>()
.expect("deafbeef")
});
Ok(to)
});
let code = match to {
Err(err) => {
// TODO: an "error" here just means that the hash wasn't available. i don't think it's truly an "error"
if rpc.backup {
debug!(
"{} failed health check query! {:#?}",
rpc, err
);
} else {
warn!(
"{} failed health check query! {:#?}",
rpc, err
);
}
continue;
}
Ok(to) => {
rpc.wait_for_query::<_, Option<Bytes>>(
"eth_getCode",
&(to, block_number),
error_handler,
authorization.clone(),
Some(client),
)
.await
}
};
if let Err(err) = code {
if rpc.backup {
debug!("{} failed health check query! {:#?}", rpc, err);
} else {
warn!("{} failed health check query! {:#?}", rpc, err);
}
continue;
}
}
}
old_total_requests = new_total_requests;
if new_total_requests - old_total_requests < 10 {
// TODO: if this fails too many times, reset the connection
// TODO: move this into a function and the chaining should be easier
if let Err(err) = rpc.healthcheck(&authorization, error_handler).await {
// TODO: different level depending on the error handler
warn!("health checking {} failed: {:?}", rpc, err);
}
}
debug!("health checks for {} exited", rpc);
Ok(())
};
// TODO: should we count the requests done inside this health check
old_total_requests = new_total_requests;
futures.push(flatten_handle(tokio::spawn(f)));
// wait on the initial connection
ready_rx.await?;
}
if let Some(block_sender) = &block_sender {
// TODO: do we need this to be abortable?
let f = self.clone().subscribe_new_heads(
authorization.clone(),
http_interval_receiver,
block_sender.clone(),
block_map.clone(),
);
futures.push(flatten_handle(tokio::spawn(f)));
}
if let Some(tx_id_sender) = &tx_id_sender {
// TODO: do we need this to be abortable?
let f = self
.clone()
.subscribe_pending_transactions(authorization.clone(), tx_id_sender.clone());
futures.push(flatten_handle(tokio::spawn(f)));
}
match try_join_all(futures).await {
Ok(_) => {
// future exited without error
// TODO: think about this more. we never set it to false. this can't be right
break;
sleep(Duration::from_secs(health_sleep_seconds)).await;
}
Err(err) => {
let disconnect_sender = self.disconnect_watch.as_ref().unwrap();
if self.reconnect.load(atomic::Ordering::Acquire) {
warn!("{} connection ended. reconnecting. err={:?}", self, err);
debug!("healthcheck loop on {} exited", rpc);
// TODO: i'm not sure if this is necessary, but telling everything to disconnect seems like a better idea than relying on timeouts and dropped futures.
disconnect_sender.send_replace(true);
disconnect_sender.send_replace(false);
Ok(())
};
// we call retrying_connect here with initial_delay=true. above, initial_delay=false
delay_start = true;
continue;
}
// reconnect is not enabled.
if *disconnect_receiver.borrow() {
info!("{} is disconnecting", self);
break;
} else {
error!("{} subscription exited. err={:?}", self, err);
disconnect_sender.send_replace(true);
break;
}
}
}
futures.push(flatten_handle(tokio::spawn(f)));
}
// subscribe to new heads
if let Some(block_sender) = &block_sender {
// TODO: do we need this to be abortable?
let f = self.clone().subscribe_new_heads(
authorization.clone(),
block_sender.clone(),
block_map.clone(),
);
*/
info!("all subscriptions on {} completed", self);
futures.push(flatten_handle(tokio::spawn(f)));
}
// subscribe pending transactions
// TODO: make this opt-in. its a lot of bandwidth
if let Some(tx_id_sender) = tx_id_sender {
// TODO: do we need this to be abortable?
let f = self
.clone()
.subscribe_pending_transactions(authorization.clone(), tx_id_sender);
futures.push(flatten_handle(tokio::spawn(f)));
}
// try_join on the futures
if let Err(err) = try_join_all(futures).await {
warn!("subscription erred: {:?}", err);
}
debug!("subscriptions on {} exited", self);
self.disconnect_watch
.as_ref()
.expect("disconnect_watch should always be set")
.send_replace(true);
Ok(())
}
@ -736,197 +655,76 @@ impl Web3Rpc {
async fn subscribe_new_heads(
self: Arc<Self>,
authorization: Arc<Authorization>,
http_interval_receiver: Option<broadcast::Receiver<()>>,
block_sender: flume::Sender<BlockAndRpc>,
block_map: BlocksByHashCache,
) -> anyhow::Result<()> {
trace!("watching new heads on {}", self);
) -> Web3ProxyResult<()> {
debug!("subscribing to new heads on {}", self);
if let Some(ws_provider) = self.ws_provider.as_ref() {
todo!("subscribe")
// todo: move subscribe_blocks onto the request handle
let active_request_handle = self.wait_for_request_handle(&authorization, None).await;
let mut blocks = ws_provider.subscribe_blocks().await?;
drop(active_request_handle);
// query the block once since the subscription doesn't send the current block
// there is a very small race condition here where the stream could send us a new block right now
// but seeing the same block twice won't break anything
// TODO: how does this get wrapped in an arc? does ethers handle that?
// TODO: can we force this to use the websocket?
let latest_block: Result<Option<ArcBlock>, _> = self
.request(
"eth_getBlockByNumber",
&json!(("latest", false)),
Level::Warn.into(),
authorization,
)
.await;
self.send_head_block_result(latest_block, &block_sender, &block_map)
.await?;
while let Some(block) = blocks.next().await {
if self.should_disconnect() {
break;
}
let block = Arc::new(block);
self.send_head_block_result(Ok(Some(block)), &block_sender, &block_map)
.await?;
}
} else if let Some(http_provider) = self.http_provider.as_ref() {
todo!("poll")
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
let mut blocks = http_provider.watch_blocks().await?;
while let Some(block_hash) = blocks.next().await {
if self.should_disconnect() {
break;
}
let block = if let Some(block) = block_map.get(&block_hash) {
block.block
} else if let Some(block) = http_provider.get_block(block_hash).await? {
Arc::new(block)
} else {
continue;
};
self.send_head_block_result(Ok(Some(block)), &block_sender, &block_map)
.await?;
}
} else {
unimplemented!("no ws or http provider!")
}
/*
match provider.as_ref() {
Web3Provider::Http(_client) => {
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: try watch_blocks and fall back to this?
let mut http_interval_receiver = http_interval_receiver.unwrap();
let mut last_hash = H256::zero();
while !self.should_disconnect() {
// TODO: what should the max_wait be?
// we do not pass unlocked_provider because we want to get a new one each call. otherwise we might re-use an old one
match self.wait_for_request_handle(&authorization, None).await {
Ok(active_request_handle) => {
let block: Result<Option<ArcBlock>, _> = active_request_handle
.request(
"eth_getBlockByNumber",
&json!(("latest", false)),
Level::Warn.into(),
)
.await;
match block {
Ok(None) => {
warn!("no head block on {}", self);
self.send_head_block_result(
Ok(None),
&block_sender,
block_map.clone(),
)
.await?;
}
Ok(Some(block)) => {
if let Some(new_hash) = block.hash {
// don't send repeat blocks
if new_hash != last_hash {
// new hash!
last_hash = new_hash;
self.send_head_block_result(
Ok(Some(block)),
&block_sender,
block_map.clone(),
)
.await?;
}
} else {
// TODO: why is this happening?
warn!("empty head block on {}", self);
self.send_head_block_result(
Ok(None),
&block_sender,
block_map.clone(),
)
.await?;
}
}
Err(err) => {
// we did not get a block back. something is up with the server. take it out of rotation
self.send_head_block_result(
Err(err),
&block_sender,
block_map.clone(),
)
.await?;
}
}
}
Err(err) => {
warn!("Internal error on latest block from {}. {:?}", self, err);
self.send_head_block_result(Ok(None), &block_sender, block_map.clone())
.await?;
// TODO: what should we do? sleep? extra time?
}
}
// wait for the next interval
// TODO: if error or rate limit, increase interval?
while let Err(err) = http_interval_receiver.recv().await {
match err {
broadcast::error::RecvError::Closed => {
// channel is closed! that's not good. bubble the error up
return Err(err.into());
}
broadcast::error::RecvError::Lagged(lagged) => {
// querying the block was delayed
// this can happen if tokio is very busy or waiting for requests limits took too long
if self.backup {
debug!("http interval on {} lagging by {}!", self, lagged);
} else {
warn!("http interval on {} lagging by {}!", self, lagged);
}
}
}
}
}
}
Web3Provider::Both(_, client) | Web3Provider::Ws(client) => {
// todo: move subscribe_blocks onto the request handle?
let active_request_handle =
self.wait_for_request_handle(&authorization, None).await;
let mut stream = client.subscribe_blocks().await?;
drop(active_request_handle);
// query the block once since the subscription doesn't send the current block
// there is a very small race condition here where the stream could send us a new block right now
// but all that does is print "new block" for the same block as current block
// TODO: how does this get wrapped in an arc? does ethers handle that?
// TODO: do this part over http?
let block: Result<Option<ArcBlock>, _> = self
.wait_for_request_handle(&authorization, None)
.await?
.request(
"eth_getBlockByNumber",
&json!(("latest", false)),
Level::Warn.into(),
)
.await;
let mut last_hash = match &block {
Ok(Some(new_block)) => new_block
.hash
.expect("blocks should always have a hash here"),
_ => H256::zero(),
};
self.send_head_block_result(block, &block_sender, block_map.clone())
.await?;
while let Some(new_block) = stream.next().await {
// TODO: select on disconnect_watch instead of waiting for a block to arrive
if self.should_disconnect() {
break;
}
// TODO: check the new block's hash to be sure we don't send dupes
let new_hash = new_block
.hash
.expect("blocks should always have a hash here");
if new_hash == last_hash {
// some rpcs like to give us duplicates. don't waste our time on them
continue;
} else {
last_hash = new_hash;
}
self.send_head_block_result(
Ok(Some(Arc::new(new_block))),
&block_sender,
block_map.clone(),
)
.await?;
}
// TODO: is this always an error?
// TODO: we probably don't want a warn and to return error
debug!("new_heads subscription to {} ended", self);
}
#[cfg(test)]
Web3Provider::Mock => unimplemented!(),
}
*/
// clear the head block. this might not be needed, but it won't hurt
self.send_head_block_result(Ok(None), &block_sender, block_map)
self.send_head_block_result(Ok(None), &block_sender, &block_map)
.await?;
if self.should_disconnect() {
Ok(())
} else {
Err(anyhow!("new_heads subscription exited. reconnect needed"))
Err(anyhow!("new_heads subscription exited. reconnect needed").into())
}
}
@ -935,7 +733,7 @@ impl Web3Rpc {
self: Arc<Self>,
authorization: Arc<Authorization>,
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
) -> anyhow::Result<()> {
) -> Web3ProxyResult<()> {
// TODO: make this subscription optional
self.wait_for_disconnect().await?;
@ -985,9 +783,7 @@ impl Web3Rpc {
if self.should_disconnect() {
Ok(())
} else {
Err(anyhow!(
"pending_transactions subscription exited. reconnect needed"
))
Err(anyhow!("pending_transactions subscription exited. reconnect needed").into())
}
}
@ -1034,7 +830,7 @@ impl Web3Rpc {
}
// TODO: sleep how long? maybe just error?
// TODO: instead of an arbitrary sleep, subscribe to the head block on this
// TODO: instead of an arbitrary sleep, subscribe to the head block on this?
sleep(Duration::from_millis(10)).await;
}
Err(err) => return Err(err),
@ -1099,36 +895,39 @@ impl Web3Rpc {
}
async fn wait_for_disconnect(&self) -> Result<(), tokio::sync::watch::error::RecvError> {
let mut disconnect_watch = self.disconnect_watch.as_ref().unwrap().subscribe();
let mut disconnect_subscription = self.disconnect_watch.as_ref().unwrap().subscribe();
loop {
if *disconnect_watch.borrow_and_update() {
if *disconnect_subscription.borrow_and_update() {
// disconnect watch is set to "true"
return Ok(());
}
// wait for disconnect_watch to change
disconnect_watch.changed().await?;
// wait for disconnect_subscription to change
disconnect_subscription.changed().await?;
}
}
pub async fn wait_for_query<P, R>(
pub async fn request<P, R>(
self: &Arc<Self>,
method: &str,
params: &P,
revert_handler: RequestErrorHandler,
authorization: Arc<Authorization>,
) -> anyhow::Result<R>
) -> Web3ProxyResult<R>
where
// TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it
P: Clone + fmt::Debug + serde::Serialize + Send + Sync + 'static,
R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug + Send,
{
self.wait_for_request_handle(&authorization, None)
// TODO: take max_wait as a function argument?
let x = self
.wait_for_request_handle(&authorization, None)
.await?
.request::<P, R>(method, params, revert_handler)
.await
.context("ProviderError from the backend")
.await?;
Ok(x)
}
}
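An illustrative call of the renamed helper above, mirroring how `check_provider` uses it earlier in this diff (the surrounding `rpc` and `authorization` values are assumed to exist; not part of the commit):

    let found_chain_id: U64 = rpc
        .request(
            "eth_chainId",
            &json!(Vec::<()>::new()),
            Level::Trace.into(),
            authorization.clone(),
        )
        .await?;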
@ -1255,7 +1054,7 @@ impl fmt::Display for Web3Rpc {
mod tests {
#![allow(unused_imports)]
use super::*;
use ethers::types::{Block, U256};
use ethers::types::{Block, H256, U256};
#[test]
fn test_archive_node_has_block_data() {

@ -1,26 +1,20 @@
use anyhow::anyhow;
use derive_more::From;
use ethers::providers::{Authorization, ConnectionDetails};
use std::{borrow::Cow, time::Duration};
use std::time::Duration;
use url::Url;
// TODO: our own structs for these that handle streaming large responses
pub type EthersHttpProvider = ethers::providers::Provider<ethers::providers::Http>;
pub type EthersWsProvider = ethers::providers::Provider<ethers::providers::Ws>;
pub fn extract_auth(url: &mut Cow<'_, Url>) -> Option<Authorization> {
pub fn extract_auth(url: &mut Url) -> Option<Authorization> {
if let Some(pass) = url.password().map(|x| x.to_string()) {
// to_string is needed because we are going to remove these items from the url
let user = url.username().to_string();
// clear username and password from the url
let mut_url = url.to_mut();
mut_url
.set_username("")
url.set_username("")
.expect("unable to clear username on websocket");
mut_url
.set_password(None)
url.set_password(None)
.expect("unable to clear password on websocket");
// keep them
@ -33,35 +27,36 @@ pub fn extract_auth(url: &mut Cow<'_, Url>) -> Option<Authorization> {
/// Note, if the http url has an authority the http_client param is ignored and a dedicated http_client will be used
/// TODO: take a reqwest::Client or a reqwest::ClientBuilder. that way we can do things like set compression even when auth is set
pub fn connect_http(
mut url: Cow<'_, Url>,
mut url: Url,
http_client: Option<reqwest::Client>,
interval: Duration,
) -> anyhow::Result<EthersHttpProvider> {
let auth = extract_auth(&mut url);
let provider = if url.scheme().starts_with("http") {
let mut provider = if url.scheme().starts_with("http") {
let provider = if let Some(auth) = auth {
ethers::providers::Http::new_with_auth(url.into_owned(), auth)?
ethers::providers::Http::new_with_auth(url, auth)?
} else if let Some(http_client) = http_client {
ethers::providers::Http::new_with_client(url.into_owned(), http_client)
ethers::providers::Http::new_with_client(url, http_client)
} else {
ethers::providers::Http::new(url.into_owned())
ethers::providers::Http::new(url)
};
// TODO: i don't think this interval matters for our uses, but we should probably set it to like `block time / 2`
ethers::providers::Provider::new(provider)
.interval(Duration::from_secs(12))
.into()
ethers::providers::Provider::new(provider).interval(Duration::from_secs(2))
} else {
return Err(anyhow::anyhow!("only http servers are supported"));
return Err(anyhow::anyhow!(
"only http servers are supported. cannot use {}",
url
));
};
provider.set_interval(interval);
Ok(provider)
}
pub async fn connect_ws(
mut url: Cow<'_, Url>,
reconnects: usize,
) -> anyhow::Result<EthersWsProvider> {
pub async fn connect_ws(mut url: Url, reconnects: usize) -> anyhow::Result<EthersWsProvider> {
let auth = extract_auth(&mut url);
let provider = if url.scheme().starts_with("ws") {
@ -76,7 +71,7 @@ pub async fn connect_ws(
// TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
// TODO: i don't think this interval matters
ethers::providers::Provider::new(provider).into()
ethers::providers::Provider::new(provider)
} else {
return Err(anyhow::anyhow!("only ws servers are supported"));
};
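Hypothetical usage of the two constructors after the split, inside an async fn returning anyhow::Result (URLs, interval, and reconnect count are placeholders, not from the diff):

    let http = connect_http(
        "https://eth.example.invalid".parse()?,
        None,                    // or Some(shared reqwest::Client)
        Duration::from_secs(12), // polling interval, e.g. the chain's block time
    )?;
    let ws = connect_ws("wss://eth.example.invalid".parse()?, usize::MAX).await?;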

@ -1,5 +1,6 @@
use super::one::Web3Rpc;
use crate::frontend::authorization::Authorization;
use crate::frontend::errors::Web3ProxyResult;
use anyhow::Context;
use chrono::Utc;
use entities::revert_log;
@ -13,7 +14,7 @@ use std::fmt;
use std::sync::atomic;
use std::sync::Arc;
use thread_fast_rng::rand::Rng;
use tokio::time::{sleep, Duration, Instant};
use tokio::time::{Duration, Instant};
#[derive(Debug)]
pub enum OpenRequestResult {
@ -75,7 +76,7 @@ impl Authorization {
self: Arc<Self>,
method: Method,
params: EthCallFirstParams,
) -> anyhow::Result<()> {
) -> Web3ProxyResult<()> {
let rpc_key_id = match self.checks.rpc_secret_key_id {
Some(rpc_key_id) => rpc_key_id.into(),
None => {

@ -1,4 +1,4 @@
use crate::frontend::authorization::Authorization;
use crate::frontend::{authorization::Authorization, errors::Web3ProxyResult};
use super::many::Web3Rpcs;
///! Load balanced communication with a group of web3 providers
@ -70,7 +70,7 @@ impl Web3Rpcs {
rpc: Arc<Web3Rpc>,
pending_tx_id: TxHash,
pending_tx_sender: broadcast::Sender<TxStatus>,
) -> anyhow::Result<()> {
) -> Web3ProxyResult<()> {
// TODO: how many retries? until some timestamp is hit is probably better. maybe just loop and call this with a timeout
// TODO: after more investigation, i don't think retries will help. i think this is because chains of transactions get dropped from memory
// TODO: also check the "confirmed transactions" mapping? maybe one shared mapping with TxState in it?

@ -8,9 +8,9 @@ pub use stat_buffer::{SpawnedStatBuffer, StatBuffer};
use crate::app::RpcSecretKeyCache;
use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::frontend::errors::Web3ProxyError;
use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult};
use crate::rpcs::one::Web3Rpc;
use anyhow::Context;
use anyhow::{anyhow, Context};
use axum::headers::Origin;
use chrono::{DateTime, Months, TimeZone, Utc};
use derive_more::From;
@ -229,13 +229,14 @@ impl BufferedRpcQueryStats {
db_conn: &DatabaseConnection,
key: RpcQueryKey,
rpc_secret_key_cache: Option<&RpcSecretKeyCache>,
) -> anyhow::Result<()> {
anyhow::ensure!(
key.response_timestamp > 0,
"no response_timestamp! This is a bug! {:?} {:?}",
key,
self
);
) -> Web3ProxyResult<()> {
if key.response_timestamp == 0 {
return Err(Web3ProxyError::Anyhow(anyhow!(
"no response_timestamp! This is a bug! {:?} {:?}",
key,
self
)));
}
let period_datetime = Utc.timestamp_opt(key.response_timestamp, 0).unwrap();
@ -670,8 +671,9 @@ impl RpcQueryStats {
method: Option<&str>,
) -> Decimal {
// for now, always return 0 for cost
return 0.into();
0.into()
/*
// some methods should be free. there might be cases where method isn't set (though they should be uncommon)
// TODO: get this list from config (and add more to it)
if let Some(method) = method.as_ref() {
@ -704,5 +706,6 @@ impl RpcQueryStats {
}
cost
*/
}
}

@ -1,5 +1,6 @@
use super::{AppStat, RpcQueryKey};
use crate::app::RpcSecretKeyCache;
use crate::app::{RpcSecretKeyCache, Web3ProxyJoinHandle};
use crate::frontend::errors::Web3ProxyResult;
use derive_more::From;
use futures::stream;
use hashbrown::HashMap;
@ -9,7 +10,6 @@ use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::DatabaseConnection;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tokio::time::interval;
#[derive(Debug, Default)]
@ -32,7 +32,7 @@ pub struct BufferedRpcQueryStats {
pub struct SpawnedStatBuffer {
pub stat_sender: flume::Sender<AppStat>,
/// these handles are important and must be allowed to finish
pub background_handle: JoinHandle<anyhow::Result<()>>,
pub background_handle: Web3ProxyJoinHandle<()>,
}
pub struct StatBuffer {
accounting_db_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
@ -96,7 +96,7 @@ impl StatBuffer {
bucket: String,
stat_receiver: flume::Receiver<AppStat>,
mut shutdown_receiver: broadcast::Receiver<()>,
) -> anyhow::Result<()> {
) -> Web3ProxyResult<()> {
let mut tsdb_save_interval =
interval(Duration::from_secs(self.tsdb_save_interval_seconds as u64));
let mut db_save_interval =