improve redis connection pool

This commit is contained in:
Bryan Stitt 2022-07-08 23:02:32 +00:00
parent 4080209eeb
commit efee5c83fc
6 changed files with 50 additions and 63 deletions

View File

@ -37,6 +37,7 @@
- [x] incoming rate limiting (by ip)
- [x] connection pool for redis
- [ ] automatically route to archive server when necessary
- originally, no processing was done to params; they were just serde_json::RawValue. this is probably fastest, but we need to look for "latest" and count elements, so we have to use serde_json::Value
- [ ] handle log subscriptions
- [ ] basic request method stats
- [x] http servers should check block at the very start

View File

@ -1,5 +1,6 @@
use bb8_redis::redis::cmd;
pub use bb8_redis::redis::RedisError;
pub use bb8_redis::{bb8, RedisConnectionManager};
use std::time::Duration;

View File

@ -10,6 +10,7 @@ use futures::stream::StreamExt;
use futures::Future;
use linkedhashmap::LinkedHashMap;
use parking_lot::RwLock;
use redis_cell_client::bb8::ErrorSink;
use redis_cell_client::{bb8, RedisCellClient, RedisConnectionManager};
use serde_json::json;
use std::fmt;
@ -23,7 +24,8 @@ use tokio::time::timeout;
use tokio_stream::wrappers::{BroadcastStream, WatchStream};
use tracing::{info, info_span, instrument, trace, warn, Instrument};
use crate::config::Web3ConnectionConfig;
use crate::bb8_helpers;
use crate::config::AppConfig;
use crate::connections::Web3Connections;
use crate::jsonrpc::JsonRpcForwardedResponse;
use crate::jsonrpc::JsonRpcForwardedResponseEnum;
@ -37,7 +39,7 @@ static APP_USER_AGENT: &str = concat!(
env!("CARGO_PKG_VERSION"),
);
// TODO: put this in config? what size should we do?
// TODO: put this in config? what size should we do? probably should structure this to be a size in MB
const RESPONSE_CACHE_CAP: usize = 1024;
/// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work
@ -114,21 +116,27 @@ impl Web3ProxyApp {
self.public_rate_limiter.as_ref()
}
// TODO: should we just take the rpc config as the only arg instead?
pub async fn spawn(
chain_id: usize,
redis_address: Option<String>,
balanced_rpcs: Vec<Web3ConnectionConfig>,
private_rpcs: Vec<Web3ConnectionConfig>,
public_rate_limit_per_minute: u32,
app_config: AppConfig,
num_workers: usize,
) -> anyhow::Result<(
Arc<Web3ProxyApp>,
Pin<Box<dyn Future<Output = anyhow::Result<()>>>>,
)> {
// TODO: try_join_all instead
let balanced_rpcs = app_config.balanced_rpcs.into_values().collect();
let private_rpcs = if let Some(private_rpcs) = app_config.private_rpcs {
private_rpcs.into_values().collect()
} else {
vec![]
};
// TODO: try_join_all instead?
let handles = FuturesUnordered::new();
// make a http shared client
// TODO: how should we configure the connection pool?
// TODO: can we configure the connection pool? should we?
// TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
let http_client = Some(
reqwest::ClientBuilder::new()
@ -138,12 +146,19 @@ impl Web3ProxyApp {
.build()?,
);
let rate_limiter_pool = match redis_address {
let rate_limiter_pool = match app_config.shared.rate_limit_redis {
Some(redis_address) => {
info!("Connecting to redis on {}", redis_address);
let manager = RedisConnectionManager::new(redis_address)?;
let pool = bb8::Pool::builder().build(manager).await?;
// TODO: what min?
let builder = bb8::Pool::builder()
.error_sink(bb8_helpers::RedisErrorSink.boxed_clone())
.max_size(num_workers as u32)
.min_idle(Some(1));
let pool = builder.build(manager).await?;
Some(pool)
}
@ -166,7 +181,7 @@ impl Web3ProxyApp {
// TODO: attach context to this error
let (balanced_rpcs, balanced_handle) = Web3Connections::spawn(
chain_id,
app_config.shared.chain_id,
balanced_rpcs,
http_client.as_ref(),
rate_limiter_pool.as_ref(),
@ -184,7 +199,7 @@ impl Web3ProxyApp {
} else {
// TODO: attach context to this error
let (private_rpcs, private_handle) = Web3Connections::spawn(
chain_id,
app_config.shared.chain_id,
private_rpcs,
http_client.as_ref(),
rate_limiter_pool.as_ref(),
@ -205,14 +220,14 @@ impl Web3ProxyApp {
drop(pending_tx_receiver);
// TODO: how much should we allow?
let public_max_burst = public_rate_limit_per_minute / 3;
let public_max_burst = app_config.shared.public_rate_limit_per_minute / 3;
let public_rate_limiter = rate_limiter_pool.as_ref().map(|redis_client_pool| {
RedisCellClient::new(
redis_client_pool.clone(),
"public".to_string(),
public_max_burst,
public_rate_limit_per_minute,
app_config.shared.public_rate_limit_per_minute,
60,
)
});

View File

@ -1,15 +1,12 @@
use argh::FromArgs;
use ethers::prelude::{Block, TxHash};
use futures::Future;
use serde::Deserialize;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::broadcast;
use crate::app::AnyhowJoinHandle;
use crate::connection::Web3Connection;
use crate::Web3ProxyApp;
#[derive(Debug, FromArgs)]
/// Web3-proxy is a fast caching and load balancing proxy for web3 (Ethereum or similar) JsonRPC servers.
@ -28,7 +25,7 @@ pub struct CliConfig {
}
#[derive(Debug, Deserialize)]
pub struct RpcConfig {
pub struct AppConfig {
pub shared: RpcSharedConfig,
pub balanced_rpcs: HashMap<String, Web3ConnectionConfig>,
pub private_rpcs: Option<HashMap<String, Web3ConnectionConfig>>,
@ -52,34 +49,6 @@ pub struct Web3ConnectionConfig {
hard_limit: Option<u32>,
}
impl RpcConfig {
/// Create a Web3ProxyApp from config
// #[instrument(name = "try_build_RpcConfig", skip_all)]
pub async fn spawn(
self,
) -> anyhow::Result<(
Arc<Web3ProxyApp>,
Pin<Box<dyn Future<Output = anyhow::Result<()>>>>,
)> {
let balanced_rpcs = self.balanced_rpcs.into_values().collect();
let private_rpcs = if let Some(private_rpcs) = self.private_rpcs {
private_rpcs.into_values().collect()
} else {
vec![]
};
Web3ProxyApp::spawn(
self.shared.chain_id,
self.shared.rate_limit_redis,
balanced_rpcs,
private_rpcs,
self.shared.public_rate_limit_per_minute,
)
.await
}
}
impl Web3ConnectionConfig {
/// Create a Web3Connection from config
// #[instrument(name = "try_build_Web3ConnectionConfig", skip_all)]

View File

@ -249,7 +249,7 @@ impl Web3Connection {
}
#[instrument(skip_all)]
async fn send_block(
async fn send_block_result(
self: &Arc<Self>,
block: Result<Block<TxHash>, ProviderError>,
block_sender: &flume::Sender<(Block<TxHash>, Arc<Self>)>,
@ -372,7 +372,7 @@ impl Web3Connection {
last_hash = new_hash;
}
self.send_block(block, &block_sender).await?;
self.send_block_result(block, &block_sender).await?;
}
Err(err) => {
warn!(?err, "Rate limited on latest block from {}", self);
@ -401,12 +401,10 @@ impl Web3Connection {
.request("eth_getBlockByNumber", ("latest", false))
.await;
self.send_block(block, &block_sender).await?;
self.send_block_result(block, &block_sender).await?;
// TODO: should the stream have a timeout on it here?
// TODO: although reconnects will make this less of an issue
while let Some(new_block) = stream.next().await {
self.send_block(Ok(new_block), &block_sender).await?;
self.send_block_result(Ok(new_block), &block_sender).await?;
}
warn!(?self, "subscription ended");
@ -455,15 +453,11 @@ impl Web3Connection {
}
}
Web3Provider::Ws(provider) => {
// rate limits
// TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
let active_request_handle = self.wait_for_request_handle().await;
// TODO: automatically reconnect?
// TODO: it would be faster to get the block number, but subscriptions don't provide that
// TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out?
let mut stream = provider.subscribe_pending_txs().await?;
// TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
drop(active_request_handle);
while let Some(pending_tx_id) = stream.next().await {
@ -484,7 +478,7 @@ impl Web3Connection {
/// be careful with this; it will wait forever!
#[instrument(skip_all)]
pub async fn wait_for_request_handle(self: &Arc<Self>) -> ActiveRequestHandle {
// TODO: maximum wait time?
// TODO: maximum wait time? i think timeouts in other parts of the code are probably best
loop {
match self.try_request_handle().await {

View File

@ -1,6 +1,7 @@
#![forbid(unsafe_code)]
mod app;
mod bb8_helpers;
mod config;
mod connection;
mod connections;
@ -13,11 +14,11 @@ use std::sync::atomic::{self, AtomicUsize};
use std::thread;
use std::time::Duration;
use tokio::runtime;
use tracing::{info, trace};
use tracing::{debug, info, trace};
use tracing_subscriber::EnvFilter;
use crate::app::{flatten_handle, Web3ProxyApp};
use crate::config::{CliConfig, RpcConfig};
use crate::config::{AppConfig, CliConfig};
fn main() -> anyhow::Result<()> {
// if RUST_LOG isn't set, configure a default
@ -41,7 +42,7 @@ fn main() -> anyhow::Result<()> {
// advanced configuration
info!("Loading rpc config @ {}", cli_config.config);
let rpc_config: String = fs::read_to_string(cli_config.config)?;
let rpc_config: RpcConfig = toml::from_str(&rpc_config)?;
let rpc_config: AppConfig = toml::from_str(&rpc_config)?;
trace!("rpc_config: {:?}", rpc_config);
@ -83,8 +84,14 @@ fn main() -> anyhow::Result<()> {
// start tokio's async runtime
let rt = rt_builder.build()?;
// we use this worker count to also set our redis connection pool size
// TODO: think about this more
let num_workers = rt.metrics().num_workers();
debug!(?num_workers);
rt.block_on(async {
let (app, app_handle) = rpc_config.spawn().await?;
let (app, app_handle) = Web3ProxyApp::spawn(rpc_config, num_workers).await?;
let frontend_handle = tokio::spawn(frontend::run(cli_config.port, app));