better connection pool sizing

Bryan Stitt 2022-09-02 20:16:20 +00:00
parent b259b56dee
commit ac6296c5ac
7 changed files with 81 additions and 161 deletions

@@ -1,6 +1,7 @@
[shared]
chain_id = 1
db_url = "mysql://root:dev_web3_proxy@dev-db:3306/dev_web3_proxy"
db_max_connections = 99
min_sum_soft_limit = 2000
min_synced_rpcs = 2
redis_url = "redis://dev-redis:6379/"

@@ -122,6 +122,7 @@ pub async fn flatten_handles<T>(
/// Connect to the database and run migrations
pub async fn get_migrated_db(
db_url: String,
min_connections: u32,
max_connections: u32,
) -> anyhow::Result<DatabaseConnection> {
let mut db_opt = sea_orm::ConnectOptions::new(db_url);
@@ -129,7 +130,7 @@ pub async fn get_migrated_db(
// TODO: load all these options from the config file. i think mysql default max is 100
// TODO: sqlx logging only in debug. way too verbose for production
db_opt
.min_connections(1)
.min_connections(min_connections)
.max_connections(max_connections)
.connect_timeout(Duration::from_secs(8))
.idle_timeout(Duration::from_secs(8))
@@ -149,7 +150,6 @@ impl Web3ProxyApp {
pub async fn spawn(
app_stats: AppStats,
top_config: TopConfig,
workers: usize,
) -> anyhow::Result<(
Arc<Web3ProxyApp>,
Pin<Box<dyn Future<Output = anyhow::Result<()>>>>,
@@ -162,9 +162,16 @@ impl Web3ProxyApp {
// first, we connect to mysql and make sure the latest migrations have run
let db_conn = if let Some(db_url) = &top_config.app.db_url {
let max_connections = workers.try_into()?;
let db_min_connections = top_config.app.db_min_connections;
let db = get_migrated_db(db_url.clone(), max_connections).await?;
// TODO: what default multiple?
let db_max_connections = top_config
.app
.db_max_connections
.unwrap_or(db_min_connections * 4);
let db =
get_migrated_db(db_url.clone(), db_min_connections, db_max_connections).await?;
Some(db)
} else {
@@ -200,14 +207,19 @@ impl Web3ProxyApp {
let manager = RedisConnectionManager::new(redis_url.as_ref())?;
let min_size = workers as u32;
let max_size = min_size * 4;
let redis_min_connections = top_config.app.redis_min_connections;
let redis_max_connections = top_config
.app
.redis_max_connections
.unwrap_or(redis_min_connections * 4);
// TODO: min_idle?
// TODO: set max_size based on max expected concurrent connections? set based on num_workers?
let builder = bb8::Pool::builder()
.error_sink(RedisErrorSink.boxed_clone())
.min_idle(Some(min_size))
.max_size(max_size);
.min_idle(Some(redis_min_connections))
.max_size(redis_max_connections);
let pool = builder.build(manager).await?;
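Note (a sketch, not part of the commit): both the database and the redis pools now follow the same sizing rule: an explicit *_max_connections value from the config wins, and otherwise the maximum falls back to four times the configured minimum. A minimal illustration, with the hypothetical helper name effective_max_connections:

fn effective_max_connections(min_connections: u32, max_connections: Option<u32>) -> u32 {
    // an explicit maximum from the config takes priority; otherwise use 4x the minimum
    max_connections.unwrap_or(min_connections * 4)
}

fn main() {
    // with the serde default of one minimum connection and no maximum configured
    assert_eq!(effective_max_connections(1, None), 4);
    // an explicit db_max_connections = 99 (as in the dev config above) takes priority
    assert_eq!(effective_max_connections(1, Some(99)), 99);
}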

@@ -76,7 +76,7 @@ fn run(
let app_frontend_port = cli_config.port;
let app_prometheus_port = cli_config.prometheus_port;
let (app, app_handle) = Web3ProxyApp::spawn(app_stats, top_config, num_workers).await?;
let (app, app_handle) = Web3ProxyApp::spawn(app_stats, top_config).await?;
let frontend_handle = tokio::spawn(frontend::serve(app_frontend_port, app));
@@ -212,25 +212,23 @@ mod tests {
let app_config = TopConfig {
app: AppConfig {
chain_id: 31337,
db_url: None,
default_requests_per_minute: 6_000_000,
invite_code: None,
redis_url: None,
min_sum_soft_limit: 1,
min_synced_rpcs: 1,
public_rate_limit_per_minute: 0,
response_cache_max_bytes: 10_usize.pow(7),
redirect_public_url: "example.com/".to_string(),
redirect_user_url: "example.com/users/{user_address}".to_string(),
..Default::default()
},
balanced_rpcs: HashMap::from([
(
"anvil".to_string(),
Web3ConnectionConfig::new(anvil.endpoint(), 100, None, 1),
Web3ConnectionConfig::new(anvil.endpoint(), 100, None, 1, Some(false)),
),
(
"anvil_ws".to_string(),
Web3ConnectionConfig::new(anvil.ws_endpoint(), 100, None, 0),
Web3ConnectionConfig::new(anvil.ws_endpoint(), 100, None, 0, Some(true)),
),
]),
private_rpcs: None,

@@ -50,7 +50,7 @@ async fn main() -> anyhow::Result<()> {
match cli_config.sub_command {
SubCommand::CreateUser(x) => {
let db = get_migrated_db(cli_config.db_url, 1).await?;
let db = get_migrated_db(cli_config.db_url, 1, 1).await?;
x.main(&db).await
}

@@ -5,6 +5,7 @@ use argh::FromArgs;
use derive_more::Constructor;
use ethers::prelude::TxHash;
use hashbrown::HashMap;
use num::One;
use serde::Deserialize;
use std::sync::Arc;
use tokio::sync::broadcast;
@@ -40,21 +41,32 @@ pub struct TopConfig {
}
/// shared configuration between Web3Connections
#[derive(Debug, Deserialize)]
#[derive(Debug, Default, Deserialize)]
pub struct AppConfig {
// TODO: better type for chain_id? max of `u64::MAX / 2 - 36` https://github.com/ethereum/EIPs/issues/2294
pub chain_id: u64,
pub db_url: Option<String>,
pub invite_code: Option<String>,
/// minimum size of the connection pool for the database
#[serde(default = "u32::one")]
pub db_min_connections: u32,
/// maximum size of the connection pool for the database
pub db_max_connections: Option<u32>,
#[serde(default = "default_default_requests_per_minute")]
pub default_requests_per_minute: u32,
pub invite_code: Option<String>,
#[serde(default = "default_min_sum_soft_limit")]
pub min_sum_soft_limit: u32,
#[serde(default = "default_min_synced_rpcs")]
pub min_synced_rpcs: usize,
pub redis_url: Option<String>,
/// Set to 0 to block all anonymous requests
#[serde(default = "default_public_rate_limit_per_minute")]
pub public_rate_limit_per_minute: u64,
pub redis_url: Option<String>,
/// minimum size of the connection pool for the cache
#[serde(default = "u32::one")]
pub redis_min_connections: u32,
/// maximum size of the connection pool for the cache
pub redis_max_connections: Option<u32>,
#[serde(default = "default_response_cache_max_bytes")]
pub response_cache_max_bytes: usize,
/// the stats page url for an anonymous user.
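Note (a sketch, not part of the commit): the new *_min_connections fields default to 1 through #[serde(default = "u32::one")], which resolves to the num crate's One trait imported above. A self-contained illustration, assuming the num, serde (with derive), and toml crates; the PoolConfig name is hypothetical:

use num::One;
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct PoolConfig {
    // when the field is missing, serde calls u32::one(), so the minimum defaults to 1
    #[serde(default = "u32::one")]
    db_min_connections: u32,
    // no default attribute needed: a missing Option field deserializes to None
    db_max_connections: Option<u32>,
}

fn main() {
    let cfg: PoolConfig = toml::from_str("").unwrap();
    assert_eq!(cfg.db_min_connections, 1);
    assert_eq!(cfg.db_max_connections, None);
}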
@@ -93,12 +105,12 @@ pub struct Web3ConnectionConfig {
soft_limit: u32,
hard_limit: Option<u64>,
weight: u32,
subscribe_txs: Option<bool>,
}
impl Web3ConnectionConfig {
/// Create a Web3Connection from config
/// TODO: move this into Web3Connection (just need to make things pub(crate))
// #[instrument(name = "try_build_Web3ConnectionConfig", skip_all)]
#[allow(clippy::too_many_arguments)]
pub async fn spawn(
self,
@@ -122,6 +134,12 @@ impl Web3ConnectionConfig {
}
};
let tx_id_sender = if self.subscribe_txs.unwrap_or(false) {
tx_id_sender
} else {
None
};
Web3Connection::spawn(
name,
chain_id,

@@ -17,7 +17,7 @@ use serde::Serialize;
use serde_json::json;
use std::{cmp::Ordering, fmt::Display, sync::Arc};
use tokio::sync::{broadcast, watch};
use tracing::{debug, info, trace, warn};
use tracing::{debug, trace, warn};
pub type ArcBlock = Arc<Block<TxHash>>;
@@ -38,7 +38,7 @@ impl Display for BlockId {
impl Web3Connections {
/// add a block to our map and its hash to our graphmap of the blockchain
pub fn save_block(&self, block: &ArcBlock, heaviest_chain: bool) -> anyhow::Result<()> {
pub fn save_block(&self, block: &ArcBlock, heaviest_chain: Option<bool>) -> anyhow::Result<()> {
let block_hash = block.hash.as_ref().context("no block hash")?;
let block_num = block.number.as_ref().context("no block num")?;
let _block_td = block
@@ -46,14 +46,21 @@ impl Web3Connections {
.as_ref()
.context("no block total difficulty")?;
if heaviest_chain {
// TODO: think more about heaviest_chain
if heaviest_chain.unwrap_or(true) {
match self.block_numbers.entry(*block_num) {
Entry::Occupied(mut x) => {
let old = x.insert(*block_hash);
let old_hash = x.insert(*block_hash);
if block_hash == &old_hash {
// this block has already been saved
return Ok(());
}
// TODO: what should we do?
// TODO: if old_hash's number is > block_num, we need to remove more entries
warn!(
"do something with the old hash. we may need to update a bunch more block numbers"
"do something with the old hash? we may need to update a bunch more block numbers"
)
}
Entry::Vacant(x) => {
@@ -100,7 +107,7 @@ impl Web3Connections {
/// Get a block from caches with fallback.
/// Will query a specific node or the best available.
/// WARNING! This may wait forever. be sure this runs with your own timeout
/// WARNING! If rpc is specified, this may wait forever. be sure this runs with your own timeout
pub async fn block(
&self,
hash: &H256,
@@ -139,10 +146,7 @@ impl Web3Connections {
let block = Arc::new(block);
// the block was fetched using eth_getBlockByHash, so it should have all fields
// TODO: how should we set this? all_simple_paths on the map?
let heaviest_chain = false;
self.save_block(&block, heaviest_chain)?;
self.save_block(&block, None)?;
Ok(block)
}
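Note (a sketch, not part of the commit): save_block now takes a tri-state heaviest_chain flag. From the hunks above, Some(true) and None both update the block-number index (None falls through unwrap_or(true)), while Some(false), used for unverified per-rpc heads, skips it. The helper name below is hypothetical:

fn updates_block_number_index(heaviest_chain: Option<bool>) -> bool {
    // Some(true): confirmed on the heaviest chain (eth_getBlockByNumber, new consensus head)
    // None: unknown origin (eth_getBlockByHash); currently treated the same as true
    // Some(false): a per-rpc head that has not been verified yet; leave the number index alone
    heaviest_chain.unwrap_or(true)
}

fn main() {
    assert!(updates_block_number_index(Some(true)));
    assert!(updates_block_number_index(None));
    assert!(!updates_block_number_index(Some(false)));
}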
@@ -197,16 +201,14 @@ impl Web3Connections {
.try_send_best_upstream_server(request, Some(num))
.await?;
let block = response.result.unwrap();
let raw_block = response.result.context("no block result")?;
let block: Block<TxHash> = serde_json::from_str(block.get())?;
let block: Block<TxHash> = serde_json::from_str(raw_block.get())?;
let block = Arc::new(block);
// the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain
let heaviest_chain = true;
self.save_block(&block, heaviest_chain)?;
self.save_block(&block, Some(true))?;
Ok(block)
}
@@ -263,7 +265,7 @@ impl Web3Connections {
connection_heads.insert(rpc.name.to_owned(), rpc_head_hash);
// we don't know if it's on the heaviest chain yet
self.save_block(&rpc_head_block, false)?;
self.save_block(&rpc_head_block, Some(false))?;
Some(BlockId {
hash: rpc_head_hash,
@@ -405,6 +407,9 @@ impl Web3Connections {
match &old_synced_connections.head_block_id {
None => {
debug!(block=%heavy_block_id, %rpc, "first consensus head");
self.save_block(&rpc_head_block, Some(true))?;
head_block_sender.send(heavy_block)?;
}
Some(old_block_id) => {
@@ -413,13 +418,14 @@ impl Web3Connections {
// multiple blocks with the same fork!
if heavy_block_id.hash == old_block_id.hash {
// no change in hash. no need to use head_block_sender
debug!(heavy=%heavy_block_id, %rpc, "consensus head block")
debug!(head=%heavy_block_id, %rpc, "con block")
} else {
// hash changed
// TODO: better log
warn!(heavy=%heavy_block_id, %rpc, "fork detected");
// todo!("handle equal by updating the cannonical chain");
self.save_block(&rpc_head_block, Some(true))?;
head_block_sender.send(heavy_block)?;
}
@@ -427,15 +433,20 @@ impl Web3Connections {
Ordering::Less => {
// this is unlikely but possible
// TODO: better log
debug!("chain rolled back");
warn!(head=%heavy_block_id, %rpc, "chain rolled back");
self.save_block(&rpc_head_block, Some(true))?;
// todo!("handle less by removing higher blocks from the cannonical chain");
head_block_sender.send(heavy_block)?;
}
Ordering::Greater => {
debug!(heavy=%heavy_block_id, %rpc, "new head block");
debug!(head=%heavy_block_id, %rpc, "new block");
// todo!("handle greater by adding this block to and any missing parents to the cannonical chain");
self.save_block(&rpc_head_block, Some(true))?;
head_block_sender.send(heavy_block)?;
}
}
@@ -455,126 +466,6 @@ impl Web3Connections {
// TODO: log different things depending on old_synced_connections
}
return Ok(());
todo!("double check everything under this");
/*
let soft_limit_met = heavy_sum_soft_limit >= self.min_sum_soft_limit;
let num_synced_rpcs = heavy_rpcs.len() as u32;
let new_synced_connections = if soft_limit_met {
// the heavy rpcs have enough combined soft limit to serve traffic
let head_block_hash = highest_work_block.hash.unwrap();
let head_block_num = highest_work_block.number.unwrap();
if num_synced_rpcs < self.min_synced_rpcs {
// TODO: warn is too loud. if we are first starting, this is expected to happen
warn!(hash=%head_block_hash, num=?head_block_num, "not enough rpcs are synced to advance");
None
} else {
// TODO: wait until at least most of the rpcs have given their initial block?
// otherwise, if there is a syncing node that is fast, our first head block might not be good
// TODO: sort by weight and soft limit? do we need an IndexSet, or is a Vec fine?
let conns = heavy_rpcs.into_iter().cloned().collect();
let head_block_id = BlockId {
hash: head_block_hash,
num: head_block_num,
};
let new_synced_connections = SyncedConnections {
head_block_id: Some(head_block_id),
conns,
};
Some(new_synced_connections)
}
} else {
// failure even after checking parent heads!
// not enough servers are in sync to serve traffic
// TODO: at startup this is fine, but later it's a problem
None
};
if let Some(new_synced_connections) = new_synced_connections {
let heavy_block_id = new_synced_connections.head_block_id.clone();
let new_synced_connections = Arc::new(new_synced_connections);
let old_synced_connections = self.synced_connections.swap(new_synced_connections);
let num_connection_heads = connection_heads.len();
let total_conns = self.conns.len();
match (&old_synced_connections.head_block_id, &heavy_block_id) {
(None, None) => warn!("no servers synced"),
(None, Some(heavy_block_id)) => {
debug!(block=%heavy_block_id, %rpc, "first consensus head");
}
(Some(_), None) => warn!("no longer synced!"),
(Some(old_block_id), Some(heavy_block_id)) => {
debug_assert_ne!(heavy_block_id.num, U64::zero());
match heavy_block_id.num.cmp(&old_block_id.num) {
Ordering::Equal => {
// multiple blocks with the same fork!
debug!("fork detected");
todo!("handle equal");
}
Ordering::Less => {
// this seems unlikely
warn!("chain rolled back");
todo!("handle less");
}
Ordering::Greater => {
info!(heavy=%heavy_block_id, %rpc, "new head block");
todo!("handle greater");
}
}
}
}
} else {
todo!()
}
*/
/*
if old_synced_connections.head_block_id.is_none() && rpc_head_block.hash.is_some() {
// this is fine. we have our first hash
} else if rpc_head_block.hash.is_some()
&& old_synced_connections.head_block_id.is_some()
&& old_synced_connections
.head_block_id
.as_ref()
.map_ok(|x| x.num)
!= rpc_head_block.hash
{
info!(new=%rpc_head_block.hash.unwrap(), new_num=?rpc_head_block.number.unwrap(), heavy=?heavy_block_id, %rpc, "non heavy head");
// TODO: anything else to do? maybe warn if these blocks are very far apart or forked for an extended period of time
// TODO: if there is any non-heavy head log how many nodes are on it
} */
/*
if heavy_block_num == U64::zero {
warn!(?soft_limit_met, %heavy_block_hash, %old_head_hash, %rpc, "NO heavy head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs)
} else if heavy_block_hash == old_head_hash {
debug!(hash=%heavy_block_hash, num=%heavy_block_num, limit=%heavy_sum_soft_limit, %rpc, "cur heavy head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs);
} else if soft_limit_met {
// TODO: if new's parent is not old, warn?
debug!(hash=%heavy_block_hash, num=%heavy_block_num, limit=%heavy_sum_soft_limit, %rpc, "NEW heavy head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs);
// the head hash changed. forward to any subscribers
head_block_sender.send(highest_work_block)?;
// TODO: do something with pending_tx_sender
} else {
// TODO: i don't think we can get here
warn!(?soft_limit_met, %heavy_block_id, %old_head_hash, %rpc, "NO heavy head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs)
}
*/
Ok(())
}
}

@@ -251,7 +251,7 @@ impl Web3Connections {
pending_tx_sender,
)
.await
});
})?;
futures.push(flatten_handle(handle));
}
@@ -264,7 +264,7 @@ impl Web3Connections {
sleep(Duration::from_secs(600)).await;
// TODO: "every interval, check that the provider is still connected"
}
});
})?;
futures.push(flatten_handle(handle));
}