diff --git a/config/example.toml b/config/example.toml index 79dc8f1b..096d16ee 100644 --- a/config/example.toml +++ b/config/example.toml @@ -1,6 +1,7 @@ [shared] chain_id = 1 db_url = "mysql://root:dev_web3_proxy@dev-db:3306/dev_web3_proxy" +db_max_connections = 99 min_sum_soft_limit = 2000 min_synced_rpcs = 2 redis_url = "redis://dev-redis:6379/" diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 67cb1742..3cfc6529 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -122,6 +122,7 @@ pub async fn flatten_handles( /// Connect to the database and run migrations pub async fn get_migrated_db( db_url: String, + min_connections: u32, max_connections: u32, ) -> anyhow::Result { let mut db_opt = sea_orm::ConnectOptions::new(db_url); @@ -129,7 +130,7 @@ pub async fn get_migrated_db( // TODO: load all these options from the config file. i think mysql default max is 100 // TODO: sqlx logging only in debug. way too verbose for production db_opt - .min_connections(1) + .min_connections(min_connections) .max_connections(max_connections) .connect_timeout(Duration::from_secs(8)) .idle_timeout(Duration::from_secs(8)) @@ -149,7 +150,6 @@ impl Web3ProxyApp { pub async fn spawn( app_stats: AppStats, top_config: TopConfig, - workers: usize, ) -> anyhow::Result<( Arc, Pin>>>, @@ -162,9 +162,16 @@ impl Web3ProxyApp { // first, we connect to mysql and make sure the latest migrations have run let db_conn = if let Some(db_url) = &top_config.app.db_url { - let max_connections = workers.try_into()?; + let db_min_connections = top_config.app.db_min_connections; - let db = get_migrated_db(db_url.clone(), max_connections).await?; + // TODO: what default multiple? + let redis_max_connections = top_config + .app + .db_max_connections + .unwrap_or(db_min_connections * 4); + + let db = + get_migrated_db(db_url.clone(), db_min_connections, redis_max_connections).await?; Some(db) } else { @@ -200,14 +207,19 @@ impl Web3ProxyApp { let manager = RedisConnectionManager::new(redis_url.as_ref())?; - let min_size = workers as u32; - let max_size = min_size * 4; + let redis_min_connections = top_config.app.redis_min_connections; + + let redis_max_connections = top_config + .app + .redis_max_connections + .unwrap_or(redis_min_connections * 4); + // TODO: min_idle? // TODO: set max_size based on max expected concurrent connections? set based on num_workers? let builder = bb8::Pool::builder() .error_sink(RedisErrorSink.boxed_clone()) - .min_idle(Some(min_size)) - .max_size(max_size); + .min_idle(Some(redis_min_connections)) + .max_size(redis_max_connections); let pool = builder.build(manager).await?; diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs index 2d2b17e3..7cdacfda 100644 --- a/web3_proxy/src/bin/web3_proxy.rs +++ b/web3_proxy/src/bin/web3_proxy.rs @@ -76,7 +76,7 @@ fn run( let app_frontend_port = cli_config.port; let app_prometheus_port = cli_config.prometheus_port; - let (app, app_handle) = Web3ProxyApp::spawn(app_stats, top_config, num_workers).await?; + let (app, app_handle) = Web3ProxyApp::spawn(app_stats, top_config).await?; let frontend_handle = tokio::spawn(frontend::serve(app_frontend_port, app)); @@ -212,25 +212,23 @@ mod tests { let app_config = TopConfig { app: AppConfig { chain_id: 31337, - db_url: None, default_requests_per_minute: 6_000_000, - invite_code: None, - redis_url: None, min_sum_soft_limit: 1, min_synced_rpcs: 1, public_rate_limit_per_minute: 0, response_cache_max_bytes: 10_usize.pow(7), redirect_public_url: "example.com/".to_string(), redirect_user_url: "example.com/users/{user_address}".to_string(), + ..Default::default() }, balanced_rpcs: HashMap::from([ ( "anvil".to_string(), - Web3ConnectionConfig::new(anvil.endpoint(), 100, None, 1), + Web3ConnectionConfig::new(anvil.endpoint(), 100, None, 1, Some(false)), ), ( "anvil_ws".to_string(), - Web3ConnectionConfig::new(anvil.ws_endpoint(), 100, None, 0), + Web3ConnectionConfig::new(anvil.ws_endpoint(), 100, None, 0, Some(true)), ), ]), private_rpcs: None, diff --git a/web3_proxy/src/bin/web3_proxy_cli/main.rs b/web3_proxy/src/bin/web3_proxy_cli/main.rs index bf32bafd..7182b6f9 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/main.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/main.rs @@ -50,7 +50,7 @@ async fn main() -> anyhow::Result<()> { match cli_config.sub_command { SubCommand::CreateUser(x) => { - let db = get_migrated_db(cli_config.db_url, 1).await?; + let db = get_migrated_db(cli_config.db_url, 1, 1).await?; x.main(&db).await } diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index f8c5f260..6b9ad45b 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -5,6 +5,7 @@ use argh::FromArgs; use derive_more::Constructor; use ethers::prelude::TxHash; use hashbrown::HashMap; +use num::One; use serde::Deserialize; use std::sync::Arc; use tokio::sync::broadcast; @@ -40,21 +41,32 @@ pub struct TopConfig { } /// shared configuration between Web3Connections -#[derive(Debug, Deserialize)] +#[derive(Debug, Default, Deserialize)] pub struct AppConfig { // TODO: better type for chain_id? max of `u64::MAX / 2 - 36` https://github.com/ethereum/EIPs/issues/2294 pub chain_id: u64, pub db_url: Option, - pub invite_code: Option, + /// minimum size of the connection pool for the database + #[serde(default = "u32::one")] + pub db_min_connections: u32, + /// minimum size of the connection pool for the database + pub db_max_connections: Option, #[serde(default = "default_default_requests_per_minute")] pub default_requests_per_minute: u32, + pub invite_code: Option, #[serde(default = "default_min_sum_soft_limit")] pub min_sum_soft_limit: u32, #[serde(default = "default_min_synced_rpcs")] pub min_synced_rpcs: usize, - pub redis_url: Option, + /// Set to 0 to block all anonymous requests #[serde(default = "default_public_rate_limit_per_minute")] pub public_rate_limit_per_minute: u64, + pub redis_url: Option, + /// minimum size of the connection pool for the cache + #[serde(default = "u32::one")] + pub redis_min_connections: u32, + /// maximum size of the connection pool for the cache + pub redis_max_connections: Option, #[serde(default = "default_response_cache_max_bytes")] pub response_cache_max_bytes: usize, /// the stats page url for an anonymous user. @@ -93,12 +105,12 @@ pub struct Web3ConnectionConfig { soft_limit: u32, hard_limit: Option, weight: u32, + subscribe_txs: Option, } impl Web3ConnectionConfig { /// Create a Web3Connection from config /// TODO: move this into Web3Connection (just need to make things pub(crate)) - // #[instrument(name = "try_build_Web3ConnectionConfig", skip_all)] #[allow(clippy::too_many_arguments)] pub async fn spawn( self, @@ -122,6 +134,12 @@ impl Web3ConnectionConfig { } }; + let tx_id_sender = if self.subscribe_txs.unwrap_or(false) { + tx_id_sender + } else { + None + }; + Web3Connection::spawn( name, chain_id, diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index e1652c72..b97e7424 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -17,7 +17,7 @@ use serde::Serialize; use serde_json::json; use std::{cmp::Ordering, fmt::Display, sync::Arc}; use tokio::sync::{broadcast, watch}; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, trace, warn}; pub type ArcBlock = Arc>; @@ -38,7 +38,7 @@ impl Display for BlockId { impl Web3Connections { /// add a block to our map and it's hash to our graphmap of the blockchain - pub fn save_block(&self, block: &ArcBlock, heaviest_chain: bool) -> anyhow::Result<()> { + pub fn save_block(&self, block: &ArcBlock, heaviest_chain: Option) -> anyhow::Result<()> { let block_hash = block.hash.as_ref().context("no block hash")?; let block_num = block.number.as_ref().context("no block num")?; let _block_td = block @@ -46,14 +46,21 @@ impl Web3Connections { .as_ref() .context("no block total difficulty")?; - if heaviest_chain { + // think more about heaviest_chain + if heaviest_chain.unwrap_or(true) { match self.block_numbers.entry(*block_num) { Entry::Occupied(mut x) => { - let old = x.insert(*block_hash); + let old_hash = x.insert(*block_hash); + + if block_hash == &old_hash { + // this block has already been saved + return Ok(()); + } // TODO: what should we do? + // TODO: if old_hash's number is > block_num, we need to remove more entries warn!( - "do something with the old hash. we may need to update a bunch more block numbers" + "do something with the old hash? we may need to update a bunch more block numbers" ) } Entry::Vacant(x) => { @@ -100,7 +107,7 @@ impl Web3Connections { /// Get a block from caches with fallback. /// Will query a specific node or the best available. - /// WARNING! This may wait forever. be sure this runs with your own timeout + /// WARNING! If rpc is specified, this may wait forever. be sure this runs with your own timeout pub async fn block( &self, hash: &H256, @@ -139,10 +146,7 @@ impl Web3Connections { let block = Arc::new(block); // the block was fetched using eth_getBlockByHash, so it should have all fields - // TODO: how should we set this? all_simple_paths on the map? - let heaviest_chain = false; - - self.save_block(&block, heaviest_chain)?; + self.save_block(&block, None)?; Ok(block) } @@ -197,16 +201,14 @@ impl Web3Connections { .try_send_best_upstream_server(request, Some(num)) .await?; - let block = response.result.unwrap(); + let raw_block = response.result.context("no block result")?; - let block: Block = serde_json::from_str(block.get())?; + let block: Block = serde_json::from_str(raw_block.get())?; let block = Arc::new(block); // the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain - let heaviest_chain = true; - - self.save_block(&block, heaviest_chain)?; + self.save_block(&block, Some(true))?; Ok(block) } @@ -263,7 +265,7 @@ impl Web3Connections { connection_heads.insert(rpc.name.to_owned(), rpc_head_hash); // we don't know if its on the heaviest chain yet - self.save_block(&rpc_head_block, false)?; + self.save_block(&rpc_head_block, Some(false))?; Some(BlockId { hash: rpc_head_hash, @@ -405,6 +407,9 @@ impl Web3Connections { match &old_synced_connections.head_block_id { None => { debug!(block=%heavy_block_id, %rpc, "first consensus head"); + + self.save_block(&rpc_head_block, Some(true))?; + head_block_sender.send(heavy_block)?; } Some(old_block_id) => { @@ -413,13 +418,14 @@ impl Web3Connections { // multiple blocks with the same fork! if heavy_block_id.hash == old_block_id.hash { // no change in hash. no need to use head_block_sender - debug!(heavy=%heavy_block_id, %rpc, "consensus head block") + debug!(head=%heavy_block_id, %rpc, "con block") } else { // hash changed // TODO: better log warn!(heavy=%heavy_block_id, %rpc, "fork detected"); // todo!("handle equal by updating the cannonical chain"); + self.save_block(&rpc_head_block, Some(true))?; head_block_sender.send(heavy_block)?; } @@ -427,15 +433,20 @@ impl Web3Connections { Ordering::Less => { // this is unlikely but possible // TODO: better log - debug!("chain rolled back"); + warn!(head=%heavy_block_id, %rpc, "chain rolled back"); + + self.save_block(&rpc_head_block, Some(true))?; + // todo!("handle less by removing higher blocks from the cannonical chain"); head_block_sender.send(heavy_block)?; } Ordering::Greater => { - debug!(heavy=%heavy_block_id, %rpc, "new head block"); + debug!(head=%heavy_block_id, %rpc, "new block"); // todo!("handle greater by adding this block to and any missing parents to the cannonical chain"); + self.save_block(&rpc_head_block, Some(true))?; + head_block_sender.send(heavy_block)?; } } @@ -455,126 +466,6 @@ impl Web3Connections { // TODO: log different things depending on old_synced_connections } - return Ok(()); - - todo!("double check everything under this"); - - /* - let soft_limit_met = heavy_sum_soft_limit >= self.min_sum_soft_limit; - let num_synced_rpcs = heavy_rpcs.len() as u32; - - let new_synced_connections = if soft_limit_met { - // we have a heavy large enough to serve traffic - let head_block_hash = highest_work_block.hash.unwrap(); - let head_block_num = highest_work_block.number.unwrap(); - - if num_synced_rpcs < self.min_synced_rpcs { - // TODO: warn is too loud. if we are first starting, this is expected to happen - warn!(hash=%head_block_hash, num=?head_block_num, "not enough rpcs are synced to advance"); - - None - } else { - // TODO: wait until at least most of the rpcs have given their initial block? - // otherwise, if there is a syncing node that is fast, our first head block might not be good - - // TODO: sort by weight and soft limit? do we need an IndexSet, or is a Vec fine? - let conns = heavy_rpcs.into_iter().cloned().collect(); - - let head_block_id = BlockId { - hash: head_block_hash, - num: head_block_num, - }; - - let new_synced_connections = SyncedConnections { - head_block_id: Some(head_block_id), - conns, - }; - - Some(new_synced_connections) - } - } else { - // failure even after checking parent heads! - // not enough servers are in sync to server traffic - // TODO: at startup this is fine, but later its a problem - None - }; - - if let Some(new_synced_connections) = new_synced_connections { - let heavy_block_id = new_synced_connections.head_block_id.clone(); - - let new_synced_connections = Arc::new(new_synced_connections); - - let old_synced_connections = self.synced_connections.swap(new_synced_connections); - - let num_connection_heads = connection_heads.len(); - let total_conns = self.conns.len(); - - match (&old_synced_connections.head_block_id, &heavy_block_id) { - (None, None) => warn!("no servers synced"), - (None, Some(heavy_block_id)) => { - debug!(block=%heavy_block_id, %rpc, "first consensus head"); - } - (Some(_), None) => warn!("no longer synced!"), - (Some(old_block_id), Some(heavy_block_id)) => { - debug_assert_ne!(heavy_block_id.num, U64::zero()); - - match heavy_block_id.num.cmp(&old_block_id.num) { - Ordering::Equal => { - // multiple blocks with the same fork! - debug!("fork detected"); - todo!("handle equal"); - } - Ordering::Less => { - // this seems unlikely - warn!("chain rolled back"); - todo!("handle less"); - } - Ordering::Greater => { - info!(heavy=%heavy_block_id, %rpc, "new head block"); - - todo!("handle greater"); - } - } - } - } - } else { - todo!() - } - */ - /* - if old_synced_connections.head_block_id.is_none() && rpc_head_block.hash.is_some() { - // this is fine. we have our first hash - } else if rpc_head_block.hash.is_some() - && old_synced_connections.head_block_id.is_some() - && old_synced_connections - .head_block_id - .as_ref() - .map_ok(|x| x.num) - != rpc_head_block.hash - { - info!(new=%rpc_head_block.hash.unwrap(), new_num=?rpc_head_block.number.unwrap(), heavy=?heavy_block_id, %rpc, "non heavy head"); - // TODO: anything else to do? maybe warn if these blocks are very far apart or forked for an extended period of time - // TODO: if there is any non-heavy head log how many nodes are on it - } */ - - /* - if heavy_block_num == U64::zero { - warn!(?soft_limit_met, %heavy_block_hash, %old_head_hash, %rpc, "NO heavy head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs) - } else if heavy_block_hash == old_head_hash { - debug!(hash=%heavy_block_hash, num=%heavy_block_num, limit=%heavy_sum_soft_limit, %rpc, "cur heavy head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs); - } else if soft_limit_met { - // TODO: if new's parent is not old, warn? - - debug!(hash=%heavy_block_hash, num=%heavy_block_num, limit=%heavy_sum_soft_limit, %rpc, "NEW heavy head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs); - - // the head hash changed. forward to any subscribers - head_block_sender.send(highest_work_block)?; - - // TODO: do something with pending_tx_sender - } else { - // TODO: i don't think we can get here - warn!(?soft_limit_met, %heavy_block_id, %old_head_hash, %rpc, "NO heavy head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs) - } - */ + Ok(()) } } diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index f063fa8b..69b9a4a8 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -251,7 +251,7 @@ impl Web3Connections { pending_tx_sender, ) .await - }); + })?; futures.push(flatten_handle(handle)); } @@ -264,7 +264,7 @@ impl Web3Connections { sleep(Duration::from_secs(600)).await; // TODO: "every interval, check that the provider is still connected" } - }); + })?; futures.push(flatten_handle(handle)); }