From efee5c83fcae7f2d4b3819683b0f9082bc359166 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Fri, 8 Jul 2022 23:02:32 +0000
Subject: [PATCH] improve redis connection pool

---
 TODO.md                      |  1 +
 redis-cell-client/src/lib.rs |  1 +
 web3-proxy/src/app.rs        | 45 ++++++++++++++++++++++++------------
 web3-proxy/src/config.rs     | 33 +-------------------------
 web3-proxy/src/connection.rs | 18 +++++----------
 web3-proxy/src/main.rs       | 15 ++++++++----
 6 files changed, 50 insertions(+), 63 deletions(-)

diff --git a/TODO.md b/TODO.md
index d1853310..3ea140ed 100644
--- a/TODO.md
+++ b/TODO.md
@@ -37,6 +37,7 @@
 - [x] incoming rate limiting (by ip)
 - [x] connection pool for redis
 - [ ] automatically route to archive server when necessary
+  - originally, no processing was done to params; they were just serde_json::RawValue. this is probably fastest, but we need to look for "latest" and count elements, so we have to use serde_json::Value
 - [ ] handle log subscriptions
 - [ ] basic request method stats
 - [x] http servers should check block at the very start
diff --git a/redis-cell-client/src/lib.rs b/redis-cell-client/src/lib.rs
index 47a9da43..c7f05fc8 100644
--- a/redis-cell-client/src/lib.rs
+++ b/redis-cell-client/src/lib.rs
@@ -1,5 +1,6 @@
 use bb8_redis::redis::cmd;
 
+pub use bb8_redis::redis::RedisError;
 pub use bb8_redis::{bb8, RedisConnectionManager};
 
 use std::time::Duration;
diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs
index 8f741ce4..f2a77fca 100644
--- a/web3-proxy/src/app.rs
+++ b/web3-proxy/src/app.rs
@@ -10,6 +10,7 @@ use futures::stream::StreamExt;
 use futures::Future;
 use linkedhashmap::LinkedHashMap;
 use parking_lot::RwLock;
+use redis_cell_client::bb8::ErrorSink;
 use redis_cell_client::{bb8, RedisCellClient, RedisConnectionManager};
 use serde_json::json;
 use std::fmt;
@@ -23,7 +24,8 @@ use tokio::time::timeout;
 use tokio_stream::wrappers::{BroadcastStream, WatchStream};
 use tracing::{info, info_span, instrument, trace, warn, Instrument};
 
-use crate::config::Web3ConnectionConfig;
+use crate::bb8_helpers;
+use crate::config::AppConfig;
 use crate::connections::Web3Connections;
 use crate::jsonrpc::JsonRpcForwardedResponse;
 use crate::jsonrpc::JsonRpcForwardedResponseEnum;
@@ -37,7 +39,7 @@ static APP_USER_AGENT: &str = concat!(
     env!("CARGO_PKG_VERSION"),
 );
 
-// TODO: put this in config? what size should we do?
+// TODO: put this in config? what size should we do? probably should structure this to be a size in MB
 const RESPONSE_CACHE_CAP: usize = 1024;
 
 /// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work
@@ -114,21 +116,27 @@ impl Web3ProxyApp {
         self.public_rate_limiter.as_ref()
     }
 
+    // TODO: should we just take the rpc config as the only arg instead?
     pub async fn spawn(
-        chain_id: usize,
-        redis_address: Option<String>,
-        balanced_rpcs: Vec<Web3ConnectionConfig>,
-        private_rpcs: Vec<Web3ConnectionConfig>,
-        public_rate_limit_per_minute: u32,
+        app_config: AppConfig,
+        num_workers: usize,
     ) -> anyhow::Result<(
         Arc<Web3ProxyApp>,
         Pin<Box<dyn Future<Output = anyhow::Result<()>>>>,
     )> {
-        // TODO: try_join_all instead
+        let balanced_rpcs = app_config.balanced_rpcs.into_values().collect();
+
+        let private_rpcs = if let Some(private_rpcs) = app_config.private_rpcs {
+            private_rpcs.into_values().collect()
+        } else {
+            vec![]
+        };
+
+        // TODO: try_join_all instead?
         let handles = FuturesUnordered::new();
 
         // make a http shared client
-        // TODO: how should we configure the connection pool?
+        // TODO: can we configure the connection pool? should we?
+        // TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
         let http_client = Some(
             reqwest::ClientBuilder::new()
@@ -138,12 +146,19 @@ impl Web3ProxyApp {
                 .build()?,
         );
 
-        let rate_limiter_pool = match redis_address {
+        let rate_limiter_pool = match app_config.shared.rate_limit_redis {
             Some(redis_address) => {
                 info!("Connecting to redis on {}", redis_address);
 
                 let manager = RedisConnectionManager::new(redis_address)?;
-                let pool = bb8::Pool::builder().build(manager).await?;
+
+                // TODO: what min?
+                let builder = bb8::Pool::builder()
+                    .error_sink(bb8_helpers::RedisErrorSink.boxed_clone())
+                    .max_size(num_workers as u32)
+                    .min_idle(Some(1));
+
+                let pool = builder.build(manager).await?;
 
                 Some(pool)
             }
@@ -166,7 +181,7 @@ impl Web3ProxyApp {
         // TODO: attach context to this error
         let (balanced_rpcs, balanced_handle) = Web3Connections::spawn(
-            chain_id,
+            app_config.shared.chain_id,
             balanced_rpcs,
             http_client.as_ref(),
             rate_limiter_pool.as_ref(),
@@ -184,7 +199,7 @@ impl Web3ProxyApp {
         } else {
             // TODO: attach context to this error
             let (private_rpcs, private_handle) = Web3Connections::spawn(
-                chain_id,
+                app_config.shared.chain_id,
                 private_rpcs,
                 http_client.as_ref(),
                 rate_limiter_pool.as_ref(),
@@ -205,14 +220,14 @@ impl Web3ProxyApp {
         drop(pending_tx_receiver);
 
         // TODO: how much should we allow?
-        let public_max_burst = public_rate_limit_per_minute / 3;
+        let public_max_burst = app_config.shared.public_rate_limit_per_minute / 3;
 
         let public_rate_limiter = rate_limiter_pool.as_ref().map(|redis_client_pool| {
             RedisCellClient::new(
                 redis_client_pool.clone(),
                 "public".to_string(),
                 public_max_burst,
-                public_rate_limit_per_minute,
+                app_config.shared.public_rate_limit_per_minute,
                 60,
             )
         });
diff --git a/web3-proxy/src/config.rs b/web3-proxy/src/config.rs
index de794954..c73ff52c 100644
--- a/web3-proxy/src/config.rs
+++ b/web3-proxy/src/config.rs
@@ -1,15 +1,12 @@
 use argh::FromArgs;
 use ethers::prelude::{Block, TxHash};
-use futures::Future;
 use serde::Deserialize;
 use std::collections::HashMap;
-use std::pin::Pin;
 use std::sync::Arc;
 use tokio::sync::broadcast;
 
 use crate::app::AnyhowJoinHandle;
 use crate::connection::Web3Connection;
-use crate::Web3ProxyApp;
 
 #[derive(Debug, FromArgs)]
 /// Web3-proxy is a fast caching and load balancing proxy for web3 (Ethereum or similar) JsonRPC servers.
@@ -28,7 +25,7 @@ pub struct CliConfig {
 }
 
 #[derive(Debug, Deserialize)]
-pub struct RpcConfig {
+pub struct AppConfig {
     pub shared: RpcSharedConfig,
     pub balanced_rpcs: HashMap<String, Web3ConnectionConfig>,
     pub private_rpcs: Option<HashMap<String, Web3ConnectionConfig>>,
@@ -52,34 +49,6 @@ pub struct Web3ConnectionConfig {
     hard_limit: Option<u32>,
 }
 
-impl RpcConfig {
-    /// Create a Web3ProxyApp from config
-    // #[instrument(name = "try_build_RpcConfig", skip_all)]
-    pub async fn spawn(
-        self,
-    ) -> anyhow::Result<(
-        Arc<Web3ProxyApp>,
-        Pin<Box<dyn Future<Output = anyhow::Result<()>>>>,
-    )> {
-        let balanced_rpcs = self.balanced_rpcs.into_values().collect();
-
-        let private_rpcs = if let Some(private_rpcs) = self.private_rpcs {
-            private_rpcs.into_values().collect()
-        } else {
-            vec![]
-        };
-
-        Web3ProxyApp::spawn(
-            self.shared.chain_id,
-            self.shared.rate_limit_redis,
-            balanced_rpcs,
-            private_rpcs,
-            self.shared.public_rate_limit_per_minute,
-        )
-        .await
-    }
-}
-
 impl Web3ConnectionConfig {
     /// Create a Web3Connection from config
     // #[instrument(name = "try_build_Web3ConnectionConfig", skip_all)]
diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs
index 22377bbb..0e1a2fca 100644
--- a/web3-proxy/src/connection.rs
+++ b/web3-proxy/src/connection.rs
@@ -249,7 +249,7 @@ impl Web3Connection {
     }
 
     #[instrument(skip_all)]
-    async fn send_block(
+    async fn send_block_result(
         self: &Arc<Self>,
         block: Result<Block<TxHash>, ProviderError>,
         block_sender: &flume::Sender<(Block<TxHash>, Arc<Web3Connection>)>,
@@ -372,7 +372,7 @@ impl Web3Connection {
                         last_hash = new_hash;
                     }
 
-                    self.send_block(block, &block_sender).await?;
+                    self.send_block_result(block, &block_sender).await?;
                 }
                 Err(err) => {
                     warn!(?err, "Rate limited on latest block from {}", self);
@@ -401,12 +401,10 @@ impl Web3Connection {
                     .request("eth_getBlockByNumber", ("latest", false))
                     .await;
 
-                self.send_block(block, &block_sender).await?;
+                self.send_block_result(block, &block_sender).await?;
 
-                // TODO: should the stream have a timeout on it here?
-                // TODO: although reconnects will make this less of an issue
                 while let Some(new_block) = stream.next().await {
-                    self.send_block(Ok(new_block), &block_sender).await?;
+                    self.send_block_result(Ok(new_block), &block_sender).await?;
                 }
 
                 warn!(?self, "subscription ended");
@@ -455,15 +453,11 @@ impl Web3Connection {
                 }
             }
             Web3Provider::Ws(provider) => {
-                // rate limits
+                // TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
                 let active_request_handle = self.wait_for_request_handle().await;
 
-                // TODO: automatically reconnect?
-                // TODO: it would be faster to get the block number, but subscriptions don't provide that
-                // TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out?
                 let mut stream = provider.subscribe_pending_txs().await?;
 
-                // TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
                 drop(active_request_handle);
 
                 while let Some(pending_tx_id) = stream.next().await {
@@ -484,7 +478,7 @@ impl Web3Connection {
     /// be careful with this; it will wait forever!
     #[instrument(skip_all)]
     pub async fn wait_for_request_handle(self: &Arc<Self>) -> ActiveRequestHandle {
-        // TODO: maximum wait time?
+        // TODO: maximum wait time? i think timeouts in other parts of the code are probably best
         loop {
             match self.try_request_handle().await {
diff --git a/web3-proxy/src/main.rs b/web3-proxy/src/main.rs
index 2c704ec0..a1e94cc6 100644
--- a/web3-proxy/src/main.rs
+++ b/web3-proxy/src/main.rs
@@ -1,6 +1,7 @@
 #![forbid(unsafe_code)]
 
 mod app;
+mod bb8_helpers;
 mod config;
 mod connection;
 mod connections;
@@ -13,11 +14,11 @@ use std::sync::atomic::{self, AtomicUsize};
 use std::thread;
 use std::time::Duration;
 use tokio::runtime;
-use tracing::{info, trace};
+use tracing::{debug, info, trace};
 use tracing_subscriber::EnvFilter;
 
 use crate::app::{flatten_handle, Web3ProxyApp};
-use crate::config::{CliConfig, RpcConfig};
+use crate::config::{AppConfig, CliConfig};
 
 fn main() -> anyhow::Result<()> {
     // if RUST_LOG isn't set, configure a default
@@ -41,7 +42,7 @@ fn main() -> anyhow::Result<()> {
     // advanced configuration
     info!("Loading rpc config @ {}", cli_config.config);
     let rpc_config: String = fs::read_to_string(cli_config.config)?;
-    let rpc_config: RpcConfig = toml::from_str(&rpc_config)?;
+    let rpc_config: AppConfig = toml::from_str(&rpc_config)?;
 
     trace!("rpc_config: {:?}", rpc_config);
@@ -83,8 +84,14 @@ fn main() -> anyhow::Result<()> {
     // start tokio's async runtime
     let rt = rt_builder.build()?;
+
+    // we use this worker count to also set our redis connection pool size
+    // TODO: think about this more
+    let num_workers = rt.metrics().num_workers();
+    debug!(?num_workers);
+
     rt.block_on(async {
-        let (app, app_handle) = rpc_config.spawn().await?;
+        let (app, app_handle) = Web3ProxyApp::spawn(rpc_config, num_workers).await?;
 
         let frontend_handle = tokio::spawn(frontend::run(cli_config.port, app));
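
Note: the diff adds `mod bb8_helpers;` and calls `bb8_helpers::RedisErrorSink.boxed_clone()`, but the new web3-proxy/src/bb8_helpers.rs file itself is not part of the patch. Given bb8's ErrorSink trait (sink + boxed_clone) and the RedisError re-export added to redis-cell-client above, the module is presumably close to this minimal sketch (the exact log message is a guess):

    use redis_cell_client::{bb8, RedisError};
    use tracing::warn;

    #[derive(Debug, Clone)]
    pub struct RedisErrorSink;

    impl bb8::ErrorSink<RedisError> for RedisErrorSink {
        fn sink(&self, err: RedisError) {
            // without a sink, errors the pool hits in the background are dropped silently
            warn!(?err, "redis error");
        }

        fn boxed_clone(&self) -> Box<dyn bb8::ErrorSink<RedisError>> {
            Box::new(self.clone())
        }
    }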
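
Note: on the TODO.md item about params — serde_json::RawValue keeps the params as an unparsed string slice, which is cheap to forward verbatim, while serde_json::Value is a parsed tree that can actually be inspected. A hypothetical illustration of the kind of check that item describes (the helper name and params shape are made up for the example):

    use serde_json::Value;

    // does this request reference the "latest" block? this needs parsed params;
    // a RawValue could only be forwarded, not inspected. items.len() is also
    // available here for the "count elements" part of the TODO.
    fn references_latest_block(params: &Value) -> bool {
        match params {
            Value::Array(items) => items.iter().any(|item| item.as_str() == Some("latest")),
            _ => false,
        }
    }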
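
Note: for context on the public rate limiter's arguments (max_burst = public_rate_limit_per_minute / 3, count = public_rate_limit_per_minute, period = 60) — redis-cell exposes a single CL.THROTTLE command, and redis-cell-client's lib.rs already builds commands with bb8_redis::redis::cmd. A rough sketch of how those three numbers likely reach redis; the helper below is illustrative, not the crate's actual API:

    use bb8_redis::redis::{aio::Connection, cmd, RedisError};

    // CL.THROTTLE <key> <max_burst> <count per period> <period>
    // replies with five integers; the first is 0 if allowed, 1 if rate limited
    async fn throttle(
        conn: &mut Connection,
        key: &str,
        max_burst: u32,
        count: u32,
        period: u32,
    ) -> Result<bool, RedisError> {
        let reply: Vec<isize> = cmd("CL.THROTTLE")
            .arg(key)
            .arg(max_burst)
            .arg(count)
            .arg(period)
            .query_async(conn)
            .await?;

        Ok(reply[0] == 0)
    }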