///! Rate-limited communication with a web3 provider. use super::blockchain::{ArcBlock, BlockHashesCache, BlockId}; use super::provider::Web3Provider; use super::request::{OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult}; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::BlockAndRpc; use crate::frontend::authorization::AuthorizedRequest; use anyhow::Context; use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64}; use futures::future::try_join_all; use futures::StreamExt; use parking_lot::RwLock; use rand::Rng; use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter}; use sea_orm::DatabaseConnection; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; use std::cmp::min; use std::fmt; use std::hash::{Hash, Hasher}; use std::sync::atomic::{self, AtomicU32, AtomicU64}; use std::{cmp::Ordering, sync::Arc}; use tokio::sync::broadcast; use tokio::sync::RwLock as AsyncRwLock; use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; use tracing::{debug, error, info, instrument, trace, warn, Level}; /// An active connection to a Web3 RPC server like geth or erigon. pub struct Web3Connection { pub name: String, /// TODO: can we get this from the provider? do we even need it? url: String, /// Some connections use an http_client. we keep a clone for reconnecting http_client: Option, /// keep track of currently open requests. We sort on this pub(super) active_requests: AtomicU32, /// keep track of total requests /// TODO: is this type okay? /// TODO: replace this with something in metered? pub(super) total_requests: AtomicU64, /// provider is in a RwLock so that we can replace it if re-connecting /// it is an async lock because we hold it open across awaits pub(super) provider: AsyncRwLock>>, /// rate limits are stored in a central redis so that multiple proxies can share their rate limits /// We do not use the deferred rate limiter because going over limits would cause errors hard_limit: Option, /// used for load balancing to the least loaded server pub(super) soft_limit: u32, /// TODO: have an enum for this so that "no limit" prints pretty? block_data_limit: AtomicU64, /// Lower weight are higher priority when sending requests pub(super) weight: u32, // TODO: async lock? pub(super) head_block_id: RwLock>, pub(super) open_request_handle_metrics: Arc, pub(super) db_conn: Option, } impl Web3Connection { /// Connect to a web3 rpc // #[instrument(name = "spawn_Web3Connection", skip(hard_limit, http_client))] // TODO: have this take a builder (which will have channels attached) #[allow(clippy::too_many_arguments)] pub async fn spawn( name: String, chain_id: u64, url_str: String, // optional because this is only used for http providers. websocket providers don't use it http_client: Option, http_interval_sender: Option>>, // TODO: have a builder struct for this. hard_limit: Option<(u64, RedisPool)>, // TODO: think more about this type soft_limit: u32, block_map: BlockHashesCache, block_sender: Option>, tx_id_sender: Option)>>, reconnect: bool, weight: u32, open_request_handle_metrics: Arc, db_conn: Option, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let hard_limit = hard_limit.map(|(hard_rate_limit, redis_pool)| { // TODO: is cache size 1 okay? i think we need RedisRateLimiter::new( "web3_proxy", &format!("{}:{}", chain_id, url_str), hard_rate_limit, 60.0, redis_pool, ) }); let new_connection = Self { name, http_client, url: url_str.clone(), active_requests: 0.into(), total_requests: 0.into(), provider: AsyncRwLock::new(None), hard_limit, soft_limit, block_data_limit: Default::default(), head_block_id: RwLock::new(Default::default()), weight, open_request_handle_metrics, db_conn, }; let new_connection = Arc::new(new_connection); // connect to the server (with retries) new_connection .retrying_reconnect(block_sender.as_ref(), false) .await?; // check the server's chain_id here // TODO: move this outside the `new` function and into a `start` function or something. that way we can do retries from there // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error // TODO: what should the timeout be? let found_chain_id: Result = new_connection .wait_for_request_handle(None, Duration::from_secs(30)) .await? .request( "eth_chainId", &json!(Option::None::<()>), Level::ERROR.into(), ) .await; match found_chain_id { Ok(found_chain_id) => { // TODO: there has to be a cleaner way to do this if chain_id != found_chain_id.as_u64() { return Err(anyhow::anyhow!( "incorrect chain id! Expected {}. Found {}", chain_id, found_chain_id ) .context(format!("failed @ {}", new_connection))); } } Err(e) => { let e = anyhow::Error::from(e).context(format!("failed @ {}", new_connection)); return Err(e); } } let will_subscribe_to_blocks = block_sender.is_some(); // subscribe to new blocks and new transactions // TODO: make transaction subscription optional (just pass None for tx_id_sender) let handle = { let new_connection = new_connection.clone(); tokio::spawn(async move { new_connection .subscribe( http_interval_sender, block_map, block_sender, tx_id_sender, reconnect, ) .await }) }; // we could take "archive" as a parameter, but we would want a safety check on it regardless // check common archive thresholds // TODO: would be great if rpcs exposed this // TODO: move this to a helper function so we can recheck on errors or as the chain grows // TODO: move this to a helper function that checks if will_subscribe_to_blocks { // TODO: make sure the server isn't still syncing // TODO: don't sleep. wait for new heads subscription instead // TODO: i think instead of atomics, we could maybe use a watch channel sleep(Duration::from_millis(250)).await; new_connection.check_block_data_limit().await?; } Ok((new_connection, handle)) } async fn check_block_data_limit(self: &Arc) -> anyhow::Result> { let mut limit = None; for block_data_limit in [u64::MAX, 90_000, 128, 64, 32] { let mut head_block_id = self.head_block_id.read().clone(); // TODO: subscribe to a channel instead of polling. subscribe to http_interval_sender? while head_block_id.is_none() { warn!(rpc=%self, "no head block yet. retrying"); sleep(Duration::from_secs(13)).await; head_block_id = self.head_block_id.read().clone(); } let head_block_num = head_block_id.expect("is_none was checked above").num; debug_assert_ne!(head_block_num, U64::zero()); // TODO: subtract 1 from block_data_limit for safety? let maybe_archive_block = head_block_num .saturating_sub((block_data_limit).into()) .max(U64::one()); // TODO: wait for the handle BEFORE we check the current block number. it might be delayed too! // TODO: what should the request be? let archive_result: Result = self .wait_for_request_handle(None, Duration::from_secs(30)) .await? .request( "eth_getCode", &json!(( "0xdead00000000000000000000000000000000beef", maybe_archive_block, )), // error here are expected, so keep the level low tracing::Level::DEBUG.into(), ) .await; trace!(?archive_result, rpc=%self); if archive_result.is_ok() { limit = Some(block_data_limit); break; } } if let Some(limit) = limit { self.block_data_limit .store(limit, atomic::Ordering::Release); } Ok(limit) } /// TODO: this might be too simple. different nodes can prune differently pub fn block_data_limit(&self) -> U64 { self.block_data_limit.load(atomic::Ordering::Acquire).into() } pub fn has_block_data(&self, needed_block_num: &U64) -> bool { let block_data_limit: U64 = self.block_data_limit(); let head_block_id = self.head_block_id.read().clone(); let newest_block_num = match head_block_id { None => return false, Some(x) => x.num, }; let oldest_block_num = newest_block_num .saturating_sub(block_data_limit) .max(U64::one()); needed_block_num >= &oldest_block_num && needed_block_num <= &newest_block_num } /// reconnect to the provider. errors are retried forever with exponential backoff with jitter. /// We use the "Decorrelated" jitter from https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ /// TODO: maybe it would be better to use "Full Jitter". The "Full Jitter" approach uses less work, but slightly more time. pub async fn retrying_reconnect( self: &Arc, block_sender: Option<&flume::Sender>, initial_sleep: bool, ) -> anyhow::Result<()> { // there are several crates that have retry helpers, but they all seem more complex than necessary let base_ms = 500; let cap_ms = 30_000; let range_multiplier = 3; // sleep once before the initial retry attempt // TODO: now that we use this method for our initial connection, do we still want this sleep? let mut sleep_ms = if initial_sleep { let first_sleep_ms = min( cap_ms, rand::thread_rng().gen_range(base_ms..(base_ms * range_multiplier)), ); let reconnect_in = Duration::from_millis(first_sleep_ms); warn!(rpc=%self, ?reconnect_in, "Reconnect in"); sleep(reconnect_in).await; first_sleep_ms } else { base_ms }; // retry until we succeed while let Err(err) = self.reconnect(block_sender).await { sleep_ms = min( cap_ms, rand::thread_rng().gen_range(base_ms..(sleep_ms * range_multiplier)), ); let retry_in = Duration::from_millis(sleep_ms); warn!(rpc=%self, ?retry_in, ?err, "Failed to reconnect!"); sleep(retry_in).await; } Ok(()) } /// reconnect a websocket provider pub async fn reconnect( self: &Arc, // websocket doesn't need the http client block_sender: Option<&flume::Sender>, ) -> anyhow::Result<()> { // since this lock is held open over an await, we use tokio's locking // TODO: timeout on this lock. if its slow, something is wrong let mut provider_option = self.provider.write().await; if let Some(provider) = &*provider_option { match provider.as_ref() { Web3Provider::Http(_) => { // http clients don't need to do anything for reconnecting // they *do* need to run this function to setup the first return Ok(()); } Web3Provider::Ws(_) => {} } info!(rpc=%self, "reconnecting"); // disconnect the current provider *provider_option = None; // reset sync status { let mut head_block_id = self.head_block_id.write(); *head_block_id = None; } // tell the block subscriber that we don't have any blocks if let Some(block_sender) = &block_sender { block_sender .send_async((None, self.clone())) .await .context("block_sender during connect")?; } } else { info!(rpc=%self, "connecting"); } // TODO: if this fails, keep retrying! otherwise it crashes and doesn't try again! let new_provider = Web3Provider::from_str(&self.url, self.http_client.clone()).await?; *provider_option = Some(Arc::new(new_provider)); info!(rpc=%self, "successfully connected"); Ok(()) } #[inline] pub fn active_requests(&self) -> u32 { self.active_requests.load(atomic::Ordering::Acquire) } #[inline] pub async fn has_provider(&self) -> bool { self.provider.read().await.is_some() } async fn send_head_block_result( self: &Arc, new_head_block: Result, ProviderError>, block_sender: &flume::Sender, block_map: BlockHashesCache, ) -> anyhow::Result<()> { match new_head_block { Ok(None) => { // TODO: i think this should clear the local block and then update over the block sender todo!("handle no block") } Ok(Some(mut new_head_block)) => { // TODO: is unwrap_or_default ok? we might have an empty block let new_hash = new_head_block.hash.unwrap_or_default(); // if we already have this block saved, set new_head_block to that arc. otherwise store this copy new_head_block = block_map .get_with(new_hash, async move { new_head_block }) .await; let new_num = new_head_block.number.unwrap_or_default(); // save the block so we don't send the same one multiple times // also save so that archive checks can know how far back to query { let mut head_block_id = self.head_block_id.write(); if head_block_id.is_none() { *head_block_id = Some(BlockId { hash: new_hash, num: new_num, }); } else { head_block_id.as_mut().map(|x| { x.hash = new_hash; x.num = new_num; x }); } } block_sender .send_async((Some(new_head_block), self.clone())) .await .context("block_sender")?; } Err(e) => { warn!("unable to get block from {}: {}", self, e); // TODO: do something to rpc_chain? // send an empty block to take this server out of rotation block_sender .send_async((None, self.clone())) .await .context("block_sender")?; } } Ok(()) } /// subscribe to blocks and transactions with automatic reconnects async fn subscribe( self: Arc, http_interval_sender: Option>>, block_map: BlockHashesCache, block_sender: Option>, tx_id_sender: Option)>>, reconnect: bool, ) -> anyhow::Result<()> { loop { debug!(rpc=%self, "subscribing"); let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe()); let mut futures = vec![]; if let Some(block_sender) = &block_sender { let f = self.clone().subscribe_new_heads( http_interval_receiver, block_sender.clone(), block_map.clone(), ); futures.push(flatten_handle(tokio::spawn(f))); } if let Some(tx_id_sender) = &tx_id_sender { let f = self .clone() .subscribe_pending_transactions(tx_id_sender.clone()); futures.push(flatten_handle(tokio::spawn(f))); } { // TODO: move this into a proper function let conn = self.clone(); // health check let f = async move { loop { if let Some(provider) = conn.provider.read().await.as_ref() { if provider.ready() { trace!(rpc=%conn, "provider is ready"); } else { warn!(rpc=%conn, "provider is NOT ready"); return Err(anyhow::anyhow!("provider is not ready")); } } // TODO: how often? // TODO: should we also check that the head block has changed recently? // TODO: maybe instead we should do a simple subscription and follow that instead sleep(Duration::from_secs(10)).await; } }; futures.push(flatten_handle(tokio::spawn(f))); } match try_join_all(futures).await { Ok(_) => { // futures all exited without error. break instead of restarting subscriptions break; } Err(err) => { if reconnect { warn!( rpc=%self, ?err, "subscription exited", ); self.retrying_reconnect(block_sender.as_ref(), true).await?; } else { error!(rpc=%self, ?err, "subscription exited"); return Err(err); } } } } info!(rpc=%self, "all subscriptions complete"); Ok(()) } /// Subscribe to new blocks. If `reconnect` is true, this runs forever. async fn subscribe_new_heads( self: Arc, http_interval_receiver: Option>, block_sender: flume::Sender, block_map: BlockHashesCache, ) -> anyhow::Result<()> { info!(%self, "watching new heads"); // TODO: is a RwLock of an Option the right thing here? if let Some(provider) = self.provider.read().await.clone() { match &*provider { Web3Provider::Http(_provider) => { // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints // TODO: try watch_blocks and fall back to this? let mut http_interval_receiver = http_interval_receiver.unwrap(); let mut last_hash = H256::zero(); loop { // TODO: what should the max_wait be? match self .wait_for_request_handle(None, Duration::from_secs(30)) .await { Ok(active_request_handle) => { let block: Result, _> = active_request_handle .request( "eth_getBlockByNumber", &json!(("latest", false)), tracing::Level::ERROR.into(), ) .await; match block { Ok(block) => { // don't send repeat blocks let new_hash = block .hash .expect("blocks here should always have hashes"); if new_hash != last_hash { // new hash! last_hash = new_hash; self.send_head_block_result( Ok(Some(Arc::new(block))), &block_sender, block_map.clone(), ) .await?; } } Err(err) => { // we did not get a block back. something is up with the server. take it out of rotation self.send_head_block_result( Err(err), &block_sender, block_map.clone(), ) .await?; } } } Err(err) => { warn!(?err, "Internal error on latest block from {}", self); // TODO: what should we do? sleep? extra time? } } // wait for the next interval // TODO: if error or rate limit, increase interval? while let Err(err) = http_interval_receiver.recv().await { match err { broadcast::error::RecvError::Closed => { // channel is closed! that's not good. bubble the error up return Err(err.into()); } broadcast::error::RecvError::Lagged(lagged) => { // querying the block was delayed // this can happen if tokio is very busy or waiting for requests limits took too long warn!(?err, rpc=%self, "http interval lagging by {}!", lagged); } } } trace!(rpc=%self, "ok http interval"); } } Web3Provider::Ws(provider) => { // todo: move subscribe_blocks onto the request handle? let active_request_handle = self .wait_for_request_handle(None, Duration::from_secs(30)) .await; let mut stream = provider.subscribe_blocks().await?; drop(active_request_handle); // query the block once since the subscription doesn't send the current block // there is a very small race condition here where the stream could send us a new block right now // all it does is print "new block" for the same block as current block let block: Result, _> = self .wait_for_request_handle(None, Duration::from_secs(30)) .await? .request( "eth_getBlockByNumber", &json!(("latest", false)), tracing::Level::ERROR.into(), ) .await .map(|x| Some(Arc::new(x))); let mut last_hash = match &block { Ok(Some(new_block)) => new_block .hash .expect("blocks should always have a hash here"), _ => H256::zero(), }; self.send_head_block_result(block, &block_sender, block_map.clone()) .await?; while let Some(new_block) = stream.next().await { // TODO: check the new block's hash to be sure we don't send dupes let new_hash = new_block .hash .expect("blocks should always have a hash here"); if new_hash == last_hash { // some rpcs like to give us duplicates. don't waste our time on them continue; } else { last_hash = new_hash; } self.send_head_block_result( Ok(Some(Arc::new(new_block))), &block_sender, block_map.clone(), ) .await?; } // TODO: is this always an error? // TODO: we probably don't want a warn and to return error warn!(rpc=%self, "new_heads subscription ended"); return Err(anyhow::anyhow!("new_heads subscription ended")); } } } Ok(()) } async fn subscribe_pending_transactions( self: Arc, tx_id_sender: flume::Sender<(TxHash, Arc)>, ) -> anyhow::Result<()> { info!(%self, "watching pending transactions"); // TODO: is a RwLock of an Option the right thing here? if let Some(provider) = self.provider.read().await.clone() { match &*provider { Web3Provider::Http(provider) => { // there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints // TODO: what should this interval be? probably automatically set to some fraction of block time // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now // TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though let mut interval = interval(Duration::from_secs(60)); interval.set_missed_tick_behavior(MissedTickBehavior::Delay); loop { // TODO: actually do something here /* match self.try_request_handle().await { Ok(active_request_handle) => { // TODO: check the filter todo!("actually send a request"); } Err(e) => { warn!("Failed getting latest block from {}: {:?}", self, e); } } */ // wait for the interval // TODO: if error or rate limit, increase interval? interval.tick().await; } } Web3Provider::Ws(provider) => { // TODO: maybe the subscribe_pending_txs function should be on the active_request_handle let active_request_handle = self .wait_for_request_handle(None, Duration::from_secs(30)) .await; let mut stream = provider.subscribe_pending_txs().await?; drop(active_request_handle); while let Some(pending_tx_id) = stream.next().await { tx_id_sender .send_async((pending_tx_id, self.clone())) .await .context("tx_id_sender")?; // TODO: periodically check for listeners. if no one is subscribed, unsubscribe and wait for a subscription } // TODO: is this always an error? // TODO: we probably don't want a warn and to return error warn!(rpc=%self, "pending_transactions subscription ended"); return Err(anyhow::anyhow!("pending_transactions subscription ended")); } } } Ok(()) } /// be careful with this; it might wait forever! #[instrument] pub async fn wait_for_request_handle( self: &Arc, authorization: Option<&Arc>, max_wait: Duration, ) -> anyhow::Result { let max_wait = Instant::now() + max_wait; loop { let x = self.try_request_handle(authorization).await; trace!(?x, "try_request_handle"); match x { Ok(OpenRequestResult::Handle(handle)) => return Ok(handle), Ok(OpenRequestResult::RetryAt(retry_at)) => { // TODO: emit a stat? trace!(?retry_at); if retry_at > max_wait { // break now since we will wait past our maximum wait time return Err(anyhow::anyhow!("timeout waiting for request handle")); } sleep_until(retry_at).await; } Ok(OpenRequestResult::RetryNever) => { // TODO: when can this happen? log? emit a stat? // TODO: subscribe to the head block on this // TODO: sleep how long? maybe just error? return Err(anyhow::anyhow!("unable to retry for request handle")); } Err(err) => return Err(err), } } } #[instrument] pub async fn try_request_handle( self: &Arc, authorization: Option<&Arc>, ) -> anyhow::Result { // check that we are connected if !self.has_provider().await { // TODO: emit a stat? // TODO: wait until we have a provider? return Ok(OpenRequestResult::RetryNever); } // check rate limits if let Some(ratelimiter) = self.hard_limit.as_ref() { // TODO: how should we know if we should set expire or not? match ratelimiter.throttle(true).await? { RedisRateLimitResult::Allowed(_) => { trace!("rate limit succeeded") } RedisRateLimitResult::RetryAt(retry_at, _) => { // rate limit failed // save the smallest retry_after. if nothing succeeds, return an Err with retry_after in it // TODO: use tracing better // TODO: i'm seeing "Exhausted rate limit on moralis: 0ns". How is it getting 0? warn!(?retry_at, rpc=%self, "Exhausted rate limit"); return Ok(OpenRequestResult::RetryAt(retry_at)); } RedisRateLimitResult::RetryNever => { return Ok(OpenRequestResult::RetryNever); } } }; let handle = OpenRequestHandle::new(self.clone(), authorization.cloned()); Ok(OpenRequestResult::Handle(handle)) } } impl fmt::Debug for Web3Provider { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url f.debug_struct("Web3Provider").finish_non_exhaustive() } } impl Hash for Web3Connection { fn hash(&self, state: &mut H) { // TODO: is this enough? self.name.hash(state); } } impl Eq for Web3Connection {} impl Ord for Web3Connection { fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.name.cmp(&other.name) } } impl PartialOrd for Web3Connection { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } impl PartialEq for Web3Connection { fn eq(&self, other: &Self) -> bool { self.name == other.name } } impl Serialize for Web3Connection { fn serialize(&self, serializer: S) -> Result where S: Serializer, { // 3 is the number of fields in the struct. let mut state = serializer.serialize_struct("Web3Connection", 6)?; // the url is excluded because it likely includes private information. just show the name state.serialize_field("name", &self.name)?; let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed); if block_data_limit == u64::MAX { state.serialize_field("block_data_limit", &None::<()>)?; } else { state.serialize_field("block_data_limit", &block_data_limit)?; } state.serialize_field("soft_limit", &self.soft_limit)?; state.serialize_field( "active_requests", &self.active_requests.load(atomic::Ordering::Relaxed), )?; state.serialize_field( "total_requests", &self.total_requests.load(atomic::Ordering::Relaxed), )?; let head_block_id = &*self.head_block_id.read(); state.serialize_field("head_block_id", head_block_id)?; state.end() } } impl fmt::Debug for Web3Connection { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut f = f.debug_struct("Web3Connection"); f.field("name", &self.name); let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed); if block_data_limit == u64::MAX { f.field("blocks", &"all"); } else { f.field("blocks", &block_data_limit); } f.finish_non_exhaustive() } } impl fmt::Display for Web3Connection { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // TODO: filter basic auth and api keys write!(f, "{}", &self.name) } }