diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index e8844be2..2dd59eaf 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -41,7 +41,7 @@ use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcForwardedResponseEnum; use crate::jsonrpc::JsonRpcRequest; use crate::jsonrpc::JsonRpcRequestEnum; -use crate::rpcs::Web3Connections; +use crate::rpcs::connections::Web3Connections; use crate::stats::AppStats; // TODO: make this customizable? diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 5354e39b..b22254d2 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use tokio::sync::broadcast; use crate::app::AnyhowJoinHandle; -use crate::rpcs::Web3Connection; +use crate::rpcs::connection::Web3Connection; pub type BlockAndRpc = (Arc>, Arc); diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index c59621f7..b9d3cf84 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -1,7 +1,7 @@ ///! Keep track of the blockchain as seen by a Web3Connections. -use super::SyncedConnections; -use super::Web3Connection; -use super::Web3Connections; +use super::connection::Web3Connection; +use super::connections::Web3Connections; +use super::synced_connections::SyncedConnections; use crate::app::TxState; use crate::jsonrpc::JsonRpcRequest; use anyhow::Context; diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index e6a41ef1..19b07ac7 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -1,5 +1,7 @@ -///! Rate-limited communication with a web3 provider +//! Rate-limited communication with a web3 provider. 
use super::provider::Web3Provider; +use super::request::PendingRequestHandle; +use super::request::RequestHandleResult; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::BlockAndRpc; use anyhow::Context; @@ -19,23 +21,16 @@ use tokio::sync::RwLock as AsyncRwLock; use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; use tracing::{error, info, info_span, instrument, trace, warn, Instrument}; -// TODO: rename this -pub enum HandleResult { - ActiveRequest(ActiveRequestHandle), - RetryAt(Instant), - None, -} - /// An active connection to a Web3Rpc pub struct Web3Connection { - name: String, + pub name: String, /// TODO: can we get this from the provider? do we even need it? pub url: String, /// keep track of currently open requests. We sort on this - active_requests: AtomicU32, + pub(super) active_requests: AtomicU32, /// provider is in a RwLock so that we can replace it if re-connecting /// it is an async lock because we hold it open across awaits - provider: AsyncRwLock>>, + pub(super) provider: AsyncRwLock>>, /// rate limits are stored in a central redis so that multiple proxies can share their rate limits hard_limit: Option, /// used for load balancing to the least loaded server @@ -45,47 +40,6 @@ pub struct Web3Connection { head_block: RwLock<(H256, U64)>, } -/// Drop this once a connection completes -pub struct ActiveRequestHandle(Arc); - -impl Web3Provider { - #[instrument] - async fn from_str(url_str: &str, http_client: Option) -> anyhow::Result { - let provider = if url_str.starts_with("http") { - let url: url::Url = url_str.parse()?; - - let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?; - - let provider = ethers::providers::Http::new_with_client(url, http_client); - - // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) - // TODO: i don't think this interval matters for our uses, but we should probably set it to like `block time / 2` - 
ethers::providers::Provider::new(provider) - .interval(Duration::from_secs(13)) - .into() - } else if url_str.starts_with("ws") { - let provider = ethers::providers::Ws::connect(url_str) - .instrument(info_span!("Web3Provider", url_str = url_str)) - .await?; - - // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) - // TODO: i don't think this interval matters - ethers::providers::Provider::new(provider).into() - } else { - return Err(anyhow::anyhow!("only http and ws servers are supported")); - }; - - Ok(provider) - } -} - -impl fmt::Debug for Web3Provider { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url - f.debug_struct("Web3Provider").finish_non_exhaustive() - } -} - impl Web3Connection { /// Connect to a web3 rpc // #[instrument(name = "spawn_Web3Connection", skip(hard_limit, http_client))] @@ -418,7 +372,7 @@ impl Web3Connection { loop { match self.try_request_handle().await { - Ok(HandleResult::ActiveRequest(active_request_handle)) => { + Ok(RequestHandleResult::ActiveRequest(active_request_handle)) => { let block: Result, _> = active_request_handle .request("eth_getBlockByNumber", ("latest", false)) .await; @@ -440,12 +394,12 @@ impl Web3Connection { self.clone().send_block_result(block, &block_sender).await?; } } - Ok(HandleResult::RetryAt(retry_at)) => { + Ok(RequestHandleResult::RetryAt(retry_at)) => { warn!(?retry_at, "Rate limited on latest block from {}", self); sleep_until(retry_at).await; continue; } - Ok(HandleResult::None) => { + Ok(RequestHandleResult::None) => { warn!("No handle for latest block from {}", self); // TODO: what should we do? } @@ -567,17 +521,17 @@ impl Web3Connection { /// be careful with this; it will wait forever! 
#[instrument(skip_all)] - pub async fn wait_for_request_handle(self: &Arc) -> anyhow::Result { + pub async fn wait_for_request_handle(self: &Arc) -> anyhow::Result { // TODO: maximum wait time? i think timeouts in other parts of the code are probably best loop { match self.try_request_handle().await { - Ok(HandleResult::ActiveRequest(handle)) => return Ok(handle), - Ok(HandleResult::RetryAt(retry_at)) => { + Ok(RequestHandleResult::ActiveRequest(handle)) => return Ok(handle), + Ok(RequestHandleResult::RetryAt(retry_at)) => { // TODO: emit a stat? sleep_until(retry_at).await; } - Ok(HandleResult::None) => { + Ok(RequestHandleResult::None) => { // TODO: when can this happen? emit a stat? // TODO: instead of erroring, subscribe to the head block on this // TODO: sleep how long? maybe just error? @@ -589,11 +543,11 @@ impl Web3Connection { } } - pub async fn try_request_handle(self: &Arc) -> anyhow::Result { + pub async fn try_request_handle(self: &Arc) -> anyhow::Result { // check that we are connected if !self.has_provider().await { // TODO: emit a stat? - return Ok(HandleResult::None); + return Ok(RequestHandleResult::None); } // check rate limits @@ -609,7 +563,7 @@ impl Web3Connection { // TODO: i'm seeing "Exhausted rate limit on moralis: 0ns". How is it getting 0? warn!(?retry_at, ?self, "Exhausted rate limit"); - return Ok(HandleResult::RetryAt(retry_at.into())); + return Ok(RequestHandleResult::RetryAt(retry_at.into())); } Ok(ThrottleResult::RetryNever) => { return Err(anyhow::anyhow!("Rate limit failed.")); @@ -620,9 +574,16 @@ impl Web3Connection { } }; - let handle = ActiveRequestHandle::new(self.clone()); + let handle = PendingRequestHandle::new(self.clone()); - Ok(HandleResult::ActiveRequest(handle)) + Ok(RequestHandleResult::ActiveRequest(handle)) + } +} + +impl fmt::Debug for Web3Provider { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // TODO: the default Debug takes forever to write. this is too quiet though. 
we at least need the url + f.debug_struct("Web3Provider").finish_non_exhaustive() } } @@ -633,71 +594,6 @@ impl Hash for Web3Connection { } } -impl ActiveRequestHandle { - fn new(connection: Arc) -> Self { - // TODO: attach a unique id to this? - // TODO: what ordering?! - connection - .active_requests - .fetch_add(1, atomic::Ordering::AcqRel); - - Self(connection) - } - - pub fn clone_connection(&self) -> Arc { - self.0.clone() - } - - /// Send a web3 request - /// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented - /// By taking self here, we ensure that this is dropped after the request is complete - #[instrument(skip_all)] - pub async fn request( - &self, - method: &str, - params: T, - ) -> Result - where - T: fmt::Debug + serde::Serialize + Send + Sync, - R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug, - { - // TODO: use tracing spans properly - // TODO: it would be nice to have the request id on this - // TODO: including params in this is way too verbose - trace!("Sending {} to {}", method, self.0); - - let mut provider = None; - - while provider.is_none() { - // TODO: if no provider, don't unwrap. wait until there is one. 
- match self.0.provider.read().await.as_ref() { - None => {} - Some(found_provider) => provider = Some(found_provider.clone()), - } - } - - let response = match &*provider.unwrap() { - Web3Provider::Http(provider) => provider.request(method, params).await, - Web3Provider::Ws(provider) => provider.request(method, params).await, - }; - - // TODO: i think ethers already has trace logging (and does it much more fancy) - // TODO: at least instrument this with more useful information - // trace!("Reply from {}: {:?}", self.0, response); - trace!("Reply from {}", self.0); - - response - } -} - -impl Drop for ActiveRequestHandle { - fn drop(&mut self) { - self.0 - .active_requests - .fetch_sub(1, atomic::Ordering::AcqRel); - } -} - impl Eq for Web3Connection {} impl Ord for Web3Connection { diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 4e3d3953..4ca29c78 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -1,6 +1,7 @@ ///! Load balanced communication with a group of web3 providers -use super::SyncedConnections; -use super::{ActiveRequestHandle, HandleResult, Web3Connection}; +use super::connection::Web3Connection; +use super::request::{PendingRequestHandle, RequestHandleResult}; +use super::synced_connections::SyncedConnections; use crate::app::{flatten_handle, AnyhowJoinHandle, TxState}; use crate::config::Web3ConnectionConfig; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; @@ -189,7 +190,7 @@ impl Web3Connections { // TODO: there is a race here on geth. sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!) // TODO: maximum wait time let pending_transaction: Transaction = match rpc.try_request_handle().await { - Ok(HandleResult::ActiveRequest(handle)) => { + Ok(RequestHandleResult::ActiveRequest(handle)) => { handle .request("eth_getTransactionByHash", (pending_tx_id,)) .await? 
@@ -348,7 +349,7 @@ impl Web3Connections { #[instrument(skip_all)] pub async fn try_send_parallel_requests( &self, - active_request_handles: Vec, + active_request_handles: Vec, method: &str, // TODO: remove this box once i figure out how to do the options params: Option<&serde_json::Value>, @@ -436,7 +437,7 @@ impl Web3Connections { &self, skip: &[Arc], min_block_needed: Option<&U64>, - ) -> anyhow::Result { + ) -> anyhow::Result { let mut earliest_retry_at = None; // filter the synced rpcs @@ -493,14 +494,14 @@ impl Web3Connections { for rpc in synced_rpcs.into_iter() { // increment our connection counter match rpc.try_request_handle().await { - Ok(HandleResult::ActiveRequest(handle)) => { + Ok(RequestHandleResult::ActiveRequest(handle)) => { trace!("next server on {:?}: {:?}", self, rpc); - return Ok(HandleResult::ActiveRequest(handle)); + return Ok(RequestHandleResult::ActiveRequest(handle)); } - Ok(HandleResult::RetryAt(retry_at)) => { + Ok(RequestHandleResult::RetryAt(retry_at)) => { earliest_retry_at = earliest_retry_at.min(Some(retry_at)); } - Ok(HandleResult::None) => { + Ok(RequestHandleResult::None) => { // TODO: log a warning? } Err(err) => { @@ -514,7 +515,7 @@ impl Web3Connections { match earliest_retry_at { None => todo!(), - Some(earliest_retry_at) => Ok(HandleResult::RetryAt(earliest_retry_at)), + Some(earliest_retry_at) => Ok(RequestHandleResult::RetryAt(earliest_retry_at)), } } @@ -524,7 +525,7 @@ impl Web3Connections { pub async fn upstream_servers( &self, min_block_needed: Option<&U64>, - ) -> Result, Option> { + ) -> Result, Option> { let mut earliest_retry_at = None; // TODO: with capacity? let mut selected_rpcs = vec![]; @@ -538,12 +539,12 @@ impl Web3Connections { // check rate limits and increment our connection counter match connection.try_request_handle().await { - Ok(HandleResult::RetryAt(retry_at)) => { + Ok(RequestHandleResult::RetryAt(retry_at)) => { // this rpc is not available. 
skip it earliest_retry_at = earliest_retry_at.min(Some(retry_at)); } - Ok(HandleResult::ActiveRequest(handle)) => selected_rpcs.push(handle), - Ok(HandleResult::None) => { + Ok(RequestHandleResult::ActiveRequest(handle)) => selected_rpcs.push(handle), + Ok(RequestHandleResult::None) => { warn!("no request handle for {}", connection) } Err(err) => { @@ -577,7 +578,7 @@ impl Web3Connections { .next_upstream_server(&skip_rpcs, min_block_needed) .await { - Ok(HandleResult::ActiveRequest(active_request_handle)) => { + Ok(RequestHandleResult::ActiveRequest(active_request_handle)) => { // save the rpc in case we get an error and want to retry on another server skip_rpcs.push(active_request_handle.clone_connection()); @@ -630,7 +631,7 @@ impl Web3Connections { } } } - Ok(HandleResult::RetryAt(retry_at)) => { + Ok(RequestHandleResult::RetryAt(retry_at)) => { // TODO: move this to a helper function // sleep (TODO: with a lock?) until our rate limits should be available // TODO: if a server catches up sync while we are waiting, we could stop waiting @@ -640,7 +641,7 @@ impl Web3Connections { continue; } - Ok(HandleResult::None) => { + Ok(RequestHandleResult::None) => { warn!(?self, "No server handles!"); // TODO: subscribe to something on synced connections. 
maybe it should just be a watch channel diff --git a/web3_proxy/src/rpcs/mod.rs b/web3_proxy/src/rpcs/mod.rs index ff2ae8c7..c283a179 100644 --- a/web3_proxy/src/rpcs/mod.rs +++ b/web3_proxy/src/rpcs/mod.rs @@ -1,9 +1,6 @@ -mod blockchain; -mod connection; -mod connections; -mod provider; -mod synced_connections; - -pub use connection::{ActiveRequestHandle, HandleResult, Web3Connection}; -pub use connections::Web3Connections; -pub use synced_connections::SyncedConnections; +pub mod blockchain; +pub mod connection; +pub mod connections; +pub mod provider; +pub mod request; +pub mod synced_connections; diff --git a/web3_proxy/src/rpcs/provider.rs b/web3_proxy/src/rpcs/provider.rs index c65e1a13..5b4ba168 100644 --- a/web3_proxy/src/rpcs/provider.rs +++ b/web3_proxy/src/rpcs/provider.rs @@ -1,4 +1,7 @@ +use std::time::Duration; + use derive_more::From; +use tracing::{info_span, instrument, Instrument}; /// Use HTTP and WS providers. // TODO: instead of an enum, I tried to use Box, but hit @@ -7,3 +10,37 @@ pub enum Web3Provider { Http(ethers::providers::Provider), Ws(ethers::providers::Provider), } + +impl Web3Provider { + #[instrument] + pub async fn from_str( + url_str: &str, + http_client: Option, + ) -> anyhow::Result { + let provider = if url_str.starts_with("http") { + let url: url::Url = url_str.parse()?; + + let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?; + + let provider = ethers::providers::Http::new_with_client(url, http_client); + + // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) + // TODO: i don't think this interval matters for our uses, but we should probably set it to like `block time / 2` + ethers::providers::Provider::new(provider) + .interval(Duration::from_secs(13)) + .into() + } else if url_str.starts_with("ws") { + let provider = ethers::providers::Ws::connect(url_str) + .instrument(info_span!("Web3Provider", %url_str)) + .await?; + + // TODO: dry this up (needs 
https://github.com/gakonst/ethers-rs/issues/592) + // TODO: i don't think this interval matters + ethers::providers::Provider::new(provider).into() + } else { + return Err(anyhow::anyhow!("only http and ws servers are supported")); + }; + + Ok(provider) + } +} diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs new file mode 100644 index 00000000..4fde6ded --- /dev/null +++ b/web3_proxy/src/rpcs/request.rs @@ -0,0 +1,95 @@ +use super::connection::Web3Connection; +use super::provider::Web3Provider; +use crate::app::{flatten_handle, AnyhowJoinHandle}; +use crate::config::BlockAndRpc; +use anyhow::Context; +use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64}; +use futures::future::try_join_all; +use futures::StreamExt; +use parking_lot::RwLock; +use redis_rate_limit::{RedisPool, RedisRateLimit, ThrottleResult}; +use serde::ser::{SerializeStruct, Serializer}; +use serde::Serialize; +use std::fmt; +use std::hash::{Hash, Hasher}; +use std::sync::atomic::{self, AtomicU32, AtomicU64}; +use std::{cmp::Ordering, sync::Arc}; +use tokio::sync::broadcast; +use tokio::sync::RwLock as AsyncRwLock; +use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; +use tracing::{error, info, info_span, instrument, trace, warn, Instrument}; + +// TODO: rename this +pub enum RequestHandleResult { + ActiveRequest(PendingRequestHandle), + RetryAt(Instant), + None, +} + +/// Drop this once a connection completes +pub struct PendingRequestHandle(Arc); + +impl PendingRequestHandle { + pub fn new(connection: Arc) -> Self { + // TODO: attach a unique id to this? + // TODO: what ordering?! 
+ connection + .active_requests + .fetch_add(1, atomic::Ordering::AcqRel); + + Self(connection) + } + + pub fn clone_connection(&self) -> Arc { + self.0.clone() + } + + /// Send a web3 request + /// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented + /// By taking self here, we ensure that this is dropped after the request is complete + #[instrument(skip_all)] + pub async fn request( + &self, + method: &str, + params: T, + ) -> Result + where + T: fmt::Debug + serde::Serialize + Send + Sync, + R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug, + { + // TODO: use tracing spans properly + // TODO: it would be nice to have the request id on this + // TODO: including params in this is way too verbose + trace!("Sending {} to {}", method, self.0); + + let mut provider = None; + + while provider.is_none() { + // TODO: if no provider, don't unwrap. wait until there is one. + match self.0.provider.read().await.as_ref() { + None => {} + Some(found_provider) => provider = Some(found_provider.clone()), + } + } + + let response = match &*provider.unwrap() { + Web3Provider::Http(provider) => provider.request(method, params).await, + Web3Provider::Ws(provider) => provider.request(method, params).await, + }; + + // TODO: i think ethers already has trace logging (and does it much more fancy) + // TODO: at least instrument this with more useful information + // trace!("Reply from {}: {:?}", self.0, response); + trace!("Reply from {}", self.0); + + response + } +} + +impl Drop for PendingRequestHandle { + fn drop(&mut self) { + self.0 + .active_requests + .fetch_sub(1, atomic::Ordering::AcqRel); + } +} diff --git a/web3_proxy/src/rpcs/synced_connections.rs b/web3_proxy/src/rpcs/synced_connections.rs index 5245f0d8..311960c6 100644 --- a/web3_proxy/src/rpcs/synced_connections.rs +++ b/web3_proxy/src/rpcs/synced_connections.rs @@ -1,4 +1,5 @@ -use super::{Web3Connection, Web3Connections}; +use 
super::connection::Web3Connection; +use super::connections::Web3Connections; use ethers::prelude::{H256, U64}; use indexmap::IndexSet; use serde::Serialize;