From e96f09a9c45312a6122a26dc5792a0eb80280f7a Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sat, 25 Feb 2023 23:52:33 -0800 Subject: [PATCH] refactors to make configs partially reloadable --- TODO.md | 1 + web3_proxy/src/app/mod.rs | 220 ++++++++---------- web3_proxy/src/config.rs | 6 +- web3_proxy/src/rpcs/blockchain.rs | 38 ++-- web3_proxy/src/rpcs/consensus.rs | 2 +- web3_proxy/src/rpcs/many.rs | 333 ++++++++++++++++------------ web3_proxy/src/rpcs/one.rs | 10 +- web3_proxy/src/rpcs/transactions.rs | 2 +- 8 files changed, 312 insertions(+), 300 deletions(-) diff --git a/TODO.md b/TODO.md index 46aa710d..04f253b8 100644 --- a/TODO.md +++ b/TODO.md @@ -676,3 +676,4 @@ in another repo: event subscriber - [ ] have an upgrade tier that queries multiple backends at once. returns on first Ok result, collects errors. if no Ok, find the most common error and then respond with that - [ ] give public_recent_ips_salt a better, more general, name - [ ] include tier in the head block logs? +- [ ] i think i use FuturesUnordered when a try_join_all might be better diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 5f8130cf..2fd31965 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -10,7 +10,7 @@ use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::jsonrpc::{ JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum, }; -use crate::rpcs::blockchain::{BlockHashesCache, Web3ProxyBlock}; +use crate::rpcs::blockchain::{Web3ProxyBlock}; use crate::rpcs::many::Web3Rpcs; use crate::rpcs::one::Web3Rpc; use crate::rpcs::transactions::TxStatus; @@ -199,6 +199,7 @@ impl DatabaseReplica { pub struct Web3ProxyApp { /// Send requests to the best server available pub balanced_rpcs: Arc, + pub http_client: Option, /// Send private requests (like eth_sendRawTransaction) to all these servers pub private_rpcs: Option>, response_cache: ResponseCache, @@ -354,6 +355,8 @@ pub async fn get_migrated_db( pub struct Web3ProxyAppSpawn { /// the app. probably clone this to use in other groups of handles pub app: Arc, + /// handles for the balanced and private rpcs + pub app_handles: FuturesUnordered>, /// these are important and must be allowed to finish pub background_handles: FuturesUnordered>, } @@ -365,6 +368,29 @@ impl Web3ProxyApp { num_workers: usize, shutdown_receiver: broadcast::Receiver<()>, ) -> anyhow::Result { + // safety checks on the config + // while i would prefer this to be in a "apply_top_config" function, that is a larger refactor + if let Some(redirect) = &top_config.app.redirect_rpc_key_url { + assert!( + redirect.contains("{{rpc_key_id}}"), + "redirect_rpc_key_url user url must contain \"{{rpc_key_id}}\"" + ); + } + + if !top_config.extra.is_empty() { + warn!( + "unknown TopConfig fields!: {:?}", + top_config.app.extra.keys() + ); + } + + if !top_config.app.extra.is_empty() { + warn!( + "unknown Web3ProxyAppConfig fields!: {:?}", + top_config.app.extra.keys() + ); + } + // we must wait for these to end on their own (and they need to subscribe to shutdown_sender) let important_background_handles = FuturesUnordered::new(); @@ -491,6 +517,47 @@ impl Web3ProxyApp { .build()?, ); + // create rate limiters + // these are optional. 
they require redis + let mut frontend_ip_rate_limiter = None; + let mut frontend_registered_user_rate_limiter = None; + let mut login_rate_limiter = None; + + if let Some(redis_pool) = vredis_pool.as_ref() { + if let Some(public_requests_per_period) = top_config.app.public_requests_per_period { + // chain id is included in the app name so that rpc rate limits are per-chain + let rpc_rrl = RedisRateLimiter::new( + &format!("web3_proxy:{}", top_config.app.chain_id), + "frontend", + public_requests_per_period, + 60.0, + redis_pool.clone(), + ); + + // these two rate limiters can share the base limiter + // these are deferred rate limiters because we don't want redis network requests on the hot path + // TODO: take cache_size from config + frontend_ip_rate_limiter = Some(DeferredRateLimiter::::new( + 10_000, + "ip", + rpc_rrl.clone(), + None, + )); + frontend_registered_user_rate_limiter = Some(DeferredRateLimiter::::new( + 10_000, "key", rpc_rrl, None, + )); + } + + // login rate limiter + login_rate_limiter = Some(RedisRateLimiter::new( + "web3_proxy", + "login", + top_config.app.login_rate_limit_per_period, + 60.0, + redis_pool.clone(), + )); + } + // TODO: i don't like doing Block::default here! Change this to "None"? let (watch_consensus_head_sender, watch_consensus_head_receiver) = watch::channel(None); // TODO: will one receiver lagging be okay? how big should this be? @@ -512,20 +579,6 @@ impl Web3ProxyApp { .time_to_idle(Duration::from_secs(300)) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); - // keep 1GB/5 minutes of blocks in the cache - // TODO: limits from config - // these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes - // TODO: how can we do the weigher better? - let block_map: BlockHashesCache = Cache::builder() - .max_capacity(1024 * 1024 * 1024) - .weigher(|_k, v: &Web3ProxyBlock| { - // TODO: is this good enough? - 1 + v.block.transactions.len().try_into().unwrap_or(u32::MAX) - }) - // TODO: what should we set? 5 minutes is arbitrary. 
the nodes themselves hold onto transactions for much longer - .time_to_idle(Duration::from_secs(300)) - .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); - // responses can be very different in sizes, so this is a cache with a max capacity and a weigher // TODO: don't allow any response to be bigger than X% of the cache let response_cache = Cache::builder() @@ -567,10 +620,10 @@ impl Web3ProxyApp { .time_to_idle(Duration::from_secs(120)) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); - // set up a Web3Rpcs object to hold all our connections - // TODO: for now, only the server_configs can change - let (balanced_rpcs, balanced_handle) = Web3Rpcs::spawn( - block_map.clone(), + let app_handles = FuturesUnordered::new(); + + // prepare a Web3Rpcs to hold all our balanced connections + let (balanced_rpcs, balanced_rpcs_handle) = Web3Rpcs::spawn( top_config.app.chain_id, db_conn.clone(), http_client.clone(), @@ -580,25 +633,21 @@ impl Web3ProxyApp { top_config.app.min_sum_soft_limit, pending_transactions.clone(), Some(pending_tx_sender.clone()), - vredis_pool.clone(), Some(watch_consensus_head_sender), ) .await .context("spawning balanced rpcs")?; - // connect to the load balanced rpcs - balanced_rpcs.apply_server_configs(top_config.balanced_rpcs); + app_handles.push(balanced_rpcs_handle); - // connect to the private rpcs + // prepare a Web3Rpcs to hold all our private connections // only some chains have this, so this is optional - let private_rpc_configs = top_config.private_rpcs.unwrap_or_default(); - let private_rpcs = if private_rpc_configs.is_empty() { + let private_rpcs = if top_config.private_rpcs.is_none() { warn!("No private relays configured. Any transactions will be broadcast to the public mempool!"); None } else { // TODO: do something with the spawn handle - let (private_rpcs, _) = Web3Rpcs::spawn( - block_map, + let (private_rpcs, private_rpcs_handle) = Web3Rpcs::spawn( top_config.app.chain_id, db_conn.clone(), http_client.clone(), @@ -610,7 +659,6 @@ impl Web3ProxyApp { pending_transactions.clone(), // TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits, but they should have None, - vredis_pool.clone(), // subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs // they also often have low rate limits // however, they are well connected to miners/validators. so maybe using them as a safety check would be good @@ -620,18 +668,15 @@ impl Web3ProxyApp { .await .context("spawning private_rpcs")?; - private_rpcs.apply_server_configs(private_rpc_configs); + app_handles.push(private_rpcs_handle); - if private_rpcs.by_name.is_empty() { - None - } else { - Some(private_rpcs) - } + Some(private_rpcs) }; let app = Self { - config: top_config.app, + config: top_config.app.clone(), balanced_rpcs, + http_client, private_rpcs, response_cache, watch_consensus_head_receiver, @@ -652,103 +697,30 @@ impl Web3ProxyApp { let app = Arc::new(app); - app.apply_config(top_config).await?; - + app.apply_top_config(top_config).await?; // TODO: use channel for receiving new top_configs // TODO: return a channel for sending new top_configs - Ok((app, important_background_handles).into()) + Ok((app, app_handles, important_background_handles).into()) } - /// update the app's balanced_rpcs and private_rpcs - /// TODO: make more of the app mutable. 
for now, db and - pub async fn apply_server_configs( - self: Arc, - top_config: TopConfig, - ) -> anyhow::Result<()> { - // safety checks on the config - if let Some(redirect) = &top_config.app.redirect_rpc_key_url { - assert!( - redirect.contains("{{rpc_key_id}}"), - "redirect_rpc_key_url user url must contain \"{{rpc_key_id}}\"" - ); - } + pub async fn apply_top_config(&self, new_top_config: TopConfig) -> anyhow::Result<()> { + // TODO: also update self.config from new_top_config.app - if !top_config.extra.is_empty() { - warn!( - "unknown TopConfig fields!: {:?}", - top_config.app.extra.keys() - ); - } + // connect to the backends + self.balanced_rpcs + .apply_server_configs(self, new_top_config.balanced_rpcs) + .await?; - if !top_config.app.extra.is_empty() { - warn!( - "unknown Web3ProxyAppConfig fields!: {:?}", - top_config.app.extra.keys() - ); - } - - let balanced_rpcs = top_config.balanced_rpcs; - - // safety check on balanced_rpcs - if balanced_rpcs.len() < top_config.app.min_synced_rpcs { - return Err(anyhow::anyhow!( - "Only {}/{} rpcs! Add more balanced_rpcs or reduce min_synced_rpcs.", - balanced_rpcs.len(), - top_config.app.min_synced_rpcs - )); - } - - // safety check on sum soft limit - let sum_soft_limit = balanced_rpcs.values().fold(0, |acc, x| acc + x.soft_limit); - - if sum_soft_limit < top_config.app.min_sum_soft_limit { - return Err(anyhow::anyhow!( - "Only {}/{} soft limit! Add more balanced_rpcs, increase soft limits, or reduce min_sum_soft_limit.", - sum_soft_limit, - top_config.app.min_sum_soft_limit - )); - } - - // create rate limiters - // these are optional. they require redis - let mut frontend_ip_rate_limiter = None; - let mut frontend_registered_user_rate_limiter = None; - let mut login_rate_limiter = None; - - if let Some(redis_pool) = vredis_pool.as_ref() { - if let Some(public_requests_per_period) = top_config.app.public_requests_per_period { - // chain id is included in the app name so that rpc rate limits are per-chain - let rpc_rrl = RedisRateLimiter::new( - &format!("web3_proxy:{}", top_config.app.chain_id), - "frontend", - public_requests_per_period, - 60.0, - redis_pool.clone(), - ); - - // these two rate limiters can share the base limiter - // these are deferred rate limiters because we don't want redis network requests on the hot path - // TODO: take cache_size from config - frontend_ip_rate_limiter = Some(DeferredRateLimiter::::new( - 10_000, - "ip", - rpc_rrl.clone(), - None, - )); - frontend_registered_user_rate_limiter = Some(DeferredRateLimiter::::new( - 10_000, "key", rpc_rrl, None, - )); + if let Some(private_rpc_configs) = new_top_config.private_rpcs { + if let Some(private_rpcs) = self.private_rpcs.as_ref() { + private_rpcs + .apply_server_configs(self, private_rpc_configs) + .await?; + } else { + // TODO: maybe we should have private_rpcs just be empty instead of being None + todo!("handle toggling private_rpcs") } - - // login rate limiter - login_rate_limiter = Some(RedisRateLimiter::new( - "web3_proxy", - "login", - top_config.app.login_rate_limit_per_period, - 60.0, - redis_pool.clone(), - )); } Ok(()) diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 54456bb4..29b989d7 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -1,5 +1,5 @@ use crate::app::AnyhowJoinHandle; -use crate::rpcs::blockchain::{BlockHashesCache, Web3ProxyBlock}; +use crate::rpcs::blockchain::{BlocksByHashCache, Web3ProxyBlock}; use crate::rpcs::one::Web3Rpc; use argh::FromArgs; use ethers::prelude::TxHash; @@ -253,7 
+253,7 @@ impl Web3RpcConfig { chain_id: u64, http_client: Option, http_interval_sender: Option>>, - block_map: BlockHashesCache, + blocks_by_hash_cache: BlocksByHashCache, block_sender: Option>, tx_id_sender: Option>, reconnect: bool, @@ -270,7 +270,7 @@ impl Web3RpcConfig { http_client, http_interval_sender, redis_pool, - block_map, + blocks_by_hash_cache, block_sender, tx_id_sender, reconnect, diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index cd8957f5..da58a0ab 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -15,13 +15,13 @@ use serde::Serialize; use serde_json::json; use std::time::{SystemTime, UNIX_EPOCH}; use std::{cmp::Ordering, fmt::Display, sync::Arc}; -use tokio::sync::{broadcast, watch}; +use tokio::sync::broadcast; use tokio::time::Duration; // TODO: type for Hydrated Blocks with their full transactions? pub type ArcBlock = Arc>; -pub type BlockHashesCache = Cache; +pub type BlocksByHashCache = Cache; /// A block and its age. #[derive(Clone, Debug, Default, From, Serialize)] @@ -153,13 +153,13 @@ impl Web3Rpcs { // this is the only place that writes to block_numbers // multiple inserts should be okay though // TODO: info that there was a fork? - self.block_numbers.insert(*block_num, *block_hash).await; + self.blocks_by_number.insert(*block_num, *block_hash).await; } // this block is very likely already in block_hashes // TODO: use their get_with let block = self - .block_hashes + .blocks_by_hash .get_with(*block_hash, async move { block.clone() }) .await; @@ -178,7 +178,7 @@ impl Web3Rpcs { // first, try to get the hash from our cache // the cache is set last, so if its here, its everywhere // TODO: use try_get_with - if let Some(block) = self.block_hashes.get(hash) { + if let Some(block) = self.blocks_by_hash.get(hash) { return Ok(block); } @@ -265,10 +265,10 @@ impl Web3Rpcs { // if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations) let mut consensus_head_receiver = self - .watch_consensus_head_receiver + .watch_consensus_head_sender .as_ref() .context("need new head subscriptions to fetch cannonical_block")? - .clone(); + .subscribe(); // be sure the requested block num exists // TODO: is this okay? what if we aren't synced?! @@ -295,7 +295,7 @@ impl Web3Rpcs { // try to get the hash from our cache // deref to not keep the lock open - if let Some(block_hash) = self.block_numbers.get(num) { + if let Some(block_hash) = self.blocks_by_number.get(num) { // TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set // TODO: pass authorization through here? let block = self.block(authorization, &block_hash, None).await?; @@ -337,7 +337,6 @@ impl Web3Rpcs { block_receiver: flume::Receiver, // TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed // Geth's subscriptions have the same potential for skipping blocks. - head_block_sender: watch::Sender>, pending_tx_sender: Option>, ) -> anyhow::Result<()> { // TODO: indexmap or hashmap? what hasher? with_capacity? 
@@ -364,7 +363,6 @@ impl Web3Rpcs { &mut connection_heads, new_block, rpc, - &head_block_sender, &pending_tx_sender, ) .await @@ -389,7 +387,6 @@ impl Web3Rpcs { consensus_finder: &mut ConsensusFinder, rpc_head_block: Option, rpc: Arc, - head_block_sender: &watch::Sender>, pending_tx_sender: &Option>, ) -> anyhow::Result<()> { // TODO: how should we handle an error here? @@ -415,6 +412,7 @@ impl Web3Rpcs { // TODO: what should we do if the block number of new_synced_connections is < old_synced_connections? wait? + let watch_consensus_head_sender = self.watch_consensus_head_sender.as_ref().unwrap(); let consensus_tier = new_synced_connections.tier; let total_tiers = consensus_finder.len(); let backups_needed = new_synced_connections.backups_needed; @@ -456,9 +454,11 @@ impl Web3Rpcs { let consensus_head_block = self.try_cache_block(consensus_head_block, true).await?; - head_block_sender + watch_consensus_head_sender .send(Some(consensus_head_block)) - .context("head_block_sender sending consensus_head_block")?; + .context( + "watch_consensus_head_sender failed sending first consensus_head_block", + )?; } Some(old_head_block) => { // TODO: do this log item better @@ -470,7 +470,7 @@ impl Web3Rpcs { Ordering::Equal => { // multiple blocks with the same fork! if consensus_head_block.hash() == old_head_block.hash() { - // no change in hash. no need to use head_block_sender + // no change in hash. no need to use watch_consensus_head_sender // TODO: trace level if rpc is backup debug!( "con {}/{} {}{}/{}/{} con={} rpc={}@{}", @@ -510,9 +510,9 @@ impl Web3Rpcs { .await .context("save consensus_head_block as heaviest chain")?; - head_block_sender + watch_consensus_head_sender .send(Some(consensus_head_block)) - .context("head_block_sender sending consensus_head_block")?; + .context("watch_consensus_head_sender failed sending uncled consensus_head_block")?; } } Ordering::Less => { @@ -545,9 +545,9 @@ impl Web3Rpcs { "save_block sending consensus_head_block as heaviest chain", )?; - head_block_sender + watch_consensus_head_sender .send(Some(consensus_head_block)) - .context("head_block_sender sending consensus_head_block")?; + .context("watch_consensus_head_sender failed sending rollback consensus_head_block")?; } Ordering::Greater => { debug!( @@ -571,7 +571,7 @@ impl Web3Rpcs { let consensus_head_block = self.try_cache_block(consensus_head_block, true).await?; - head_block_sender.send(Some(consensus_head_block))?; + watch_consensus_head_sender.send(Some(consensus_head_block)).context("watch_consensus_head_sender failed sending new consensus_head_block")?; } } } diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index a348b9d6..c12f52f3 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -53,7 +53,7 @@ impl fmt::Debug for ConsensusWeb3Rpcs { impl Web3Rpcs { // TODO: return a ref? pub fn head_block(&self) -> Option { - self.watch_consensus_head_receiver + self.watch_consensus_head_sender .as_ref() .and_then(|x| x.borrow().clone()) } diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 695b4e8c..1ae7ce9f 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -1,9 +1,9 @@ ///! 
Load balanced communication with a group of web3 rpc providers -use super::blockchain::{BlockHashesCache, Web3ProxyBlock}; +use super::blockchain::{BlocksByHashCache, Web3ProxyBlock}; use super::consensus::ConsensusWeb3Rpcs; use super::one::Web3Rpc; use super::request::{OpenRequestHandle, OpenRequestResult, RequestRevertHandler}; -use crate::app::{flatten_handle, AnyhowJoinHandle}; +use crate::app::{flatten_handle, AnyhowJoinHandle, Web3ProxyApp}; use crate::config::{BlockAndRpc, TxHashAndRpc, Web3RpcConfig}; use crate::frontend::authorization::{Authorization, RequestMetadata}; use crate::frontend::rpc_proxy_ws::ProxyMode; @@ -38,19 +38,27 @@ use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBeh /// A collection of web3 connections. Sends requests either the current best server or all servers. #[derive(From)] pub struct Web3Rpcs { + /// if watch_consensus_head_sender is some, Web3Rpc inside self will send blocks here when they get them + pub(crate) block_sender: flume::Sender<(Option, Arc)>, /// any requests will be forwarded to one (or more) of these connections pub(crate) by_name: HashMap>, + pub(crate) http_interval_sender: Option>>, /// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender` + /// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed + /// TODO: why is watch_consensus_head_sender in an Option, but this one isn't? + /// Geth's subscriptions have the same potential for skipping blocks. pub(super) watch_consensus_rpcs_sender: watch::Sender>, /// this head receiver makes it easy to wait until there is a new block - pub(super) watch_consensus_head_receiver: Option>>, - pub(super) pending_transactions: + pub(super) watch_consensus_head_sender: Option>>, + pub(super) pending_transaction_cache: Cache, + pub(super) pending_tx_id_receiver: flume::Receiver, + pub(super) pending_tx_id_sender: flume::Sender, /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? /// all blocks, including orphans - pub(super) block_hashes: BlockHashesCache, + pub(super) blocks_by_hash: BlocksByHashCache, /// blocks on the heaviest chain - pub(super) block_numbers: Cache, + pub(super) blocks_by_number: Cache, /// the number of rpcs required to agree on consensus for the head block (thundering herd protection) pub(super) min_head_rpcs: usize, /// the soft limit required to agree on consensus for the head block. (thundering herd protection) @@ -62,95 +70,9 @@ pub struct Web3Rpcs { } impl Web3Rpcs { - pub async fn min_head_rpcs(&self) -> usize { - self.min_head_rpcs - } - - pub async fn apply_server_configs( - &self, - server_configs: HashMap, - ) -> anyhow::Result<()> { - // turn configs into connections (in parallel) - // TODO: move this into a helper function. 
then we can use it when configs change (will need a remove function too) - let mut spawn_handles: FuturesUnordered<_> = server_configs - .into_iter() - .filter_map(|(server_name, server_config)| { - if server_config.disabled { - info!("{} is disabled", server_name); - return None; - } - - let db_conn = db_conn.clone(); - let http_client = http_client.clone(); - let redis_pool = redis_pool.clone(); - let http_interval_sender = http_interval_sender.clone(); - - let block_sender = if watch_consensus_head_sender.is_some() { - Some(block_sender.clone()) - } else { - None - }; - - let pending_tx_id_sender = Some(pending_tx_id_sender.clone()); - let block_map = block_map.clone(); - - debug!("spawning {}", server_name); - - let handle = tokio::spawn(async move { - server_config - .spawn( - server_name, - db_conn, - redis_pool, - chain_id, - http_client, - http_interval_sender, - block_map, - block_sender, - pending_tx_id_sender, - true, - ) - .await - }); - - Some(handle) - }) - .collect(); - - // map of connection names to their connection - let mut connections = AsyncRwLock::new(HashMap::new()); - let mut handles = vec![]; - - while let Some(x) = spawn_handles.next().await { - match x { - Ok(Ok((connection, _handle))) => { - // web3 connection worked - connections - .write() - .await - .insert(connection.name.clone(), connection); - - // TODO: what should we do with the handle? at least log any errors - } - Ok(Err(err)) => { - // if we got an error here, the app can continue on - // TODO: include context about which connection failed - error!("Unable to create connection. err={:?}", err); - } - Err(err) => { - // something actually bad happened. exit with an error - return Err(err.into()); - } - } - } - - Ok(()) - } - /// Spawn durable connections to multiple Web3 providers. #[allow(clippy::too_many_arguments)] pub async fn spawn( - block_map: BlockHashesCache, chain_id: u64, db_conn: Option, http_client: Option, @@ -158,9 +80,8 @@ impl Web3Rpcs { max_block_lag: Option, min_head_rpcs: usize, min_sum_soft_limit: u32, - pending_transactions: Cache, + pending_transaction_cache: Cache, pending_tx_sender: Option>, - redis_pool: Option, watch_consensus_head_sender: Option>>, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); @@ -217,30 +138,40 @@ impl Web3Rpcs { None }; - // TODO: max_capacity and time_to_idle from config - // all block hashes are the same size, so no need for weigher - let block_hashes = Cache::builder() + // these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes + // TODO: how can we do the weigher better? 
need to know actual allocated size + // TODO: limits from config + let blocks_by_hash: BlocksByHashCache = Cache::builder() + .max_capacity(1024 * 1024 * 1024) + .weigher(|_k, v: &Web3ProxyBlock| { + 1 + v.block.transactions.len().try_into().unwrap_or(u32::MAX) + }) .time_to_idle(Duration::from_secs(600)) - .max_capacity(10_000) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + // all block numbers are the same size, so no need for weigher - let block_numbers = Cache::builder() + // TODO: limits from config + let blocks_by_number = Cache::builder() .time_to_idle(Duration::from_secs(600)) .max_capacity(10_000) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); - let (watch_consensus_connections_sender, _) = watch::channel(Default::default()); + let (watch_consensus_rpcs_sender, _) = watch::channel(Default::default()); - let watch_consensus_head_receiver = - watch_consensus_head_sender.as_ref().map(|x| x.subscribe()); + // by_name starts empty. self.apply_server_configs will add to it + let by_name = Default::default(); let connections = Arc::new(Self { - by_name: connections, - watch_consensus_rpcs_sender: watch_consensus_connections_sender, - watch_consensus_head_receiver, - pending_transactions, - block_hashes, - block_numbers, + block_sender, + by_name, + http_interval_sender, + watch_consensus_rpcs_sender, + watch_consensus_head_sender, + pending_transaction_cache, + pending_tx_id_sender, + pending_tx_id_receiver, + blocks_by_hash, + blocks_by_number, min_sum_soft_limit, min_head_rpcs, max_block_age, @@ -254,13 +185,7 @@ impl Web3Rpcs { tokio::spawn(async move { connections - .subscribe( - authorization, - pending_tx_id_receiver, - block_receiver, - watch_consensus_head_sender, - pending_tx_sender, - ) + .subscribe(authorization, block_receiver, pending_tx_sender) .await }) }; @@ -268,19 +193,131 @@ impl Web3Rpcs { Ok((connections, handle)) } + /// update the rpcs in this group + pub async fn apply_server_configs( + &self, + app: &Web3ProxyApp, + rpc_configs: HashMap, + ) -> anyhow::Result<()> { + // safety checks + if rpc_configs.len() < app.config.min_synced_rpcs { + return Err(anyhow::anyhow!( + "Only {}/{} rpcs! Add more rpcs or reduce min_synced_rpcs.", + rpc_configs.len(), + app.config.min_synced_rpcs + )); + } + + // safety check on sum soft limit + // TODO: will need to think about this more once sum_soft_limit is dynamic + let sum_soft_limit = rpc_configs.values().fold(0, |acc, x| acc + x.soft_limit); + + // TODO: < is a bit dangerous, we should require a buffer + if sum_soft_limit < self.min_sum_soft_limit { + return Err(anyhow::anyhow!( + "Only {}/{} soft limit! Add more rpcs, increase soft limits, or reduce min_sum_soft_limit.", + sum_soft_limit, + self.min_sum_soft_limit + )); + } + + // turn configs into connections (in parallel) + // TODO: move this into a helper function. 
then we can use it when configs change (will need a remove function too) + let mut spawn_handles: FuturesUnordered<_> = rpc_configs + .into_iter() + .filter_map(|(server_name, server_config)| { + if server_config.disabled { + info!("{} is disabled", server_name); + return None; + } + + let db_conn = app.db_conn(); + let http_client = app.http_client.clone(); + let vredis_pool = app.vredis_pool.clone(); + + let block_sender = if self.watch_consensus_head_sender.is_some() { + Some(self.block_sender.clone()) + } else { + None + }; + + let pending_tx_id_sender = Some(self.pending_tx_id_sender.clone()); + let blocks_by_hash = self.blocks_by_hash.clone(); + let http_interval_sender = self.http_interval_sender.clone(); + let chain_id = app.config.chain_id; + + debug!("spawning {}", server_name); + + let handle = tokio::spawn(async move { + server_config + .spawn( + server_name, + db_conn, + vredis_pool, + chain_id, + http_client, + http_interval_sender, + blocks_by_hash, + block_sender, + pending_tx_id_sender, + true, + ) + .await + }); + + Some(handle) + }) + .collect(); + + // map of connection names to their connection + let connections = AsyncRwLock::new(HashMap::new()); + + while let Some(x) = spawn_handles.next().await { + match x { + Ok(Ok((connection, _handle))) => { + // web3 connection worked + let old_rpc = connections + .write() + .await + .insert(connection.name.clone(), connection); + + if let Some(old_rpc) = old_rpc { + todo!("do something to make the old one shutdown"); + } + + // TODO: what should we do with the new handle? make sure error logs aren't dropped + } + Ok(Err(err)) => { + // if we got an error here, the app can continue on + // TODO: include context about which connection failed + // TODO: will this retry automatically? i don't think so + error!("Unable to create connection. err={:?}", err); + } + Err(err) => { + // something actually bad happened. exit with an error + return Err(err.into()); + } + } + } + + Ok(()) + } + pub fn get(&self, conn_name: &str) -> Option<&Arc> { self.by_name.get(conn_name) } + pub async fn min_head_rpcs(&self) -> usize { + self.min_head_rpcs + } + /// subscribe to blocks and transactions from all the backend rpcs. 
/// blocks are processed by all the `Web3Rpc`s and then sent to the `block_receiver` /// transaction ids from all the `Web3Rpc`s are deduplicated and forwarded to `pending_tx_sender` async fn subscribe( self: Arc, authorization: Arc, - pending_tx_id_receiver: flume::Receiver, block_receiver: flume::Receiver, - head_block_sender: Option>>, pending_tx_sender: Option>, ) -> anyhow::Result<()> { let mut futures = vec![]; @@ -292,6 +329,7 @@ impl Web3Rpcs { if let Some(pending_tx_sender) = pending_tx_sender.clone() { let clone = self.clone(); let authorization = authorization.clone(); + let pending_tx_id_receiver = self.pending_tx_id_receiver.clone(); let handle = task::spawn(async move { // TODO: set up this future the same as the block funnel while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv_async().await { @@ -311,7 +349,7 @@ impl Web3Rpcs { } // setup the block funnel - if let Some(head_block_sender) = head_block_sender { + if self.watch_consensus_head_sender.is_some() { let connections = Arc::clone(&self); let pending_tx_sender = pending_tx_sender.clone(); @@ -319,12 +357,7 @@ impl Web3Rpcs { .name("process_incoming_blocks") .spawn(async move { connections - .process_incoming_blocks( - &authorization, - block_receiver, - head_block_sender, - pending_tx_sender, - ) + .process_incoming_blocks(&authorization, block_receiver, pending_tx_sender) .await })?; @@ -1148,12 +1181,12 @@ impl Serialize for Web3Rpcs { state.serialize_field("synced_connections", &consensus_connections)?; } - self.block_hashes.sync(); - self.block_numbers.sync(); - state.serialize_field("block_hashes_count", &self.block_hashes.entry_count())?; - state.serialize_field("block_hashes_size", &self.block_hashes.weighted_size())?; - state.serialize_field("block_numbers_count", &self.block_numbers.entry_count())?; - state.serialize_field("block_numbers_size", &self.block_numbers.weighted_size())?; + self.blocks_by_hash.sync(); + self.blocks_by_number.sync(); + state.serialize_field("block_hashes_count", &self.blocks_by_hash.entry_count())?; + state.serialize_field("block_hashes_size", &self.blocks_by_hash.weighted_size())?; + state.serialize_field("block_numbers_count", &self.blocks_by_number.entry_count())?; + state.serialize_field("block_numbers_size", &self.blocks_by_number.weighted_size())?; state.end() } } @@ -1346,18 +1379,25 @@ mod tests { (lagged_rpc.name.clone(), lagged_rpc.clone()), ]); + let (block_sender, _) = flume::unbounded(); + let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); let (watch_consensus_rpcs_sender, _) = watch::channel(Default::default()); + let (watch_consensus_head_sender, _) = watch::channel(Default::default()); // TODO: make a Web3Rpcs::new let rpcs = Web3Rpcs { + block_sender, by_name: rpcs_by_name, - watch_consensus_head_receiver: None, + http_interval_sender: None, + watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, - pending_transactions: Cache::builder() + pending_transaction_cache: Cache::builder() .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), - block_hashes: Cache::builder() + pending_tx_id_receiver, + pending_tx_id_sender, + blocks_by_hash: Cache::builder() .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), - block_numbers: Cache::builder() + blocks_by_number: Cache::builder() .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), // TODO: test max_block_age? 
max_block_age: None, @@ -1369,7 +1409,6 @@ mod tests { let authorization = Arc::new(Authorization::internal(None).unwrap()); - let (head_block_sender, _head_block_receiver) = watch::channel(Default::default()); let mut consensus_finder = ConsensusFinder::new(&[0, 1, 2, 3], None, None); // process None so that @@ -1378,7 +1417,6 @@ mod tests { &mut consensus_finder, None, lagged_rpc.clone(), - &head_block_sender, &None, ) .await @@ -1388,7 +1426,6 @@ mod tests { &mut consensus_finder, None, head_rpc.clone(), - &head_block_sender, &None, ) .await @@ -1424,7 +1461,6 @@ mod tests { &mut consensus_finder, Some(lagged_block.clone()), lagged_rpc, - &head_block_sender, &None, ) .await @@ -1434,7 +1470,6 @@ mod tests { &mut consensus_finder, Some(lagged_block.clone()), head_rpc.clone(), - &head_block_sender, &None, ) .await @@ -1450,7 +1485,6 @@ mod tests { &mut consensus_finder, Some(head_block.clone()), head_rpc, - &head_block_sender, &None, ) .await @@ -1545,18 +1579,26 @@ mod tests { (archive_rpc.name.clone(), archive_rpc.clone()), ]); + let (block_sender, _) = flume::unbounded(); + let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); let (watch_consensus_rpcs_sender, _) = watch::channel(Default::default()); + let (watch_consensus_head_sender, _watch_consensus_head_receiver) = + watch::channel(Default::default()); // TODO: make a Web3Rpcs::new let rpcs = Web3Rpcs { + block_sender, by_name: rpcs_by_name, - watch_consensus_head_receiver: None, + http_interval_sender: None, + watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, - pending_transactions: Cache::builder() + pending_transaction_cache: Cache::builder() .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), - block_hashes: Cache::builder() + pending_tx_id_receiver, + pending_tx_id_sender, + blocks_by_hash: Cache::builder() .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), - block_numbers: Cache::builder() + blocks_by_number: Cache::builder() .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), min_head_rpcs: 1, min_sum_soft_limit: 4_000, @@ -1566,7 +1608,6 @@ mod tests { let authorization = Arc::new(Authorization::internal(None).unwrap()); - let (head_block_sender, _head_block_receiver) = watch::channel(Default::default()); let mut connection_heads = ConsensusFinder::new(&[0, 1, 2, 3], None, None); // min sum soft limit will require tier 2 @@ -1575,7 +1616,6 @@ mod tests { &mut connection_heads, Some(head_block.clone()), pruned_rpc.clone(), - &head_block_sender, &None, ) .await @@ -1586,7 +1626,6 @@ mod tests { &mut connection_heads, Some(head_block.clone()), archive_rpc.clone(), - &head_block_sender, &None, ) .await diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 64bd43ed..b6657eac 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -1,5 +1,5 @@ ///! Rate-limited communication with a web3 provider. -use super::blockchain::{ArcBlock, BlockHashesCache, Web3ProxyBlock}; +use super::blockchain::{ArcBlock, BlocksByHashCache, Web3ProxyBlock}; use super::provider::Web3Provider; use super::request::{OpenRequestHandle, OpenRequestResult}; use crate::app::{flatten_handle, AnyhowJoinHandle}; @@ -151,7 +151,7 @@ impl Web3Rpc { redis_pool: Option, // TODO: think more about soft limit. watching ewma of requests is probably better. but what should the random sort be on? 
maybe group on tier is enough // soft_limit: u32, - block_map: BlockHashesCache, + block_map: BlocksByHashCache, block_sender: Option>, tx_id_sender: Option)>>, reconnect: bool, @@ -571,7 +571,7 @@ impl Web3Rpc { self: &Arc, new_head_block: Result, ProviderError>, block_sender: &flume::Sender, - block_map: BlockHashesCache, + block_map: BlocksByHashCache, ) -> anyhow::Result<()> { let new_head_block = match new_head_block { Ok(None) => { @@ -648,7 +648,7 @@ impl Web3Rpc { async fn subscribe( self: Arc, authorization: &Arc, - block_map: BlockHashesCache, + block_map: BlocksByHashCache, block_sender: Option>, chain_id: u64, http_interval_sender: Option>>, @@ -848,7 +848,7 @@ impl Web3Rpc { authorization: Arc, http_interval_receiver: Option>, block_sender: flume::Sender, - block_map: BlockHashesCache, + block_map: BlocksByHashCache, ) -> anyhow::Result<()> { trace!("watching new heads on {}", self); diff --git a/web3_proxy/src/rpcs/transactions.rs b/web3_proxy/src/rpcs/transactions.rs index 466a92be..687c5224 100644 --- a/web3_proxy/src/rpcs/transactions.rs +++ b/web3_proxy/src/rpcs/transactions.rs @@ -82,7 +82,7 @@ impl Web3Rpcs { } // trace!(?pending_tx_id, "checking pending_transactions on {}", rpc); - if self.pending_transactions.contains_key(&pending_tx_id) { + if self.pending_transaction_cache.contains_key(&pending_tx_id) { // this transaction has already been processed return Ok(()); }
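
Follow-up sketch, not part of the diff above: this patch only calls the new Web3ProxyApp::apply_top_config once at startup, and the TODOs in spawn() note that new TopConfigs should eventually arrive over a channel. One minimal way to drive partial reloads, assuming a tokio watch channel fed by a config-file watcher or SIGHUP handler (none of that wiring exists in this patch), that the module paths below match the crate layout, and that TopConfig is (or is made) Clone:

use std::sync::Arc;
use tokio::sync::watch;
use web3_proxy::app::Web3ProxyApp; // paths assumed; adjust to the actual crate layout
use web3_proxy::config::TopConfig;

/// re-apply the TopConfig whenever a new one is published on the watch channel.
/// spawn() already applies the initial config, so this task only reacts to changes.
async fn config_reload_task(
    app: Arc<Web3ProxyApp>,
    mut new_top_config_receiver: watch::Receiver<TopConfig>,
) -> anyhow::Result<()> {
    while new_top_config_receiver.changed().await.is_ok() {
        // clone the latest config out of the watch cell (assumes TopConfig: Clone)
        let new_top_config = new_top_config_receiver.borrow_and_update().clone();

        // balanced_rpcs (and private_rpcs, when configured) are reconciled in place;
        // a bad config logs an error instead of taking down the running servers
        if let Err(err) = app.apply_top_config(new_top_config).await {
            log::error!("unable to apply new top_config: {:?}", err);
        }
    }

    // the sender was dropped; no more config updates are coming
    Ok(())
}

The handle for a task like this could be pushed onto the app_handles FuturesUnordered introduced in this patch, alongside the balanced and private rpcs handles.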