diff --git a/Cargo.lock b/Cargo.lock index 51b231ce..7a79b0c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6533,6 +6533,7 @@ dependencies = [ "hex_fmt", "hostname", "http", + "hyper", "influxdb2", "influxdb2-structmap", "ipnet", diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 52e517c7..17072f1c 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -55,6 +55,7 @@ hdrhistogram = "7.5.2" hex_fmt = "0.3.0" hostname = "0.3.1" http = "0.2.9" +hyper = { version = "0.14.26", features = ["full"] } influxdb2 = { git = "https://github.com/llamanodes/influxdb2", features = ["rustls"] } influxdb2-structmap = { git = "https://github.com/llamanodes/influxdb2/"} ipnet = "2.7.2" diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index bdc70c90..42003674 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -74,7 +74,7 @@ pub static APP_USER_AGENT: &str = concat!( // aggregate across 1 week pub const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7; -pub type AnyhowJoinHandle = JoinHandle>; +pub type Web3ProxyJoinHandle = JoinHandle>; /// TODO: move this #[derive(Clone, Debug, Default, From)] @@ -176,7 +176,7 @@ pub struct Web3ProxyApp { // TODO: should the key be our RpcSecretKey class instead of Ulid? pub rpc_secret_key_cache: RpcSecretKeyCache, /// concurrent/parallel RPC request limits for authenticated users - pub rpc_key_semaphores: Cache>, + pub user_semaphores: Cache>, /// concurrent/parallel request limits for anonymous users pub ip_semaphores: Cache>, /// concurrent/parallel application request limits for authenticated users @@ -188,7 +188,7 @@ pub struct Web3ProxyApp { /// flatten a JoinError into an anyhow error /// Useful when joining multiple futures. -pub async fn flatten_handle(handle: AnyhowJoinHandle) -> anyhow::Result { +pub async fn flatten_handle(handle: Web3ProxyJoinHandle) -> Web3ProxyResult { match handle.await { Ok(Ok(result)) => Ok(result), Ok(Err(err)) => Err(err), @@ -198,8 +198,8 @@ pub async fn flatten_handle(handle: AnyhowJoinHandle) -> anyhow::Result /// return the first error, or Ok if everything worked pub async fn flatten_handles( - mut handles: FuturesUnordered>, -) -> anyhow::Result<()> { + mut handles: FuturesUnordered>, +) -> Web3ProxyResult<()> { while let Some(x) = handles.next().await { match x { Err(e) => return Err(e.into()), @@ -315,9 +315,9 @@ pub struct Web3ProxyAppSpawn { /// the app. probably clone this to use in other groups of handles pub app: Arc, /// handles for the balanced and private rpcs - pub app_handles: FuturesUnordered>, + pub app_handles: FuturesUnordered>, /// these are important and must be allowed to finish - pub background_handles: FuturesUnordered>, + pub background_handles: FuturesUnordered>, /// config changes are sent here pub new_top_config_sender: watch::Sender, /// watch this to know when to start the app @@ -359,10 +359,12 @@ impl Web3ProxyApp { } // these futures are key parts of the app. if they stop running, the app has encountered an irrecoverable error - let app_handles = FuturesUnordered::new(); + // TODO: this is a small enough group, that a vec with try_join_all is probably fine + let app_handles: FuturesUnordered> = FuturesUnordered::new(); // we must wait for these to end on their own (and they need to subscribe to shutdown_sender) - let important_background_handles = FuturesUnordered::new(); + let important_background_handles: FuturesUnordered> = + FuturesUnordered::new(); // connect to the database and make sure the latest migrations have run let mut db_conn = None::; @@ -624,12 +626,10 @@ impl Web3ProxyApp { // TODO: what should tti be for semaphores? let bearer_token_semaphores = Cache::new(max_users); let ip_semaphores = Cache::new(max_users); - let registered_user_semaphores = Cache::new(max_users); + let user_semaphores = Cache::new(max_users); let (balanced_rpcs, balanced_handle, consensus_connections_watcher) = Web3Rpcs::spawn( - top_config.app.chain_id, db_conn.clone(), - http_client.clone(), top_config.app.max_block_age, top_config.app.max_block_lag, top_config.app.min_synced_rpcs, @@ -654,9 +654,7 @@ impl Web3ProxyApp { // TODO: Merge // let (private_rpcs, private_rpcs_handle) = Web3Rpcs::spawn( let (private_rpcs, private_handle, _) = Web3Rpcs::spawn( - top_config.app.chain_id, db_conn.clone(), - http_client.clone(), // private rpcs don't get subscriptions, so no need for max_block_age or max_block_lag None, None, @@ -688,9 +686,7 @@ impl Web3ProxyApp { } else { // TODO: do something with the spawn handle let (bundler_4337_rpcs, bundler_4337_rpcs_handle, _) = Web3Rpcs::spawn( - top_config.app.chain_id, db_conn.clone(), - http_client.clone(), // bundler_4337_rpcs don't get subscriptions, so no need for max_block_age or max_block_lag None, None, @@ -735,7 +731,7 @@ impl Web3ProxyApp { rpc_secret_key_cache, bearer_token_semaphores, ip_semaphores, - rpc_key_semaphores: registered_user_semaphores, + user_semaphores, stat_sender, }; @@ -752,9 +748,9 @@ impl Web3ProxyApp { loop { let new_top_config = new_top_config_receiver.borrow_and_update().to_owned(); - app.apply_top_config(new_top_config) - .await - .context("failed applying new top_config")?; + if let Err(err) = app.apply_top_config(new_top_config).await { + error!("unable to apply config! {:?}", err); + }; new_top_config_receiver .changed() @@ -790,7 +786,7 @@ impl Web3ProxyApp { .into()) } - pub async fn apply_top_config(&self, new_top_config: TopConfig) -> anyhow::Result<()> { + pub async fn apply_top_config(&self, new_top_config: TopConfig) -> Web3ProxyResult<()> { // TODO: also update self.config from new_top_config.app // connect to the backends diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index fc156631..df3ad199 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -1,4 +1,4 @@ -use crate::app::AnyhowJoinHandle; +use crate::app::Web3ProxyJoinHandle; use crate::rpcs::blockchain::{BlocksByHashCache, Web3ProxyBlock}; use crate::rpcs::one::Web3Rpc; use argh::FromArgs; @@ -9,7 +9,7 @@ use log::warn; use migration::sea_orm::DatabaseConnection; use serde::Deserialize; use std::sync::Arc; -use tokio::sync::broadcast; +use std::time::Duration; pub type BlockAndRpc = (Option, Arc); pub type TxHashAndRpc = (TxHash, Arc); @@ -283,23 +283,45 @@ impl Web3RpcConfig { redis_pool: Option, chain_id: u64, http_client: Option, - http_interval_sender: Option>>, blocks_by_hash_cache: BlocksByHashCache, block_sender: Option>, tx_id_sender: Option>, - ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { + ) -> anyhow::Result<(Arc, Web3ProxyJoinHandle<()>)> { if !self.extra.is_empty() { warn!("unknown Web3RpcConfig fields!: {:?}", self.extra.keys()); } + // TODO: get this from config? a helper function? where does this belong? + let block_interval = match chain_id { + // ethereum + 1 => Duration::from_secs(12), + // ethereum-goerli + 5 => Duration::from_secs(12), + // polygon + 137 => Duration::from_secs(2), + // fantom + 250 => Duration::from_secs(1), + // arbitrum + 42161 => Duration::from_millis(500), + // anything else + _ => { + let default = 10; + warn!( + "unexpected chain_id ({}). polling every {} seconds", + chain_id, default + ); + Duration::from_secs(default) + } + }; + Web3Rpc::spawn( self, name, chain_id, db_conn, http_client, - http_interval_sender, redis_pool, + block_interval, blocks_by_hash_cache, block_sender, tx_id_sender, diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 51ad4834..dcc3b614 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -30,7 +30,7 @@ use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::RedisRateLimitResult; use std::convert::Infallible; use std::fmt::Display; -use std::hash::Hash; +use std::hash::{Hash, Hasher}; use std::mem; use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64, AtomicUsize}; use std::time::Duration; @@ -42,22 +42,12 @@ use ulid::Ulid; use uuid::Uuid; /// This lets us use UUID and ULID while we transition to only ULIDs -/// TODO: include the key's description. #[derive(Copy, Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)] pub enum RpcSecretKey { Ulid(Ulid), Uuid(Uuid), } -impl Hash for RpcSecretKey { - fn hash(&self, state: &mut H) { - match self { - Self::Ulid(x) => state.write_u128(x.0), - Self::Uuid(x) => state.write_u128(x.as_u128()), - } - } -} - /// TODO: should this have IpAddr and Origin or AuthorizationChecks? #[derive(Debug)] pub enum RateLimitResult { @@ -99,6 +89,17 @@ pub struct KafkaDebugLogger { num_responses: AtomicUsize, } +impl Hash for RpcSecretKey { + fn hash(&self, state: &mut H) { + let x = match self { + Self::Ulid(x) => x.0, + Self::Uuid(x) => x.as_u128(), + }; + + x.hash(state); + } +} + impl fmt::Debug for KafkaDebugLogger { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("KafkaDebugLogger") @@ -883,17 +884,13 @@ impl Web3ProxyApp { if let Some(max_concurrent_requests) = self.config.public_max_concurrent_requests { let semaphore = self .ip_semaphores - .get_or_insert_async::(ip, async move { + .get_or_insert_async::(ip, async move { // TODO: set max_concurrent_requests dynamically based on load? let s = Semaphore::new(max_concurrent_requests); Ok(Arc::new(s)) }) - .await?; - - // if semaphore.available_permits() == 0 { - // // TODO: concurrent limit hit! emit a stat? less important for anon users - // // TODO: there is probably a race here - // } + .await + .expect("infallible"); let semaphore_permit = semaphore.acquire_owned().await?; @@ -903,8 +900,8 @@ impl Web3ProxyApp { } } - /// Limit the number of concurrent requests from the given rpc key. - pub async fn registered_user_semaphore( + /// Limit the number of concurrent requests for a given user across all of their keys + pub async fn user_semaphore( &self, authorization_checks: &AuthorizationChecks, ) -> Web3ProxyResult> { @@ -915,25 +912,19 @@ impl Web3ProxyApp { .or(Err(Web3ProxyError::UserIdZero))?; let semaphore = self - .rpc_key_semaphores - .get_or_insert_async(&user_id, async move { + .user_semaphores + .get_or_insert_async::(&user_id, async move { let s = Semaphore::new(max_concurrent_requests as usize); - // trace!("new semaphore for user_id {}", user_id); - Ok::<_, Infallible>(Arc::new(s)) + Ok(Arc::new(s)) }) .await - .unwrap(); - - // if semaphore.available_permits() == 0 { - // // TODO: concurrent limit hit! emit a stat? this has a race condition though. - // // TODO: maybe have a stat on how long we wait to acquire the semaphore instead? - // } + .expect("infallible"); let semaphore_permit = semaphore.acquire_owned().await?; Ok(Some(semaphore_permit)) } else { - // unlimited requests allowed + // unlimited concurrency Ok(None) } } @@ -955,7 +946,7 @@ impl Web3ProxyApp { Ok(Arc::new(s)) }) .await - .unwrap(); + .expect("infallible"); let semaphore_permit = semaphore.acquire_owned().await?; @@ -1043,7 +1034,7 @@ impl Web3ProxyApp { // they do check origin because we can override rate limits for some origins let authorization = Authorization::external( allowed_origin_requests_per_period, - self.db_conn.clone(), + self.db_conn(), ip, origin, proxy_mode, @@ -1098,8 +1089,7 @@ impl Web3ProxyApp { proxy_mode: ProxyMode, rpc_secret_key: RpcSecretKey, ) -> Web3ProxyResult { - let authorization_checks: Result<_, Web3ProxyError> = self - .rpc_secret_key_cache + self.rpc_secret_key_cache .get_or_insert_async(&rpc_secret_key, async move { // trace!(?rpc_secret_key, "user cache miss"); @@ -1119,7 +1109,6 @@ impl Web3ProxyApp { Some(rpc_key_model) => { // TODO: move these splits into helper functions // TODO: can we have sea orm handle this for us? - // TODO: don't expect. return an application error let user_model = user::Entity::find_by_id(rpc_key_model.user_id) .one(db_replica.conn()) .await? @@ -1129,8 +1118,8 @@ impl Web3ProxyApp { .filter(balance::Column::UserId.eq(user_model.id)) .one(db_replica.conn()) .await? - .map(|x| x.available_balance) - .unwrap_or_default(); + .expect("related balance") + .available_balance; let user_tier_model = user_tier::Entity::find_by_id(user_model.user_tier_id) @@ -1220,9 +1209,7 @@ impl Web3ProxyApp { None => Ok(AuthorizationChecks::default()), } }) - .await; - - authorization_checks + .await } /// Authorized the ip/origin/referer/useragent and rate limit and concurrency @@ -1246,9 +1233,7 @@ impl Web3ProxyApp { // only allow this rpc_key to run a limited amount of concurrent requests // TODO: rate limit should be BEFORE the semaphore! - let semaphore = self - .registered_user_semaphore(&authorization_checks) - .await?; + let semaphore = self.user_semaphore(&authorization_checks).await?; let authorization = Authorization::try_new( authorization_checks, diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index 9126526e..c970c849 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -47,14 +47,15 @@ pub enum Web3ProxyError { Database(DbErr), #[display(fmt = "{:#?}, {:#?}", _0, _1)] EipVerificationFailed(Box, Box), - EthersHttpClientError(ethers::prelude::HttpClientError), - EthersProviderError(ethers::prelude::ProviderError), - EthersWsClientError(ethers::prelude::WsClientError), - FlumeRecvError(flume::RecvError), + EthersHttpClient(ethers::prelude::HttpClientError), + EthersProvider(ethers::prelude::ProviderError), + EthersWsClient(ethers::prelude::WsClientError), + FlumeRecv(flume::RecvError), GasEstimateNotU256, Headers(headers::Error), HeaderToString(ToStrError), - InfluxDb2RequestError(influxdb2::RequestError), + Hyper(hyper::Error), + InfluxDb2Request(influxdb2::RequestError), #[display(fmt = "{} > {}", min, max)] #[from(ignore)] InvalidBlockBounds { @@ -64,6 +65,7 @@ pub enum Web3ProxyError { InvalidHeaderValue(InvalidHeaderValue), InvalidEip, InvalidInviteCode, + Io(std::io::Error), UnknownReferralCode, InvalidReferer, InvalidSignatureLength, @@ -88,6 +90,12 @@ pub enum Web3ProxyError { num_known: usize, min_head_rpcs: usize, }, + #[display(fmt = "{}/{}", available, needed)] + #[from(ignore)] + NotEnoughSoftLimit { + available: u32, + needed: u32, + }, NotFound, NotImplemented, OriginRequired, @@ -136,6 +144,7 @@ pub enum Web3ProxyError { impl Web3ProxyError { pub fn into_response_parts(self) -> (StatusCode, JsonRpcResponseData) { + // TODO: include a unique request id in the data let (code, err): (StatusCode, JsonRpcErrorData) = match self { Self::AccessDenied => { // TODO: attach something to this trace. probably don't include much in the message though. don't want to leak creds by accident @@ -223,7 +232,7 @@ impl Web3ProxyError { }, ) } - Self::EthersHttpClientError(err) => { + Self::EthersHttpClient(err) => { warn!("EthersHttpClientError err={:?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, @@ -234,7 +243,7 @@ impl Web3ProxyError { }, ) } - Self::EthersProviderError(err) => { + Self::EthersProvider(err) => { warn!("EthersProviderError err={:?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, @@ -245,7 +254,7 @@ impl Web3ProxyError { }, ) } - Self::EthersWsClientError(err) => { + Self::EthersWsClient(err) => { warn!("EthersWsClientError err={:?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, @@ -256,7 +265,7 @@ impl Web3ProxyError { }, ) } - Self::FlumeRecvError(err) => { + Self::FlumeRecv(err) => { warn!("FlumeRecvError err={:#?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, @@ -290,7 +299,19 @@ impl Web3ProxyError { }, ) } - Self::InfluxDb2RequestError(err) => { + Self::Hyper(err) => { + warn!("hyper err={:?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcErrorData { + // TODO: is it safe to expose these error strings? + message: Cow::Owned(err.to_string()), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, + ) + } + Self::InfluxDb2Request(err) => { // TODO: attach a request id to the message and to this error so that if people report problems, we can dig in sentry to find out more error!("influxdb2 err={:?}", err); ( @@ -371,6 +392,18 @@ impl Web3ProxyError { }, ) } + Self::Io(err) => { + warn!("std io err={:?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcErrorData { + // TODO: is it safe to expose our io error strings? + message: Cow::Owned(err.to_string()), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, + ) + } Self::UnknownReferralCode => { debug!("UnknownReferralCode"); ( @@ -528,6 +561,20 @@ impl Web3ProxyError { }, ) } + Self::NotEnoughSoftLimit { available, needed } => { + error!("NotEnoughSoftLimit {}/{}", available, needed); + ( + StatusCode::BAD_GATEWAY, + JsonRpcErrorData { + message: Cow::Owned(format!( + "not enough soft limit available {}/{}", + available, needed + )), + code: StatusCode::BAD_GATEWAY.as_u16().into(), + data: None, + }, + ) + } Self::NotFound => { // TODO: emit a stat? // TODO: instead of an error, show a normal html page for 404? diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index e1496960..28de5880 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -28,6 +28,8 @@ use tokio::sync::broadcast; use tower_http::cors::CorsLayer; use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer; +use self::errors::Web3ProxyResult; + /// simple keys for caching responses #[derive(Copy, Clone, Hash, PartialEq, Eq, EnumCount, EnumIter)] pub enum ResponseCacheKey { @@ -49,7 +51,7 @@ pub async fn serve( proxy_app: Arc, mut shutdown_receiver: broadcast::Receiver<()>, shutdown_complete_sender: broadcast::Sender<()>, -) -> anyhow::Result<()> { +) -> Web3ProxyResult<()> { // setup caches for whatever the frontend needs // no need for max items since it is limited by the enum key // TODO: latest moka allows for different ttls for different diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index 1f1cb94b..632f2839 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -36,7 +36,7 @@ pub async fn health( Ok(_health(app).await) }) .await - .unwrap(); + .expect("this cache get is infallible"); Response::builder() .status(code) @@ -70,7 +70,7 @@ pub async fn backups_needed( Ok(_backups_needed(app).await) }) .await - .unwrap(); + .expect("this cache get is infallible"); Response::builder() .status(code) @@ -120,7 +120,7 @@ pub async fn status( Ok(_status(app).await) }) .await - .unwrap(); + .expect("this cache get is infallible"); Response::builder() .status(code) diff --git a/web3_proxy/src/prometheus.rs b/web3_proxy/src/prometheus.rs index 2c582c24..c17b8ecd 100644 --- a/web3_proxy/src/prometheus.rs +++ b/web3_proxy/src/prometheus.rs @@ -8,13 +8,14 @@ use std::sync::Arc; use tokio::sync::broadcast; use crate::app::Web3ProxyApp; +use crate::frontend::errors::Web3ProxyResult; /// Run a prometheus metrics server on the given port. pub async fn serve( app: Arc, port: u16, mut shutdown_receiver: broadcast::Receiver<()>, -) -> anyhow::Result<()> { +) -> Web3ProxyResult<()> { // routes should be ordered most to least common let app = Router::new().route("/", get(root)).layer(Extension(app)); diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 2b71b1fe..df79ac68 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -18,7 +18,6 @@ use std::convert::Infallible; use std::hash::Hash; use std::{cmp::Ordering, fmt::Display, sync::Arc}; use tokio::sync::broadcast; -use tokio::time::Duration; // TODO: type for Hydrated Blocks with their full transactions? pub type ArcBlock = Arc>; @@ -193,7 +192,7 @@ impl Web3Rpcs { .blocks_by_hash .get_or_insert_async::(&block_hash, async move { Ok(block) }) .await - .unwrap(); + .expect("this cache get is infallible"); Ok(block) } @@ -219,12 +218,11 @@ impl Web3Rpcs { // TODO: if error, retry? let block: Web3ProxyBlock = match rpc { Some(rpc) => rpc - .wait_for_request_handle(authorization, Some(Duration::from_secs(30))) - .await? .request::<_, Option>( "eth_getBlockByHash", &json!(get_block_params), Level::Error.into(), + authorization.clone(), ) .await? .and_then(|x| { @@ -366,7 +364,7 @@ impl Web3Rpcs { // TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed // Geth's subscriptions have the same potential for skipping blocks. pending_tx_sender: Option>, - ) -> anyhow::Result<()> { + ) -> Web3ProxyResult<()> { let mut connection_heads = ConsensusFinder::new(self.max_block_age, self.max_block_lag); loop { diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index b11fb794..b7237ccb 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -374,7 +374,7 @@ impl ConsensusFinder { .first_seen .get_or_insert_async::(block.hash(), async { Ok(Instant::now()) }) .await - .unwrap(); + .expect("this cache get is infallible"); // calculate elapsed time before trying to lock let latency = first_seen.elapsed(); diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index b6f924de..310a5686 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -3,7 +3,7 @@ use super::blockchain::{BlocksByHashCache, BlocksByNumberCache, Web3ProxyBlock}; use super::consensus::{ConsensusWeb3Rpcs, ShouldWaitForBlock}; use super::one::Web3Rpc; use super::request::{OpenRequestHandle, OpenRequestResult, RequestErrorHandler}; -use crate::app::{flatten_handle, AnyhowJoinHandle, Web3ProxyApp}; +use crate::app::{flatten_handle, Web3ProxyApp, Web3ProxyJoinHandle}; use crate::config::{BlockAndRpc, TxHashAndRpc, Web3RpcConfig}; use crate::frontend::authorization::{Authorization, RequestMetadata}; use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; @@ -11,7 +11,6 @@ use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::jsonrpc::{JsonRpcErrorData, JsonRpcRequest}; use crate::response_cache::JsonRpcResponseData; use crate::rpcs::transactions::TxStatus; -use anyhow::Context; use arc_swap::ArcSwap; use counter::Counter; use derive_more::From; @@ -36,7 +35,7 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use thread_fast_rng::rand::seq::SliceRandom; use tokio::sync::{broadcast, watch}; -use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; +use tokio::time::{sleep, sleep_until, Duration, Instant}; /// A collection of web3 connections. Sends requests either the current best server or all servers. #[derive(From)] @@ -46,8 +45,6 @@ pub struct Web3Rpcs { pub(crate) block_sender: flume::Sender<(Option, Arc)>, /// any requests will be forwarded to one (or more) of these connections pub(crate) by_name: ArcSwap>>, - /// notify all http providers to check their blocks at the same time - pub(crate) http_interval_sender: Option>>, /// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender` /// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed /// TODO: why is watch_consensus_head_sender in an Option, but this one isn't? @@ -78,9 +75,7 @@ impl Web3Rpcs { /// Spawn durable connections to multiple Web3 providers. #[allow(clippy::too_many_arguments)] pub async fn spawn( - chain_id: u64, db_conn: Option, - http_client: Option, max_block_age: Option, max_block_lag: Option, min_head_rpcs: usize, @@ -91,82 +86,18 @@ impl Web3Rpcs { watch_consensus_head_sender: Option>>, ) -> anyhow::Result<( Arc, - AnyhowJoinHandle<()>, + Web3ProxyJoinHandle<()>, watch::Receiver>>, // watch::Receiver>, )> { let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); let (block_sender, block_receiver) = flume::unbounded::(); - // TODO: query the rpc to get the actual expected block time, or get from config? maybe have this be part of a health check? - let expected_block_time_ms = match chain_id { - // ethereum - 1 => 12_000, - // ethereum-goerli - 5 => 12_000, - // polygon - 137 => 2_000, - // fantom - 250 => 1_000, - // arbitrum - 42161 => 500, - // anything else - _ => { - warn!( - "unexpected chain_id ({}). polling every {} seconds", - chain_id, 10 - ); - 10_000 - } - }; - - let http_interval_sender = if http_client.is_some() { - let (sender, _) = broadcast::channel(1); - - // TODO: what interval? follow a websocket also? maybe by watching synced connections with a timeout. will need debounce - let mut interval = interval(Duration::from_millis(expected_block_time_ms / 2)); - interval.set_missed_tick_behavior(MissedTickBehavior::Delay); - - let sender = Arc::new(sender); - - let f = { - let sender = sender.clone(); - - async move { - loop { - interval.tick().await; - - // trace!("http interval ready"); - - if sender.send(()).is_err() { - // errors are okay. they mean that all receivers have been dropped, or the rpcs just haven't started yet - // TODO: i'm seeing this error a lot more than expected - trace!("no http receivers"); - }; - } - } - }; - - // TODO: do something with this handle? - tokio::spawn(f); - - Some(sender) - } else { - None - }; - // these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes - // TODO: how can we do the weigher this? need to know actual allocated size + // TODO: actual weighter on this // TODO: time_to_idle instead? - // TODO: limits from config let blocks_by_hash: BlocksByHashCache = Arc::new(CacheWithTTL::new_with_capacity(10_000, Duration::from_secs(30 * 60)).await); - // .max_capacity(1024 * 1024 * 1024) - // .weigher(|_k, v: &Web3ProxyBlock| { - // 1 + v.block.transactions.len().try_into().unwrap_or(u32::MAX) - // }) - // .time_to_live(Duration::from_secs(30 * 60)) - // .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); // all block numbers are the same size, so no need for weigher // TODO: limits from config @@ -185,7 +116,6 @@ impl Web3Rpcs { blocks_by_hash, blocks_by_number, by_name, - http_interval_sender, max_block_age, max_block_lag, min_head_rpcs, @@ -214,7 +144,7 @@ impl Web3Rpcs { &self, app: &Web3ProxyApp, rpc_configs: HashMap, - ) -> anyhow::Result<()> { + ) -> Web3ProxyResult<()> { // safety checks if rpc_configs.len() < app.config.min_synced_rpcs { // TODO: don't count disabled servers! @@ -232,15 +162,14 @@ impl Web3Rpcs { let sum_soft_limit = rpc_configs.values().fold(0, |acc, x| acc + x.soft_limit); // TODO: require a buffer? - anyhow::ensure!( - sum_soft_limit >= self.min_sum_soft_limit, - "Only {}/{} soft limit! Add more rpcs, increase soft limits, or reduce min_sum_soft_limit.", - sum_soft_limit, - self.min_sum_soft_limit, - ); + if sum_soft_limit < self.min_sum_soft_limit { + return Err(Web3ProxyError::NotEnoughSoftLimit { + available: sum_soft_limit, + needed: self.min_sum_soft_limit, + }); + } // turn configs into connections (in parallel) - // TODO: move this into a helper function. then we can use it when configs change (will need a remove function too) let mut spawn_handles: FuturesUnordered<_> = rpc_configs .into_iter() .filter_map(|(server_name, server_config)| { @@ -261,7 +190,6 @@ impl Web3Rpcs { let pending_tx_id_sender = Some(self.pending_tx_id_sender.clone()); let blocks_by_hash_cache = self.blocks_by_hash.clone(); - let http_interval_sender = self.http_interval_sender.clone(); let chain_id = app.config.chain_id; debug!("spawning {}", server_name); @@ -272,7 +200,6 @@ impl Web3Rpcs { vredis_pool, chain_id, http_client, - http_interval_sender, blocks_by_hash_cache, block_sender, pending_tx_id_sender, @@ -312,7 +239,7 @@ impl Web3Rpcs { Ok(Err(err)) => { // if we got an error here, the app can continue on // TODO: include context about which connection failed - // TODO: will this retry automatically? i don't think so + // TODO: retry automatically error!("Unable to create connection. err={:?}", err); } Err(err) => { @@ -322,6 +249,15 @@ impl Web3Rpcs { } } + let num_rpcs = self.by_name.load().len(); + + if num_rpcs < self.min_head_rpcs { + return Err(Web3ProxyError::NotEnoughRpcs { + num_known: num_rpcs, + min_head_rpcs: self.min_head_rpcs, + }); + } + Ok(()) } @@ -349,7 +285,7 @@ impl Web3Rpcs { authorization: Arc, block_receiver: flume::Receiver, pending_tx_sender: Option>, - ) -> anyhow::Result<()> { + ) -> Web3ProxyResult<()> { let mut futures = vec![]; // setup the transaction funnel @@ -1317,7 +1253,6 @@ mod tests { #[cfg(test)] fn new_peak_latency() -> PeakEwmaLatency { - const NANOS_PER_MILLI: f64 = 1_000_000.0; PeakEwmaLatency::spawn(Duration::from_secs(1), 4, Duration::from_secs(1)) } @@ -1490,7 +1425,6 @@ mod tests { let rpcs = Web3Rpcs { block_sender: block_sender.clone(), by_name: ArcSwap::from_pointee(rpcs_by_name), - http_interval_sender: None, name: "test".to_string(), watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, @@ -1568,7 +1502,7 @@ mod tests { .send_head_block_result( Ok(Some(lagged_block.clone())), &block_sender, - rpcs.blocks_by_hash.clone(), + &rpcs.blocks_by_hash, ) .await .unwrap(); @@ -1588,7 +1522,7 @@ mod tests { .send_head_block_result( Ok(Some(lagged_block.clone())), &block_sender, - rpcs.blocks_by_hash.clone(), + &rpcs.blocks_by_hash, ) .await .unwrap(); @@ -1620,7 +1554,7 @@ mod tests { .send_head_block_result( Ok(Some(head_block.clone())), &block_sender, - rpcs.blocks_by_hash.clone(), + &rpcs.blocks_by_hash, ) .await .unwrap(); @@ -1741,7 +1675,6 @@ mod tests { let rpcs = Web3Rpcs { block_sender, by_name: ArcSwap::from_pointee(rpcs_by_name), - http_interval_sender: None, name: "test".to_string(), watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, @@ -1911,7 +1844,6 @@ mod tests { let rpcs = Web3Rpcs { block_sender, by_name: ArcSwap::from_pointee(rpcs_by_name), - http_interval_sender: None, name: "test".to_string(), watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 9015dd18..e3e3b0ea 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -2,18 +2,18 @@ use super::blockchain::{ArcBlock, BlocksByHashCache, Web3ProxyBlock}; use super::provider::{connect_http, connect_ws, EthersHttpProvider, EthersWsProvider}; use super::request::{OpenRequestHandle, OpenRequestResult}; -use crate::app::{flatten_handle, AnyhowJoinHandle}; +use crate::app::{flatten_handle, Web3ProxyJoinHandle}; use crate::config::{BlockAndRpc, Web3RpcConfig}; use crate::frontend::authorization::Authorization; use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; use crate::rpcs::request::RequestErrorHandler; use anyhow::{anyhow, Context}; -use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64}; +use ethers::prelude::{Bytes, Middleware, TxHash, U64}; use ethers::types::{Address, Transaction, U256}; use futures::future::try_join_all; use futures::StreamExt; use latency::{EwmaLatency, PeakEwmaLatency}; -use log::{debug, error, info, trace, warn, Level}; +use log::{debug, info, trace, warn, Level}; use migration::sea_orm::DatabaseConnection; use ordered_float::OrderedFloat; use parking_lot::RwLock; @@ -21,16 +21,12 @@ use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter}; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; -use std::borrow::Cow; -use std::cmp::min; use std::convert::Infallible; use std::fmt; use std::hash::{Hash, Hasher}; -use std::sync::atomic::{self, AtomicBool, AtomicU64, AtomicUsize}; +use std::sync::atomic::{self, AtomicU64, AtomicUsize}; use std::{cmp::Ordering, sync::Arc}; -use thread_fast_rng::rand::Rng; -use thread_fast_rng::thread_fast_rng; -use tokio::sync::{broadcast, oneshot, watch, RwLock as AsyncRwLock}; +use tokio::sync::watch; use tokio::time::{sleep, sleep_until, timeout, Duration, Instant}; use url::Url; @@ -88,13 +84,12 @@ impl Web3Rpc { db_conn: Option, // optional because this is only used for http providers. websocket providers don't use it http_client: Option, - // TODO: rename to http_new_head_interval_sender? - http_interval_sender: Option>>, redis_pool: Option, + block_interval: Duration, block_map: BlocksByHashCache, block_sender: Option>, tx_id_sender: Option)>>, - ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { + ) -> anyhow::Result<(Arc, Web3ProxyJoinHandle<()>)> { let created_at = Instant::now(); let hard_limit = match (config.hard_limit, redis_pool) { @@ -151,8 +146,6 @@ impl Web3Rpc { } } - let (disconnect_sender, disconnect_receiver) = watch::channel(false); - let (head_block, _) = watch::channel(None); // Spawn the task for calculting average peak latency @@ -170,7 +163,7 @@ impl Web3Rpc { let http_provider = if let Some(http_url) = config.http_url { let http_url = http_url.parse::()?; - Some(connect_http(Cow::Owned(http_url), http_client)?) + Some(connect_http(http_url, http_client, block_interval)?) // TODO: check the provider is on the right chain } else { @@ -180,20 +173,21 @@ impl Web3Rpc { let ws_provider = if let Some(ws_url) = config.ws_url { let ws_url = ws_url.parse::()?; - Some(connect_ws(Cow::Owned(ws_url), usize::MAX).await?) + Some(connect_ws(ws_url, usize::MAX).await?) // TODO: check the provider is on the right chain } else { None }; + let (disconnect_watch, _) = watch::channel(false); + let new_rpc = Self { automatic_block_limit, backup, block_data_limit, created_at: Some(created_at), db_conn: db_conn.clone(), - disconnect_watch: Some(disconnect_sender), display_name: config.display_name, hard_limit, hard_limit_until: Some(hard_limit_until), @@ -203,6 +197,8 @@ impl Web3Rpc { peak_latency: Some(peak_latency), soft_limit: config.soft_limit, tier: config.tier, + ws_provider, + disconnect_watch: Some(disconnect_watch), ..Default::default() }; @@ -221,8 +217,6 @@ impl Web3Rpc { block_map, block_sender, chain_id, - disconnect_receiver, - http_interval_sender, tx_id_sender, ) .await @@ -264,13 +258,12 @@ impl Web3Rpc { // TODO: binary search between 90k and max? // TODO: start at 0 or 1? for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] { - let handle = self.wait_for_request_handle(authorization, None).await?; - - let head_block_num_future = handle.request::, U256>( + let head_block_num_future = self.request::, U256>( "eth_blockNumber", &None, // error here are expected, so keep the level low Level::Debug.into(), + authorization.clone(), ); let head_block_num = timeout(Duration::from_secs(5), head_block_num_future) @@ -288,9 +281,7 @@ impl Web3Rpc { // TODO: wait for the handle BEFORE we check the current block number. it might be delayed too! // TODO: what should the request be? - let handle = self.wait_for_request_handle(authorization, None).await?; - - let archive_result: Result = handle + let archive_result: Result = self .request( "eth_getCode", &json!(( @@ -299,6 +290,7 @@ impl Web3Rpc { )), // error here are expected, so keep the level low Level::Trace.into(), + authorization.clone(), ) .await; @@ -377,23 +369,20 @@ impl Web3Rpc { } /// query the web3 provider to confirm it is on the expected chain with the expected data available - async fn check_provider( - self: &Arc, - block_sender: Option<&flume::Sender>, - chain_id: u64, - db_conn: Option<&DatabaseConnection>, - ) -> anyhow::Result<()> { - let authorization = Arc::new(Authorization::internal(db_conn.cloned())?); + async fn check_provider(self: &Arc, chain_id: u64) -> Web3ProxyResult<()> { + let authorization = Arc::new(Authorization::internal(self.db_conn.clone())?); // check the server's chain_id here // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error // TODO: what should the timeout be? should there be a request timeout? // trace!("waiting on chain id for {}", self); let found_chain_id: Result = self - .wait_for_request_handle(&authorization, None) - .await - .context(format!("waiting for request handle on {}", self))? - .request("eth_chainId", &json!(Vec::<()>::new()), Level::Trace.into()) + .request( + "eth_chainId", + &json!(Vec::<()>::new()), + Level::Trace.into(), + authorization.clone(), + ) .await; trace!("found_chain_id: {:#?}", found_chain_id); @@ -406,12 +395,14 @@ impl Web3Rpc { chain_id, found_chain_id ) - .context(format!("failed @ {}", self))); + .context(format!("failed @ {}", self)) + .into()); } } Err(e) => { return Err(anyhow::Error::from(e) - .context(format!("unable to parse eth_chainId from {}", self))); + .context(format!("unable to parse eth_chainId from {}", self)) + .into()); } } @@ -426,27 +417,25 @@ impl Web3Rpc { pub(crate) async fn send_head_block_result( self: &Arc, - new_head_block: Result, ProviderError>, + new_head_block: Web3ProxyResult>, block_sender: &flume::Sender, - block_map: BlocksByHashCache, - ) -> anyhow::Result<()> { + block_map: &BlocksByHashCache, + ) -> Web3ProxyResult<()> { let new_head_block = match new_head_block { Ok(None) => { - { - let head_block_tx = self.head_block.as_ref().unwrap(); + let head_block_tx = self.head_block.as_ref().unwrap(); - if head_block_tx.borrow().is_none() { - // we previously sent a None. return early - return Ok(()); - } - - let age = self.created_at.unwrap().elapsed().as_millis(); - - debug!("clearing head block on {} ({}ms old)!", self, age); - - head_block_tx.send_replace(None); + if head_block_tx.borrow().is_none() { + // we previously sent a None. return early + return Ok(()); } + let age = self.created_at.unwrap().elapsed().as_millis(); + + debug!("clearing head block on {} ({}ms old)!", self, age); + + head_block_tx.send_replace(None); + None } Ok(Some(new_head_block)) => { @@ -461,7 +450,8 @@ impl Web3Rpc { &new_hash, async move { Ok(new_head_block) }, ) - .await?; + .await + .expect("this cache get is infallible"); // save the block so we don't send the same one multiple times // also save so that archive checks can know how far back to query @@ -504,9 +494,61 @@ impl Web3Rpc { *self.disconnect_watch.as_ref().unwrap().borrow() } + async fn healthcheck( + self: &Arc, + authorization: &Arc, + error_handler: RequestErrorHandler, + ) -> Web3ProxyResult<()> { + let head_block = self.head_block.as_ref().unwrap().borrow().clone(); + + if let Some(head_block) = head_block { + let head_block = head_block.block; + + // TODO: if head block is very old and not expected to be syncing, emit warning + + let block_number = head_block.number.context("no block number")?; + + let to = if let Some(txid) = head_block.transactions.last().cloned() { + let tx = self + .request::<_, Option>( + "eth_getTransactionByHash", + &(txid,), + error_handler, + authorization.clone(), + ) + .await? + .context("no transaction")?; + + // TODO: what default? something real? + tx.to.unwrap_or_else(|| { + "0xdead00000000000000000000000000000000beef" + .parse::
() + .expect("deafbeef") + }) + } else { + "0xdead00000000000000000000000000000000beef" + .parse::
() + .expect("deafbeef") + }; + + let _code = self + .request::<_, Option>( + "eth_getCode", + &(to, block_number), + error_handler, + authorization.clone(), + ) + .await?; + } else { + // TODO: if head block is none for too long, give an error + } + + Ok(()) + } + /// subscribe to blocks and transactions /// This should only exit when the program is exiting. - /// TODO: should more of these args be on self? + /// TODO: should more of these args be on self? chain_id for sure #[allow(clippy::too_many_arguments)] async fn subscribe( self: Arc, @@ -514,220 +556,97 @@ impl Web3Rpc { block_map: BlocksByHashCache, block_sender: Option>, chain_id: u64, - disconnect_receiver: watch::Receiver, - http_interval_sender: Option>>, tx_id_sender: Option)>>, - ) -> anyhow::Result<()> { + ) -> Web3ProxyResult<()> { let error_handler = if self.backup { RequestErrorHandler::DebugLevel } else { RequestErrorHandler::ErrorLevel }; - todo!(); + debug!("starting subscriptions on {}", self); + + self.check_provider(chain_id).await?; - /* let mut futures = vec![]; - while false { - let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe()); + // health check that runs if there haven't been any recent requests + { + // TODO: move this into a proper function + let authorization = authorization.clone(); + let rpc = self.clone(); - { - // TODO: move this into a proper function - let authorization = authorization.clone(); - let block_sender = block_sender.clone(); - let rpc = self.clone(); - let (ready_tx, ready_rx) = oneshot::channel(); - let f = async move { - // initial sleep to allow for the initial connection - rpc.retrying_connect( - block_sender.as_ref(), - chain_id, - authorization.db_conn.as_ref(), - delay_start, - ) - .await?; + // TODO: how often? different depending on the chain? + // TODO: reset this timeout when a new block is seen? we need to keep request_latency updated though + let health_sleep_seconds = 10; - // provider is ready - ready_tx.send(()).unwrap(); + // health check loop + let f = async move { + // TODO: benchmark this and lock contention + let mut old_total_requests = 0; + let mut new_total_requests; - // TODO: how often? different depending on the chain? - // TODO: reset this timeout when a new block is seen? we need to keep request_latency updated though - let health_sleep_seconds = 10; + // TODO: errors here should not cause the loop to exit! + while !rpc.should_disconnect() { + new_total_requests = rpc.total_requests.load(atomic::Ordering::Relaxed); - // TODO: benchmark this and lock contention - let mut old_total_requests = 0; - let mut new_total_requests; - - // health check loop - loop { - // TODO: do we need this to be abortable? - if rpc.should_disconnect() { - break; - } - - sleep(Duration::from_secs(health_sleep_seconds)).await; - - trace!("health check on {}", rpc); - - // TODO: what if we just happened to have this check line up with another restart? - // TODO: think more about this - if let Some(client) = rpc.ws_provider.read().await.clone() { - // health check as a way of keeping this rpc's request_ewma accurate - // TODO: do something different if this is a backup server? - - new_total_requests = rpc.total_requests.load(atomic::Ordering::Acquire); - - // TODO: how many requests should we require in order to skip a health check? - if new_total_requests - old_total_requests < 10 { - // TODO: if this fails too many times, reset the connection - // TODO: move this into a function and the chaining should be easier - let head_block = rpc.head_block.as_ref().unwrap().borrow().clone(); - - if let Some((block_number, txid)) = head_block.and_then(|x| { - let block = x.block; - - let block_number = block.number?; - let txid = block.transactions.last().cloned()?; - - Some((block_number, txid)) - }) { - let to = rpc - .wait_for_query::<_, Option>( - "eth_getTransactionByHash", - &(txid,), - error_handler, - authorization.clone(), - Some(client.clone()), - ) - .await - .and_then(|tx| { - let tx = tx.context("no transaction found")?; - - // TODO: what default? something real? - let to = tx.to.unwrap_or_else(|| { - "0xdead00000000000000000000000000000000beef" - .parse::
() - .expect("deafbeef") - }); - - Ok(to) - }); - - let code = match to { - Err(err) => { - // TODO: an "error" here just means that the hash wasn't available. i dont think its truly an "error" - if rpc.backup { - debug!( - "{} failed health check query! {:#?}", - rpc, err - ); - } else { - warn!( - "{} failed health check query! {:#?}", - rpc, err - ); - } - continue; - } - Ok(to) => { - rpc.wait_for_query::<_, Option>( - "eth_getCode", - &(to, block_number), - error_handler, - authorization.clone(), - Some(client), - ) - .await - } - }; - - if let Err(err) = code { - if rpc.backup { - debug!("{} failed health check query! {:#?}", rpc, err); - } else { - warn!("{} failed health check query! {:#?}", rpc, err); - } - continue; - } - } - } - - old_total_requests = new_total_requests; + if new_total_requests - old_total_requests < 10 { + // TODO: if this fails too many times, reset the connection + // TODO: move this into a function and the chaining should be easier + if let Err(err) = rpc.healthcheck(&authorization, error_handler).await { + // TODO: different level depending on the error handler + warn!("health checking {} failed: {:?}", rpc, err); } } - debug!("health checks for {} exited", rpc); - Ok(()) - }; + // TODO: should we count the requests done inside this health check + old_total_requests = new_total_requests; - futures.push(flatten_handle(tokio::spawn(f))); - - // wait on the initial connection - ready_rx.await?; - } - - if let Some(block_sender) = &block_sender { - // TODO: do we need this to be abortable? - let f = self.clone().subscribe_new_heads( - authorization.clone(), - http_interval_receiver, - block_sender.clone(), - block_map.clone(), - ); - - futures.push(flatten_handle(tokio::spawn(f))); - } - - if let Some(tx_id_sender) = &tx_id_sender { - // TODO: do we need this to be abortable? - let f = self - .clone() - .subscribe_pending_transactions(authorization.clone(), tx_id_sender.clone()); - - futures.push(flatten_handle(tokio::spawn(f))); - } - - match try_join_all(futures).await { - Ok(_) => { - // future exited without error - // TODO: think about this more. we never set it to false. this can't be right - break; + sleep(Duration::from_secs(health_sleep_seconds)).await; } - Err(err) => { - let disconnect_sender = self.disconnect_watch.as_ref().unwrap(); - if self.reconnect.load(atomic::Ordering::Acquire) { - warn!("{} connection ended. reconnecting. err={:?}", self, err); + debug!("healthcheck loop on {} exited", rpc); - // TODO: i'm not sure if this is necessary, but telling everything to disconnect seems like a better idea than relying on timeouts and dropped futures. - disconnect_sender.send_replace(true); - disconnect_sender.send_replace(false); + Ok(()) + }; - // we call retrying_connect here with initial_delay=true. above, initial_delay=false - delay_start = true; - - continue; - } - - // reconnect is not enabled. - if *disconnect_receiver.borrow() { - info!("{} is disconnecting", self); - break; - } else { - error!("{} subscription exited. err={:?}", self, err); - - disconnect_sender.send_replace(true); - - break; - } - } - } + futures.push(flatten_handle(tokio::spawn(f))); } + // subscribe to new heads + if let Some(block_sender) = &block_sender { + // TODO: do we need this to be abortable? + let f = self.clone().subscribe_new_heads( + authorization.clone(), + block_sender.clone(), + block_map.clone(), + ); - */ - info!("all subscriptions on {} completed", self); + futures.push(flatten_handle(tokio::spawn(f))); + } + + // subscribe pending transactions + // TODO: make this opt-in. its a lot of bandwidth + if let Some(tx_id_sender) = tx_id_sender { + // TODO: do we need this to be abortable? + let f = self + .clone() + .subscribe_pending_transactions(authorization.clone(), tx_id_sender); + + futures.push(flatten_handle(tokio::spawn(f))); + } + + // try_join on the futures + if let Err(err) = try_join_all(futures).await { + warn!("subscription erred: {:?}", err); + } + + debug!("subscriptions on {} exited", self); + + self.disconnect_watch + .as_ref() + .expect("disconnect_watch should always be set") + .send_replace(true); Ok(()) } @@ -736,197 +655,76 @@ impl Web3Rpc { async fn subscribe_new_heads( self: Arc, authorization: Arc, - http_interval_receiver: Option>, block_sender: flume::Sender, block_map: BlocksByHashCache, - ) -> anyhow::Result<()> { - trace!("watching new heads on {}", self); + ) -> Web3ProxyResult<()> { + debug!("subscribing to new heads on {}", self); if let Some(ws_provider) = self.ws_provider.as_ref() { - todo!("subscribe") + // todo: move subscribe_blocks onto the request handle + let active_request_handle = self.wait_for_request_handle(&authorization, None).await; + let mut blocks = ws_provider.subscribe_blocks().await?; + drop(active_request_handle); + + // query the block once since the subscription doesn't send the current block + // there is a very small race condition here where the stream could send us a new block right now + // but all seeing the same block twice won't break anything + // TODO: how does this get wrapped in an arc? does ethers handle that? + // TODO: can we force this to use the websocket? + let latest_block: Result, _> = self + .request( + "eth_getBlockByNumber", + &json!(("latest", false)), + Level::Warn.into(), + authorization, + ) + .await; + + self.send_head_block_result(latest_block, &block_sender, &block_map) + .await?; + + while let Some(block) = blocks.next().await { + if self.should_disconnect() { + break; + } + + let block = Arc::new(block); + + self.send_head_block_result(Ok(Some(block)), &block_sender, &block_map) + .await?; + } } else if let Some(http_provider) = self.http_provider.as_ref() { - todo!("poll") + // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints + let mut blocks = http_provider.watch_blocks().await?; + + while let Some(block_hash) = blocks.next().await { + if self.should_disconnect() { + break; + } + + let block = if let Some(block) = block_map.get(&block_hash) { + block.block + } else if let Some(block) = http_provider.get_block(block_hash).await? { + Arc::new(block) + } else { + continue; + }; + + self.send_head_block_result(Ok(Some(block)), &block_sender, &block_map) + .await?; + } } else { unimplemented!("no ws or http provider!") } - /* - match provider.as_ref() { - Web3Provider::Http(_client) => { - // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints - // TODO: try watch_blocks and fall back to this? - - let mut http_interval_receiver = http_interval_receiver.unwrap(); - - let mut last_hash = H256::zero(); - - while !self.should_disconnect() { - // TODO: what should the max_wait be? - // we do not pass unlocked_provider because we want to get a new one each call. otherwise we might re-use an old one - match self.wait_for_request_handle(&authorization, None).await { - Ok(active_request_handle) => { - let block: Result, _> = active_request_handle - .request( - "eth_getBlockByNumber", - &json!(("latest", false)), - Level::Warn.into(), - ) - .await; - - match block { - Ok(None) => { - warn!("no head block on {}", self); - - self.send_head_block_result( - Ok(None), - &block_sender, - block_map.clone(), - ) - .await?; - } - Ok(Some(block)) => { - if let Some(new_hash) = block.hash { - // don't send repeat blocks - if new_hash != last_hash { - // new hash! - last_hash = new_hash; - - self.send_head_block_result( - Ok(Some(block)), - &block_sender, - block_map.clone(), - ) - .await?; - } - } else { - // TODO: why is this happening? - warn!("empty head block on {}", self); - - self.send_head_block_result( - Ok(None), - &block_sender, - block_map.clone(), - ) - .await?; - } - } - Err(err) => { - // we did not get a block back. something is up with the server. take it out of rotation - self.send_head_block_result( - Err(err), - &block_sender, - block_map.clone(), - ) - .await?; - } - } - } - Err(err) => { - warn!("Internal error on latest block from {}. {:?}", self, err); - - self.send_head_block_result(Ok(None), &block_sender, block_map.clone()) - .await?; - - // TODO: what should we do? sleep? extra time? - } - } - - // wait for the next interval - // TODO: if error or rate limit, increase interval? - while let Err(err) = http_interval_receiver.recv().await { - match err { - broadcast::error::RecvError::Closed => { - // channel is closed! that's not good. bubble the error up - return Err(err.into()); - } - broadcast::error::RecvError::Lagged(lagged) => { - // querying the block was delayed - // this can happen if tokio is very busy or waiting for requests limits took too long - if self.backup { - debug!("http interval on {} lagging by {}!", self, lagged); - } else { - warn!("http interval on {} lagging by {}!", self, lagged); - } - } - } - } - } - } - Web3Provider::Both(_, client) | Web3Provider::Ws(client) => { - // todo: move subscribe_blocks onto the request handle? - let active_request_handle = - self.wait_for_request_handle(&authorization, None).await; - let mut stream = client.subscribe_blocks().await?; - drop(active_request_handle); - - // query the block once since the subscription doesn't send the current block - // there is a very small race condition here where the stream could send us a new block right now - // but all that does is print "new block" for the same block as current block - // TODO: how does this get wrapped in an arc? does ethers handle that? - // TODO: do this part over http? - let block: Result, _> = self - .wait_for_request_handle(&authorization, None) - .await? - .request( - "eth_getBlockByNumber", - &json!(("latest", false)), - Level::Warn.into(), - ) - .await; - - let mut last_hash = match &block { - Ok(Some(new_block)) => new_block - .hash - .expect("blocks should always have a hash here"), - _ => H256::zero(), - }; - - self.send_head_block_result(block, &block_sender, block_map.clone()) - .await?; - - while let Some(new_block) = stream.next().await { - // TODO: select on disconnect_watch instead of waiting for a block to arrive - if self.should_disconnect() { - break; - } - - // TODO: check the new block's hash to be sure we don't send dupes - let new_hash = new_block - .hash - .expect("blocks should always have a hash here"); - - if new_hash == last_hash { - // some rpcs like to give us duplicates. don't waste our time on them - continue; - } else { - last_hash = new_hash; - } - - self.send_head_block_result( - Ok(Some(Arc::new(new_block))), - &block_sender, - block_map.clone(), - ) - .await?; - } - - // TODO: is this always an error? - // TODO: we probably don't want a warn and to return error - debug!("new_heads subscription to {} ended", self); - } - #[cfg(test)] - Web3Provider::Mock => unimplemented!(), - } - */ - // clear the head block. this might not be needed, but it won't hurt - self.send_head_block_result(Ok(None), &block_sender, block_map) + self.send_head_block_result(Ok(None), &block_sender, &block_map) .await?; if self.should_disconnect() { Ok(()) } else { - Err(anyhow!("new_heads subscription exited. reconnect needed")) + Err(anyhow!("new_heads subscription exited. reconnect needed").into()) } } @@ -935,7 +733,7 @@ impl Web3Rpc { self: Arc, authorization: Arc, tx_id_sender: flume::Sender<(TxHash, Arc)>, - ) -> anyhow::Result<()> { + ) -> Web3ProxyResult<()> { // TODO: make this subscription optional self.wait_for_disconnect().await?; @@ -985,9 +783,7 @@ impl Web3Rpc { if self.should_disconnect() { Ok(()) } else { - Err(anyhow!( - "pending_transactions subscription exited. reconnect needed" - )) + Err(anyhow!("pending_transactions subscription exited. reconnect needed").into()) } } @@ -1034,7 +830,7 @@ impl Web3Rpc { } // TODO: sleep how long? maybe just error? - // TODO: instead of an arbitrary sleep, subscribe to the head block on this + // TODO: instead of an arbitrary sleep, subscribe to the head block on this? sleep(Duration::from_millis(10)).await; } Err(err) => return Err(err), @@ -1099,36 +895,39 @@ impl Web3Rpc { } async fn wait_for_disconnect(&self) -> Result<(), tokio::sync::watch::error::RecvError> { - let mut disconnect_watch = self.disconnect_watch.as_ref().unwrap().subscribe(); + let mut disconnect_subscription = self.disconnect_watch.as_ref().unwrap().subscribe(); loop { - if *disconnect_watch.borrow_and_update() { + if *disconnect_subscription.borrow_and_update() { // disconnect watch is set to "true" return Ok(()); } - // wait for disconnect_watch to change - disconnect_watch.changed().await?; + // wait for disconnect_subscription to change + disconnect_subscription.changed().await?; } } - pub async fn wait_for_query( + pub async fn request( self: &Arc, method: &str, params: &P, revert_handler: RequestErrorHandler, authorization: Arc, - ) -> anyhow::Result + ) -> Web3ProxyResult where // TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it P: Clone + fmt::Debug + serde::Serialize + Send + Sync + 'static, R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug + Send, { - self.wait_for_request_handle(&authorization, None) + // TODO: take max_wait as a function argument? + let x = self + .wait_for_request_handle(&authorization, None) .await? .request::(method, params, revert_handler) - .await - .context("ProviderError from the backend") + .await?; + + Ok(x) } } @@ -1255,7 +1054,7 @@ impl fmt::Display for Web3Rpc { mod tests { #![allow(unused_imports)] use super::*; - use ethers::types::{Block, U256}; + use ethers::types::{Block, H256, U256}; #[test] fn test_archive_node_has_block_data() { diff --git a/web3_proxy/src/rpcs/provider.rs b/web3_proxy/src/rpcs/provider.rs index 1829b0a5..45c82147 100644 --- a/web3_proxy/src/rpcs/provider.rs +++ b/web3_proxy/src/rpcs/provider.rs @@ -1,26 +1,20 @@ -use anyhow::anyhow; -use derive_more::From; use ethers::providers::{Authorization, ConnectionDetails}; -use std::{borrow::Cow, time::Duration}; +use std::time::Duration; use url::Url; // TODO: our own structs for these that handle streaming large responses pub type EthersHttpProvider = ethers::providers::Provider; pub type EthersWsProvider = ethers::providers::Provider; -pub fn extract_auth(url: &mut Cow<'_, Url>) -> Option { +pub fn extract_auth(url: &mut Url) -> Option { if let Some(pass) = url.password().map(|x| x.to_string()) { // to_string is needed because we are going to remove these items from the url let user = url.username().to_string(); // clear username and password from the url - let mut_url = url.to_mut(); - - mut_url - .set_username("") + url.set_username("") .expect("unable to clear username on websocket"); - mut_url - .set_password(None) + url.set_password(None) .expect("unable to clear password on websocket"); // keep them @@ -33,35 +27,36 @@ pub fn extract_auth(url: &mut Cow<'_, Url>) -> Option { /// Note, if the http url has an authority the http_client param is ignored and a dedicated http_client will be used /// TODO: take a reqwest::Client or a reqwest::ClientBuilder. that way we can do things like set compression even when auth is set pub fn connect_http( - mut url: Cow<'_, Url>, + mut url: Url, http_client: Option, + interval: Duration, ) -> anyhow::Result { let auth = extract_auth(&mut url); - let provider = if url.scheme().starts_with("http") { + let mut provider = if url.scheme().starts_with("http") { let provider = if let Some(auth) = auth { - ethers::providers::Http::new_with_auth(url.into_owned(), auth)? + ethers::providers::Http::new_with_auth(url, auth)? } else if let Some(http_client) = http_client { - ethers::providers::Http::new_with_client(url.into_owned(), http_client) + ethers::providers::Http::new_with_client(url, http_client) } else { - ethers::providers::Http::new(url.into_owned()) + ethers::providers::Http::new(url) }; // TODO: i don't think this interval matters for our uses, but we should probably set it to like `block time / 2` - ethers::providers::Provider::new(provider) - .interval(Duration::from_secs(12)) - .into() + ethers::providers::Provider::new(provider).interval(Duration::from_secs(2)) } else { - return Err(anyhow::anyhow!("only http servers are supported")); + return Err(anyhow::anyhow!( + "only http servers are supported. cannot use {}", + url + )); }; + provider.set_interval(interval); + Ok(provider) } -pub async fn connect_ws( - mut url: Cow<'_, Url>, - reconnects: usize, -) -> anyhow::Result { +pub async fn connect_ws(mut url: Url, reconnects: usize) -> anyhow::Result { let auth = extract_auth(&mut url); let provider = if url.scheme().starts_with("ws") { @@ -76,7 +71,7 @@ pub async fn connect_ws( // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) // TODO: i don't think this interval matters - ethers::providers::Provider::new(provider).into() + ethers::providers::Provider::new(provider) } else { return Err(anyhow::anyhow!("ws servers are supported")); }; diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 34110bf7..9e91f75c 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -1,5 +1,6 @@ use super::one::Web3Rpc; use crate::frontend::authorization::Authorization; +use crate::frontend::errors::Web3ProxyResult; use anyhow::Context; use chrono::Utc; use entities::revert_log; @@ -13,7 +14,7 @@ use std::fmt; use std::sync::atomic; use std::sync::Arc; use thread_fast_rng::rand::Rng; -use tokio::time::{sleep, Duration, Instant}; +use tokio::time::{Duration, Instant}; #[derive(Debug)] pub enum OpenRequestResult { @@ -75,7 +76,7 @@ impl Authorization { self: Arc, method: Method, params: EthCallFirstParams, - ) -> anyhow::Result<()> { + ) -> Web3ProxyResult<()> { let rpc_key_id = match self.checks.rpc_secret_key_id { Some(rpc_key_id) => rpc_key_id.into(), None => { diff --git a/web3_proxy/src/rpcs/transactions.rs b/web3_proxy/src/rpcs/transactions.rs index c46da986..c9602e6c 100644 --- a/web3_proxy/src/rpcs/transactions.rs +++ b/web3_proxy/src/rpcs/transactions.rs @@ -1,4 +1,4 @@ -use crate::frontend::authorization::Authorization; +use crate::frontend::{authorization::Authorization, errors::Web3ProxyResult}; use super::many::Web3Rpcs; ///! Load balanced communication with a group of web3 providers @@ -70,7 +70,7 @@ impl Web3Rpcs { rpc: Arc, pending_tx_id: TxHash, pending_tx_sender: broadcast::Sender, - ) -> anyhow::Result<()> { + ) -> Web3ProxyResult<()> { // TODO: how many retries? until some timestamp is hit is probably better. maybe just loop and call this with a timeout // TODO: after more investigation, i don't think retries will help. i think this is because chains of transactions get dropped from memory // TODO: also check the "confirmed transactions" mapping? maybe one shared mapping with TxState in it? diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index deb8563d..0aedb115 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -8,9 +8,9 @@ pub use stat_buffer::{SpawnedStatBuffer, StatBuffer}; use crate::app::RpcSecretKeyCache; use crate::frontend::authorization::{Authorization, RequestMetadata}; -use crate::frontend::errors::Web3ProxyError; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; use crate::rpcs::one::Web3Rpc; -use anyhow::Context; +use anyhow::{anyhow, Context}; use axum::headers::Origin; use chrono::{DateTime, Months, TimeZone, Utc}; use derive_more::From; @@ -229,13 +229,14 @@ impl BufferedRpcQueryStats { db_conn: &DatabaseConnection, key: RpcQueryKey, rpc_secret_key_cache: Option<&RpcSecretKeyCache>, - ) -> anyhow::Result<()> { - anyhow::ensure!( - key.response_timestamp > 0, - "no response_timestamp! This is a bug! {:?} {:?}", - key, - self - ); + ) -> Web3ProxyResult<()> { + if key.response_timestamp == 0 { + return Err(Web3ProxyError::Anyhow(anyhow!( + "no response_timestamp! This is a bug! {:?} {:?}", + key, + self + ))); + } let period_datetime = Utc.timestamp_opt(key.response_timestamp, 0).unwrap(); @@ -670,8 +671,9 @@ impl RpcQueryStats { method: Option<&str>, ) -> Decimal { // for now, always return 0 for cost - return 0.into(); + 0.into() + /* // some methods should be free. there might be cases where method isn't set (though they should be uncommon) // TODO: get this list from config (and add more to it) if let Some(method) = method.as_ref() { @@ -704,5 +706,6 @@ impl RpcQueryStats { } cost + */ } } diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index f8fd2db8..c7028204 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -1,5 +1,6 @@ use super::{AppStat, RpcQueryKey}; -use crate::app::RpcSecretKeyCache; +use crate::app::{RpcSecretKeyCache, Web3ProxyJoinHandle}; +use crate::frontend::errors::Web3ProxyResult; use derive_more::From; use futures::stream; use hashbrown::HashMap; @@ -9,7 +10,6 @@ use migration::sea_orm::prelude::Decimal; use migration::sea_orm::DatabaseConnection; use std::time::Duration; use tokio::sync::broadcast; -use tokio::task::JoinHandle; use tokio::time::interval; #[derive(Debug, Default)] @@ -32,7 +32,7 @@ pub struct BufferedRpcQueryStats { pub struct SpawnedStatBuffer { pub stat_sender: flume::Sender, /// these handles are important and must be allowed to finish - pub background_handle: JoinHandle>, + pub background_handle: Web3ProxyJoinHandle<()>, } pub struct StatBuffer { accounting_db_buffer: HashMap, @@ -96,7 +96,7 @@ impl StatBuffer { bucket: String, stat_receiver: flume::Receiver, mut shutdown_receiver: broadcast::Receiver<()>, - ) -> anyhow::Result<()> { + ) -> Web3ProxyResult<()> { let mut tsdb_save_interval = interval(Duration::from_secs(self.tsdb_save_interval_seconds as u64)); let mut db_save_interval =