From c9e5661c5be0adb9c6fb6f7c8231ea5f02e99427 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sat, 25 Feb 2023 09:48:40 -0800 Subject: [PATCH] wip --- TODO.md | 1 + web3_proxy/src/app/mod.rs | 344 ++++++++++++++++++------------------ web3_proxy/src/rpcs/many.rs | 161 +++++++++-------- 3 files changed, 264 insertions(+), 242 deletions(-) diff --git a/TODO.md b/TODO.md index d5507bf4..8db1a2e1 100644 --- a/TODO.md +++ b/TODO.md @@ -496,6 +496,7 @@ These are not yet ordered. There might be duplicates. We might not actually need These are not ordered. I think some rows also accidently got deleted here. Check git history. +- [ ] less Arc (and more pin?). we use arcs on a lot of things where i think a &self should work fine. - [ ] automatically tune database and redis connection pool size - [ ] if db is down, keep logins cached longer. at least only new logins will have trouble then - [ ] handle user payments diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 6735f7f8..5f8130cf 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -354,8 +354,6 @@ pub async fn get_migrated_db( pub struct Web3ProxyAppSpawn { /// the app. probably clone this to use in other groups of handles pub app: Arc, - // cancellable handles - pub app_handles: FuturesUnordered>, /// these are important and must be allowed to finish pub background_handles: FuturesUnordered>, } @@ -367,27 +365,8 @@ impl Web3ProxyApp { num_workers: usize, shutdown_receiver: broadcast::Receiver<()>, ) -> anyhow::Result { - // safety checks on the config - if let Some(redirect) = &top_config.app.redirect_rpc_key_url { - assert!( - redirect.contains("{{rpc_key_id}}"), - "redirect_rpc_key_url user url must contain \"{{rpc_key_id}}\"" - ); - } - - if !top_config.extra.is_empty() { - warn!( - "unknown TopConfig fields!: {:?}", - top_config.app.extra.keys() - ); - } - - if !top_config.app.extra.is_empty() { - warn!( - "unknown Web3ProxyAppConfig fields!: {:?}", - top_config.app.extra.keys() - ); - } + // we must wait for these to end on their own (and they need to subscribe to shutdown_sender) + let important_background_handles = FuturesUnordered::new(); let mut db_conn = None::; let mut db_replica = None::; @@ -447,46 +426,7 @@ impl Web3ProxyApp { warn!("no database. some features will be disabled"); }; - let balanced_rpcs = top_config.balanced_rpcs; - - // safety check on balanced_rpcs - if balanced_rpcs.len() < top_config.app.min_synced_rpcs { - return Err(anyhow::anyhow!( - "Only {}/{} rpcs! Add more balanced_rpcs or reduce min_synced_rpcs.", - balanced_rpcs.len(), - top_config.app.min_synced_rpcs - )); - } - - // safety check on sum soft limit - let sum_soft_limit = balanced_rpcs.values().fold(0, |acc, x| acc + x.soft_limit); - - if sum_soft_limit < top_config.app.min_sum_soft_limit { - return Err(anyhow::anyhow!( - "Only {}/{} soft limit! Add more balanced_rpcs, increase soft limits, or reduce min_sum_soft_limit.", - sum_soft_limit, - top_config.app.min_sum_soft_limit - )); - } - - let private_rpcs = top_config.private_rpcs.unwrap_or_default(); - - // these are safe to cancel - let cancellable_handles = FuturesUnordered::new(); - // we must wait for these to end on their own (and they need to subscribe to shutdown_sender) - let important_background_handles = FuturesUnordered::new(); - - // make a http shared client - // TODO: can we configure the connection pool? should we? - // TODO: timeouts from config. 
defaults are hopefully good - let http_client = Some( - reqwest::ClientBuilder::new() - .connect_timeout(Duration::from_secs(5)) - .timeout(Duration::from_secs(5 * 60)) - .user_agent(APP_USER_AGENT) - .build()?, - ); - + // TODO: do this during apply_config so that we can change redis url while running // create a connection pool for redis // a failure to connect does NOT block the application from starting let vredis_pool = match top_config.app.volatile_redis_url.as_ref() { @@ -540,6 +480,17 @@ impl Web3ProxyApp { None }; + // make a http shared client + // TODO: can we configure the connection pool? should we? + // TODO: timeouts from config. defaults are hopefully good + let http_client = Some( + reqwest::ClientBuilder::new() + .connect_timeout(Duration::from_secs(5)) + .timeout(Duration::from_secs(5 * 60)) + .user_agent(APP_USER_AGENT) + .build()?, + ); + // TODO: i don't like doing Block::default here! Change this to "None"? let (watch_consensus_head_sender, watch_consensus_head_receiver) = watch::channel(None); // TODO: will one receiver lagging be okay? how big should this be? @@ -575,110 +526,6 @@ impl Web3ProxyApp { .time_to_idle(Duration::from_secs(300)) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); - // connect to the load balanced rpcs - let (balanced_rpcs, balanced_handle) = Web3Rpcs::spawn( - block_map.clone(), - top_config.app.chain_id, - db_conn.clone(), - http_client.clone(), - top_config.app.max_block_age, - top_config.app.max_block_lag, - top_config.app.min_synced_rpcs, - top_config.app.min_sum_soft_limit, - pending_transactions.clone(), - Some(pending_tx_sender.clone()), - vredis_pool.clone(), - balanced_rpcs, - Some(watch_consensus_head_sender), - ) - .await - .context("spawning balanced rpcs")?; - - // save the handle to catch any errors - cancellable_handles.push(balanced_handle); - - // connect to the private rpcs - // only some chains have this, so this is optional - let private_rpcs = if private_rpcs.is_empty() { - // TODO: do None instead of clone? - warn!("No private relays configured. Any transactions will be broadcast to the public mempool!"); - None - } else { - let (private_rpcs, private_handle) = Web3Rpcs::spawn( - block_map, - top_config.app.chain_id, - db_conn.clone(), - http_client.clone(), - // private rpcs don't get subscriptions, so no need for max_block_age or max_block_lag - None, - None, - 0, - 0, - pending_transactions.clone(), - // TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits, but they should have - None, - vredis_pool.clone(), - private_rpcs, - // subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs - // they also often have low rate limits - // however, they are well connected to miners/validators. so maybe using them as a safety check would be good - // TODO: but maybe we could include privates in the "backup" tier - None, - ) - .await - .context("spawning private_rpcs")?; - - if private_rpcs.by_name.is_empty() { - None - } else { - // save the handle to catch any errors - cancellable_handles.push(private_handle); - - Some(private_rpcs) - } - }; - - // create rate limiters - // these are optional. 
they require redis - let mut frontend_ip_rate_limiter = None; - let mut frontend_registered_user_rate_limiter = None; - let mut login_rate_limiter = None; - - if let Some(redis_pool) = vredis_pool.as_ref() { - if let Some(public_requests_per_period) = top_config.app.public_requests_per_period { - // chain id is included in the app name so that rpc rate limits are per-chain - let rpc_rrl = RedisRateLimiter::new( - &format!("web3_proxy:{}", top_config.app.chain_id), - "frontend", - public_requests_per_period, - 60.0, - redis_pool.clone(), - ); - - // these two rate limiters can share the base limiter - // these are deferred rate limiters because we don't want redis network requests on the hot path - // TODO: take cache_size from config - frontend_ip_rate_limiter = Some(DeferredRateLimiter::::new( - 10_000, - "ip", - rpc_rrl.clone(), - None, - )); - frontend_registered_user_rate_limiter = Some(DeferredRateLimiter::::new( - 10_000, "key", rpc_rrl, None, - )); - } - - // login rate limiter - login_rate_limiter = Some(RedisRateLimiter::new( - "web3_proxy", - "login", - top_config.app.login_rate_limit_per_period, - 60.0, - redis_pool.clone(), - )); - } - // responses can be very different in sizes, so this is a cache with a max capacity and a weigher // TODO: don't allow any response to be bigger than X% of the cache let response_cache = Cache::builder() @@ -720,6 +567,68 @@ impl Web3ProxyApp { .time_to_idle(Duration::from_secs(120)) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + // set up a Web3Rpcs object to hold all our connections + // TODO: for now, only the server_configs can change + let (balanced_rpcs, balanced_handle) = Web3Rpcs::spawn( + block_map.clone(), + top_config.app.chain_id, + db_conn.clone(), + http_client.clone(), + top_config.app.max_block_age, + top_config.app.max_block_lag, + top_config.app.min_synced_rpcs, + top_config.app.min_sum_soft_limit, + pending_transactions.clone(), + Some(pending_tx_sender.clone()), + vredis_pool.clone(), + Some(watch_consensus_head_sender), + ) + .await + .context("spawning balanced rpcs")?; + + // connect to the load balanced rpcs + balanced_rpcs.apply_server_configs(top_config.balanced_rpcs); + + // connect to the private rpcs + // only some chains have this, so this is optional + let private_rpc_configs = top_config.private_rpcs.unwrap_or_default(); + let private_rpcs = if private_rpc_configs.is_empty() { + warn!("No private relays configured. Any transactions will be broadcast to the public mempool!"); + None + } else { + // TODO: do something with the spawn handle + let (private_rpcs, _) = Web3Rpcs::spawn( + block_map, + top_config.app.chain_id, + db_conn.clone(), + http_client.clone(), + // private rpcs don't get subscriptions, so no need for max_block_age or max_block_lag + None, + None, + 0, + 0, + pending_transactions.clone(), + // TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits, but they should have + None, + vredis_pool.clone(), + // subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs + // they also often have low rate limits + // however, they are well connected to miners/validators. 
so maybe using them as a safety check would be good + // TODO: but maybe we could include privates in the "backup" tier + None, + ) + .await + .context("spawning private_rpcs")?; + + private_rpcs.apply_server_configs(private_rpc_configs); + + if private_rpcs.by_name.is_empty() { + None + } else { + Some(private_rpcs) + } + }; + let app = Self { config: top_config.app, balanced_rpcs, @@ -743,7 +652,106 @@ impl Web3ProxyApp { let app = Arc::new(app); - Ok((app, cancellable_handles, important_background_handles).into()) + app.apply_config(top_config).await?; + + // TODO: use channel for receiving new top_configs + // TODO: return a channel for sending new top_configs + + Ok((app, important_background_handles).into()) + } + + /// update the app's balanced_rpcs and private_rpcs + /// TODO: make more of the app mutable. for now, db and + pub async fn apply_server_configs( + self: Arc, + top_config: TopConfig, + ) -> anyhow::Result<()> { + // safety checks on the config + if let Some(redirect) = &top_config.app.redirect_rpc_key_url { + assert!( + redirect.contains("{{rpc_key_id}}"), + "redirect_rpc_key_url user url must contain \"{{rpc_key_id}}\"" + ); + } + + if !top_config.extra.is_empty() { + warn!( + "unknown TopConfig fields!: {:?}", + top_config.app.extra.keys() + ); + } + + if !top_config.app.extra.is_empty() { + warn!( + "unknown Web3ProxyAppConfig fields!: {:?}", + top_config.app.extra.keys() + ); + } + + let balanced_rpcs = top_config.balanced_rpcs; + + // safety check on balanced_rpcs + if balanced_rpcs.len() < top_config.app.min_synced_rpcs { + return Err(anyhow::anyhow!( + "Only {}/{} rpcs! Add more balanced_rpcs or reduce min_synced_rpcs.", + balanced_rpcs.len(), + top_config.app.min_synced_rpcs + )); + } + + // safety check on sum soft limit + let sum_soft_limit = balanced_rpcs.values().fold(0, |acc, x| acc + x.soft_limit); + + if sum_soft_limit < top_config.app.min_sum_soft_limit { + return Err(anyhow::anyhow!( + "Only {}/{} soft limit! Add more balanced_rpcs, increase soft limits, or reduce min_sum_soft_limit.", + sum_soft_limit, + top_config.app.min_sum_soft_limit + )); + } + + // create rate limiters + // these are optional. 
they require redis + let mut frontend_ip_rate_limiter = None; + let mut frontend_registered_user_rate_limiter = None; + let mut login_rate_limiter = None; + + if let Some(redis_pool) = vredis_pool.as_ref() { + if let Some(public_requests_per_period) = top_config.app.public_requests_per_period { + // chain id is included in the app name so that rpc rate limits are per-chain + let rpc_rrl = RedisRateLimiter::new( + &format!("web3_proxy:{}", top_config.app.chain_id), + "frontend", + public_requests_per_period, + 60.0, + redis_pool.clone(), + ); + + // these two rate limiters can share the base limiter + // these are deferred rate limiters because we don't want redis network requests on the hot path + // TODO: take cache_size from config + frontend_ip_rate_limiter = Some(DeferredRateLimiter::::new( + 10_000, + "ip", + rpc_rrl.clone(), + None, + )); + frontend_registered_user_rate_limiter = Some(DeferredRateLimiter::::new( + 10_000, "key", rpc_rrl, None, + )); + } + + // login rate limiter + login_rate_limiter = Some(RedisRateLimiter::new( + "web3_proxy", + "login", + top_config.app.login_rate_limit_per_period, + 60.0, + redis_pool.clone(), + )); + } + + Ok(()) } pub fn head_block_receiver(&self) -> watch::Receiver> { diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 2aeeb8e9..695b4e8c 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -31,7 +31,7 @@ use std::sync::atomic::{self, Ordering}; use std::sync::Arc; use std::{cmp, fmt}; use thread_fast_rng::rand::seq::SliceRandom; -use tokio::sync::{broadcast, watch}; +use tokio::sync::{broadcast, watch, RwLock as AsyncRwLock}; use tokio::task; use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; @@ -62,6 +62,91 @@ pub struct Web3Rpcs { } impl Web3Rpcs { + pub async fn min_head_rpcs(&self) -> usize { + self.min_head_rpcs + } + + pub async fn apply_server_configs( + &self, + server_configs: HashMap, + ) -> anyhow::Result<()> { + // turn configs into connections (in parallel) + // TODO: move this into a helper function. then we can use it when configs change (will need a remove function too) + let mut spawn_handles: FuturesUnordered<_> = server_configs + .into_iter() + .filter_map(|(server_name, server_config)| { + if server_config.disabled { + info!("{} is disabled", server_name); + return None; + } + + let db_conn = db_conn.clone(); + let http_client = http_client.clone(); + let redis_pool = redis_pool.clone(); + let http_interval_sender = http_interval_sender.clone(); + + let block_sender = if watch_consensus_head_sender.is_some() { + Some(block_sender.clone()) + } else { + None + }; + + let pending_tx_id_sender = Some(pending_tx_id_sender.clone()); + let block_map = block_map.clone(); + + debug!("spawning {}", server_name); + + let handle = tokio::spawn(async move { + server_config + .spawn( + server_name, + db_conn, + redis_pool, + chain_id, + http_client, + http_interval_sender, + block_map, + block_sender, + pending_tx_id_sender, + true, + ) + .await + }); + + Some(handle) + }) + .collect(); + + // map of connection names to their connection + let mut connections = AsyncRwLock::new(HashMap::new()); + let mut handles = vec![]; + + while let Some(x) = spawn_handles.next().await { + match x { + Ok(Ok((connection, _handle))) => { + // web3 connection worked + connections + .write() + .await + .insert(connection.name.clone(), connection); + + // TODO: what should we do with the handle? 
at least log any errors + } + Ok(Err(err)) => { + // if we got an error here, the app can continue on + // TODO: include context about which connection failed + error!("Unable to create connection. err={:?}", err); + } + Err(err) => { + // something actually bad happened. exit with an error + return Err(err.into()); + } + } + } + + Ok(()) + } + /// Spawn durable connections to multiple Web3 providers. #[allow(clippy::too_many_arguments)] pub async fn spawn( @@ -76,13 +161,12 @@ impl Web3Rpcs { pending_transactions: Cache, pending_tx_sender: Option>, redis_pool: Option, - server_configs: HashMap, watch_consensus_head_sender: Option>>, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); let (block_sender, block_receiver) = flume::unbounded::(); - // TODO: query the rpc to get the actual expected block time, or get from config? + // TODO: query the rpc to get the actual expected block time, or get from config? maybe have this be part of a health check? let expected_block_time_ms = match chain_id { // ethereum 1 => 12_000, @@ -133,77 +217,6 @@ impl Web3Rpcs { None }; - // turn configs into connections (in parallel) - // TODO: move this into a helper function. then we can use it when configs change (will need a remove function too) - let mut spawn_handles: FuturesUnordered<_> = server_configs - .into_iter() - .filter_map(|(server_name, server_config)| { - if server_config.disabled { - info!("{} is disabled", server_name); - return None; - } - - let db_conn = db_conn.clone(); - let http_client = http_client.clone(); - let redis_pool = redis_pool.clone(); - let http_interval_sender = http_interval_sender.clone(); - - let block_sender = if watch_consensus_head_sender.is_some() { - Some(block_sender.clone()) - } else { - None - }; - - let pending_tx_id_sender = Some(pending_tx_id_sender.clone()); - let block_map = block_map.clone(); - - debug!("spawning {}", server_name); - - let handle = tokio::spawn(async move { - server_config - .spawn( - server_name, - db_conn, - redis_pool, - chain_id, - http_client, - http_interval_sender, - block_map, - block_sender, - pending_tx_id_sender, - true, - ) - .await - }); - - Some(handle) - }) - .collect(); - - // map of connection names to their connection - let mut connections = HashMap::new(); - let mut handles = vec![]; - - // TODO: futures unordered? - while let Some(x) = spawn_handles.next().await { - match x { - Ok(Ok((connection, handle))) => { - // web3 connection worked - connections.insert(connection.name.clone(), connection); - handles.push(handle); - } - Ok(Err(err)) => { - // if we got an error here, the app can continue on - // TODO: include context about which connection failed - error!("Unable to create connection. err={:?}", err); - } - Err(err) => { - // something actually bad happened. exit with an error - return Err(err.into()); - } - } - } - // TODO: max_capacity and time_to_idle from config // all block hashes are the same size, so no need for weigher let block_hashes = Cache::builder()
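Note on the `apply_server_configs` call sites in web3_proxy/src/app/mod.rs: `Web3Rpcs::apply_server_configs` is declared `async fn ... -> anyhow::Result<()>`, but both new call sites drop the returned future, so no servers would actually be spawned and rustc will warn about the unused future (expected for a wip patch). The constructor also calls `app.apply_config(top_config).await?` while the method it defines is named `apply_server_configs`, so one of the two names presumably still needs to change. A sketch of what the call sites are presumably meant to look like (the `.context(...)` messages are illustrative):

    // connect to the load balanced rpcs
    balanced_rpcs
        .apply_server_configs(top_config.balanced_rpcs)
        .await
        .context("applying balanced_rpcs server configs")?;

    // ... and, when private relays are configured ...
    private_rpcs
        .apply_server_configs(private_rpc_configs)
        .await
        .context("applying private_rpcs server configs")?;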
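In `Web3Rpcs::apply_server_configs` (web3_proxy/src/rpcs/many.rs), the freshly spawned connections are collected into a local `AsyncRwLock::new(HashMap::new())` that is never stored back on `self`, so the spawned `Web3Rpc`s would be dropped when the function returns, and several captured variables (`db_conn`, `http_client`, `block_map`, etc.) are not yet in scope. For runtime reconfiguration the map presumably has to live on the `Web3Rpcs` struct itself; a sketch of the result-collection loop under that assumption (treating `by_name` as `AsyncRwLock<HashMap<String, Arc<Web3Rpc>>>`, which is not what the struct currently declares):

    while let Some(x) = spawn_handles.next().await {
        match x {
            Ok(Ok((connection, _handle))) => {
                // insert (or replace) the connection under its configured name
                self.by_name
                    .write()
                    .await
                    .insert(connection.name.clone(), connection);
            }
            Ok(Err(err)) => {
                // one bad server config shouldn't stop the rest from spawning
                error!("Unable to create connection. err={:?}", err);
            }
            Err(err) => {
                // the spawn task itself panicked or was cancelled
                return Err(err.into());
            }
        }
    }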
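The TODOs in `Web3ProxyApp::spawn` ("use channel for receiving new top_configs" / "return a channel for sending new top_configs") point at re-running `apply_server_configs` while the proxy is running. A minimal sketch of that wiring, assuming a `tokio::sync::watch` channel and `TopConfig: Clone`; the `config_reload_loop` helper and channel names are illustrative, not part of this patch:

    use std::sync::Arc;

    use anyhow::Context;
    use tokio::sync::watch;

    // Web3Rpcs and TopConfig are the crate-local types touched by this patch.
    async fn config_reload_loop(
        balanced_rpcs: Arc<Web3Rpcs>,
        mut new_top_config_receiver: watch::Receiver<TopConfig>,
    ) -> anyhow::Result<()> {
        // wake up whenever the sender half publishes a new TopConfig
        while new_top_config_receiver.changed().await.is_ok() {
            // clone out of the watch guard so the lock isn't held across awaits
            let new_top_config: TopConfig = new_top_config_receiver.borrow_and_update().clone();

            balanced_rpcs
                .apply_server_configs(new_top_config.balanced_rpcs)
                .await
                .context("applying new balanced_rpcs configs")?;
        }

        Ok(())
    }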