refactors to make configs partially reloadable

Bryan Stitt 2023-02-25 23:52:33 -08:00
parent dd9233d89b
commit e96f09a9c4
8 changed files with 312 additions and 300 deletions

@ -676,3 +676,4 @@ in another repo: event subscriber
- [ ] have an upgrade tier that queries multiple backends at once. returns on first Ok result, collects errors. if no Ok, find the most common error and then respond with that
- [ ] give public_recent_ips_salt a better, more general, name
- [ ] include tier in the head block logs?
- [ ] i think i use FuturesUnordered when a try_join_all might be better
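The FuturesUnordered-vs-try_join_all question in that last TODO is a real tradeoff: FuturesUnordered drains results one at a time and lets the caller decide what to do with each error, while try_join_all stops waiting at the first failure. A minimal sketch of both, assuming each task returns anyhow::Result<()> (the function names here are illustrative, not from this repo):

use futures::future::try_join_all;
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::task::JoinHandle;

// keep going on individual failures; this is what apply_server_configs wants
async fn drain_unordered(tasks: Vec<JoinHandle<anyhow::Result<()>>>) {
    let mut pending: FuturesUnordered<_> = tasks.into_iter().collect();
    while let Some(joined) = pending.next().await {
        match joined {
            Ok(Ok(())) => {}
            // the task returned an error, but the others keep being polled
            Ok(Err(err)) => log::error!("task failed: {:?}", err),
            // the task panicked or was cancelled
            Err(err) => log::error!("task join error: {:?}", err),
        }
    }
}

// all-or-nothing: stop waiting at the first join error or task error.
// note: tasks already spawned keep running in the background; only the waiting stops
async fn drain_all_or_nothing(tasks: Vec<JoinHandle<anyhow::Result<()>>>) -> anyhow::Result<()> {
    let results = try_join_all(tasks).await?;
    results.into_iter().collect()
}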

@ -10,7 +10,7 @@ use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::jsonrpc::{
JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum,
};
use crate::rpcs::blockchain::{BlockHashesCache, Web3ProxyBlock};
use crate::rpcs::blockchain::{Web3ProxyBlock};
use crate::rpcs::many::Web3Rpcs;
use crate::rpcs::one::Web3Rpc;
use crate::rpcs::transactions::TxStatus;
@ -199,6 +199,7 @@ impl DatabaseReplica {
pub struct Web3ProxyApp {
/// Send requests to the best server available
pub balanced_rpcs: Arc<Web3Rpcs>,
pub http_client: Option<reqwest::Client>,
/// Send private requests (like eth_sendRawTransaction) to all these servers
pub private_rpcs: Option<Arc<Web3Rpcs>>,
response_cache: ResponseCache,
@ -354,6 +355,8 @@ pub async fn get_migrated_db(
pub struct Web3ProxyAppSpawn {
/// the app. probably clone this to use in other groups of handles
pub app: Arc<Web3ProxyApp>,
/// handles for the balanced and private rpcs
pub app_handles: FuturesUnordered<AnyhowJoinHandle<()>>,
/// these are important and must be allowed to finish
pub background_handles: FuturesUnordered<AnyhowJoinHandle<()>>,
}
@ -365,6 +368,29 @@ impl Web3ProxyApp {
num_workers: usize,
shutdown_receiver: broadcast::Receiver<()>,
) -> anyhow::Result<Web3ProxyAppSpawn> {
// safety checks on the config
// while i would prefer this to be in a "apply_top_config" function, that is a larger refactor
if let Some(redirect) = &top_config.app.redirect_rpc_key_url {
assert!(
redirect.contains("{{rpc_key_id}}"),
"redirect_rpc_key_url user url must contain \"{{rpc_key_id}}\""
);
}
if !top_config.extra.is_empty() {
warn!(
"unknown TopConfig fields!: {:?}",
top_config.extra.keys()
);
}
if !top_config.app.extra.is_empty() {
warn!(
"unknown Web3ProxyAppConfig fields!: {:?}",
top_config.app.extra.keys()
);
}
// we must wait for these to end on their own (and they need to subscribe to shutdown_sender)
let important_background_handles = FuturesUnordered::new();
@ -491,6 +517,47 @@ impl Web3ProxyApp {
.build()?,
);
// create rate limiters
// these are optional. they require redis
let mut frontend_ip_rate_limiter = None;
let mut frontend_registered_user_rate_limiter = None;
let mut login_rate_limiter = None;
if let Some(redis_pool) = vredis_pool.as_ref() {
if let Some(public_requests_per_period) = top_config.app.public_requests_per_period {
// chain id is included in the app name so that rpc rate limits are per-chain
let rpc_rrl = RedisRateLimiter::new(
&format!("web3_proxy:{}", top_config.app.chain_id),
"frontend",
public_requests_per_period,
60.0,
redis_pool.clone(),
);
// these two rate limiters can share the base limiter
// these are deferred rate limiters because we don't want redis network requests on the hot path
// TODO: take cache_size from config
frontend_ip_rate_limiter = Some(DeferredRateLimiter::<IpAddr>::new(
10_000,
"ip",
rpc_rrl.clone(),
None,
));
frontend_registered_user_rate_limiter = Some(DeferredRateLimiter::<u64>::new(
10_000, "key", rpc_rrl, None,
));
}
// login rate limiter
login_rate_limiter = Some(RedisRateLimiter::new(
"web3_proxy",
"login",
top_config.app.login_rate_limit_per_period,
60.0,
redis_pool.clone(),
));
}
// TODO: i don't like doing Block::default here! Change this to "None"?
let (watch_consensus_head_sender, watch_consensus_head_receiver) = watch::channel(None);
// TODO: will one receiver lagging be okay? how big should this be?
@ -512,20 +579,6 @@ impl Web3ProxyApp {
.time_to_idle(Duration::from_secs(300))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// keep 1GB/5 minutes of blocks in the cache
// TODO: limits from config
// these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes
// TODO: how can we do the weigher better?
let block_map: BlockHashesCache = Cache::builder()
.max_capacity(1024 * 1024 * 1024)
.weigher(|_k, v: &Web3ProxyBlock| {
// TODO: is this good enough?
1 + v.block.transactions.len().try_into().unwrap_or(u32::MAX)
})
// TODO: what should we set? 5 minutes is arbitrary. the nodes themselves hold onto transactions for much longer
.time_to_idle(Duration::from_secs(300))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// responses can be very different in sizes, so this is a cache with a max capacity and a weigher
// TODO: don't allow any response to be bigger than X% of the cache
let response_cache = Cache::builder()
@ -567,10 +620,10 @@ impl Web3ProxyApp {
.time_to_idle(Duration::from_secs(120))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// set up a Web3Rpcs object to hold all our connections
// TODO: for now, only the server_configs can change
let (balanced_rpcs, balanced_handle) = Web3Rpcs::spawn(
block_map.clone(),
let app_handles = FuturesUnordered::new();
// prepare a Web3Rpcs to hold all our balanced connections
let (balanced_rpcs, balanced_rpcs_handle) = Web3Rpcs::spawn(
top_config.app.chain_id,
db_conn.clone(),
http_client.clone(),
@ -580,25 +633,21 @@ impl Web3ProxyApp {
top_config.app.min_sum_soft_limit,
pending_transactions.clone(),
Some(pending_tx_sender.clone()),
vredis_pool.clone(),
Some(watch_consensus_head_sender),
)
.await
.context("spawning balanced rpcs")?;
// connect to the load balanced rpcs
balanced_rpcs.apply_server_configs(top_config.balanced_rpcs);
app_handles.push(balanced_rpcs_handle);
// connect to the private rpcs
// prepare a Web3Rpcs to hold all our private connections
// only some chains have this, so this is optional
let private_rpc_configs = top_config.private_rpcs.unwrap_or_default();
let private_rpcs = if private_rpc_configs.is_empty() {
let private_rpcs = if top_config.private_rpcs.is_none() {
warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
None
} else {
// TODO: do something with the spawn handle
let (private_rpcs, _) = Web3Rpcs::spawn(
block_map,
let (private_rpcs, private_rpcs_handle) = Web3Rpcs::spawn(
top_config.app.chain_id,
db_conn.clone(),
http_client.clone(),
@ -610,7 +659,6 @@ impl Web3ProxyApp {
pending_transactions.clone(),
// TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits, but they should have
None,
vredis_pool.clone(),
// subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs
// they also often have low rate limits
// however, they are well connected to miners/validators. so maybe using them as a safety check would be good
@ -620,18 +668,15 @@ impl Web3ProxyApp {
.await
.context("spawning private_rpcs")?;
private_rpcs.apply_server_configs(private_rpc_configs);
app_handles.push(private_rpcs_handle);
if private_rpcs.by_name.is_empty() {
None
} else {
Some(private_rpcs)
}
Some(private_rpcs)
};
let app = Self {
config: top_config.app,
config: top_config.app.clone(),
balanced_rpcs,
http_client,
private_rpcs,
response_cache,
watch_consensus_head_receiver,
@ -652,103 +697,30 @@ impl Web3ProxyApp {
let app = Arc::new(app);
app.apply_config(top_config).await?;
app.apply_top_config(top_config).await?;
// TODO: use channel for receiving new top_configs
// TODO: return a channel for sending new top_configs
Ok((app, important_background_handles).into())
Ok((app, app_handles, important_background_handles).into())
}
/// update the app's balanced_rpcs and private_rpcs
/// TODO: make more of the app mutable. for now, db and
pub async fn apply_server_configs(
self: Arc<Self>,
top_config: TopConfig,
) -> anyhow::Result<()> {
// safety checks on the config
if let Some(redirect) = &top_config.app.redirect_rpc_key_url {
assert!(
redirect.contains("{{rpc_key_id}}"),
"redirect_rpc_key_url user url must contain \"{{rpc_key_id}}\""
);
}
pub async fn apply_top_config(&self, new_top_config: TopConfig) -> anyhow::Result<()> {
// TODO: also update self.config from new_top_config.app
if !top_config.extra.is_empty() {
warn!(
"unknown TopConfig fields!: {:?}",
top_config.app.extra.keys()
);
}
// connect to the backends
self.balanced_rpcs
.apply_server_configs(self, new_top_config.balanced_rpcs)
.await?;
if !top_config.app.extra.is_empty() {
warn!(
"unknown Web3ProxyAppConfig fields!: {:?}",
top_config.app.extra.keys()
);
}
let balanced_rpcs = top_config.balanced_rpcs;
// safety check on balanced_rpcs
if balanced_rpcs.len() < top_config.app.min_synced_rpcs {
return Err(anyhow::anyhow!(
"Only {}/{} rpcs! Add more balanced_rpcs or reduce min_synced_rpcs.",
balanced_rpcs.len(),
top_config.app.min_synced_rpcs
));
}
// safety check on sum soft limit
let sum_soft_limit = balanced_rpcs.values().fold(0, |acc, x| acc + x.soft_limit);
if sum_soft_limit < top_config.app.min_sum_soft_limit {
return Err(anyhow::anyhow!(
"Only {}/{} soft limit! Add more balanced_rpcs, increase soft limits, or reduce min_sum_soft_limit.",
sum_soft_limit,
top_config.app.min_sum_soft_limit
));
}
// create rate limiters
// these are optional. they require redis
let mut frontend_ip_rate_limiter = None;
let mut frontend_registered_user_rate_limiter = None;
let mut login_rate_limiter = None;
if let Some(redis_pool) = vredis_pool.as_ref() {
if let Some(public_requests_per_period) = top_config.app.public_requests_per_period {
// chain id is included in the app name so that rpc rate limits are per-chain
let rpc_rrl = RedisRateLimiter::new(
&format!("web3_proxy:{}", top_config.app.chain_id),
"frontend",
public_requests_per_period,
60.0,
redis_pool.clone(),
);
// these two rate limiters can share the base limiter
// these are deferred rate limiters because we don't want redis network requests on the hot path
// TODO: take cache_size from config
frontend_ip_rate_limiter = Some(DeferredRateLimiter::<IpAddr>::new(
10_000,
"ip",
rpc_rrl.clone(),
None,
));
frontend_registered_user_rate_limiter = Some(DeferredRateLimiter::<u64>::new(
10_000, "key", rpc_rrl, None,
));
if let Some(private_rpc_configs) = new_top_config.private_rpcs {
if let Some(private_rpcs) = self.private_rpcs.as_ref() {
private_rpcs
.apply_server_configs(self, private_rpc_configs)
.await?;
} else {
// TODO: maybe we should have private_rpcs just be empty instead of being None
todo!("handle toggling private_rpcs")
}
// login rate limiter
login_rate_limiter = Some(RedisRateLimiter::new(
"web3_proxy",
"login",
top_config.app.login_rate_limit_per_period,
60.0,
redis_pool.clone(),
));
}
Ok(())
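To make the intent of the split concrete: spawn() now only does the one-time setup, and apply_top_config() can be called again whenever a new TopConfig shows up. A hedged sketch of the reload loop the TODOs above point at, assuming new configs arrive over a tokio watch channel and that TopConfig is Clone (the channel plumbing and function name are hypothetical, not part of this commit):

use std::sync::Arc;
use tokio::sync::watch;

// assumed imports from this crate: crate::app::Web3ProxyApp, crate::config::TopConfig
async fn config_reload_loop(
    app: Arc<Web3ProxyApp>,
    mut new_top_config_receiver: watch::Receiver<TopConfig>,
) -> anyhow::Result<()> {
    loop {
        // wait for a file watcher / signal handler to publish a new config
        new_top_config_receiver.changed().await?;
        let new_top_config = new_top_config_receiver.borrow().clone();

        // only balanced_rpcs / private_rpcs are re-applied here;
        // the rest of Web3ProxyApp still requires a restart
        if let Err(err) = app.apply_top_config(new_top_config).await {
            log::error!("unable to apply new config: {:?}", err);
        }
    }
}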

@ -1,5 +1,5 @@
use crate::app::AnyhowJoinHandle;
use crate::rpcs::blockchain::{BlockHashesCache, Web3ProxyBlock};
use crate::rpcs::blockchain::{BlocksByHashCache, Web3ProxyBlock};
use crate::rpcs::one::Web3Rpc;
use argh::FromArgs;
use ethers::prelude::TxHash;
@ -253,7 +253,7 @@ impl Web3RpcConfig {
chain_id: u64,
http_client: Option<reqwest::Client>,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
block_map: BlockHashesCache,
blocks_by_hash_cache: BlocksByHashCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<TxHashAndRpc>>,
reconnect: bool,
@ -270,7 +270,7 @@ impl Web3RpcConfig {
http_client,
http_interval_sender,
redis_pool,
block_map,
blocks_by_hash_cache,
block_sender,
tx_id_sender,
reconnect,

@ -15,13 +15,13 @@ use serde::Serialize;
use serde_json::json;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{cmp::Ordering, fmt::Display, sync::Arc};
use tokio::sync::{broadcast, watch};
use tokio::sync::broadcast;
use tokio::time::Duration;
// TODO: type for Hydrated Blocks with their full transactions?
pub type ArcBlock = Arc<Block<TxHash>>;
pub type BlockHashesCache = Cache<H256, Web3ProxyBlock, hashbrown::hash_map::DefaultHashBuilder>;
pub type BlocksByHashCache = Cache<H256, Web3ProxyBlock, hashbrown::hash_map::DefaultHashBuilder>;
/// A block and its age.
#[derive(Clone, Debug, Default, From, Serialize)]
@ -153,13 +153,13 @@ impl Web3Rpcs {
// this is the only place that writes to block_numbers
// multiple inserts should be okay though
// TODO: info that there was a fork?
self.block_numbers.insert(*block_num, *block_hash).await;
self.blocks_by_number.insert(*block_num, *block_hash).await;
}
// this block is very likely already in block_hashes
// TODO: use their get_with
let block = self
.block_hashes
.blocks_by_hash
.get_with(*block_hash, async move { block.clone() })
.await;
@ -178,7 +178,7 @@ impl Web3Rpcs {
// first, try to get the hash from our cache
// the cache is set last, so if its here, its everywhere
// TODO: use try_get_with
if let Some(block) = self.block_hashes.get(hash) {
if let Some(block) = self.blocks_by_hash.get(hash) {
return Ok(block);
}
@ -265,10 +265,10 @@ impl Web3Rpcs {
// if there are multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations)
let mut consensus_head_receiver = self
.watch_consensus_head_receiver
.watch_consensus_head_sender
.as_ref()
.context("need new head subscriptions to fetch cannonical_block")?
.clone();
.subscribe();
// be sure the requested block num exists
// TODO: is this okay? what if we aren't synced?!
@ -295,7 +295,7 @@ impl Web3Rpcs {
// try to get the hash from our cache
// deref to not keep the lock open
if let Some(block_hash) = self.block_numbers.get(num) {
if let Some(block_hash) = self.blocks_by_number.get(num) {
// TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set
// TODO: pass authorization through here?
let block = self.block(authorization, &block_hash, None).await?;
@ -337,7 +337,6 @@ impl Web3Rpcs {
block_receiver: flume::Receiver<BlockAndRpc>,
// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
// Geth's subscriptions have the same potential for skipping blocks.
head_block_sender: watch::Sender<Option<Web3ProxyBlock>>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
// TODO: indexmap or hashmap? what hasher? with_capacity?
@ -364,7 +363,6 @@ impl Web3Rpcs {
&mut connection_heads,
new_block,
rpc,
&head_block_sender,
&pending_tx_sender,
)
.await
@ -389,7 +387,6 @@ impl Web3Rpcs {
consensus_finder: &mut ConsensusFinder,
rpc_head_block: Option<Web3ProxyBlock>,
rpc: Arc<Web3Rpc>,
head_block_sender: &watch::Sender<Option<Web3ProxyBlock>>,
pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
// TODO: how should we handle an error here?
@ -415,6 +412,7 @@ impl Web3Rpcs {
// TODO: what should we do if the block number of new_synced_connections is < old_synced_connections? wait?
let watch_consensus_head_sender = self.watch_consensus_head_sender.as_ref().unwrap();
let consensus_tier = new_synced_connections.tier;
let total_tiers = consensus_finder.len();
let backups_needed = new_synced_connections.backups_needed;
@ -456,9 +454,11 @@ impl Web3Rpcs {
let consensus_head_block =
self.try_cache_block(consensus_head_block, true).await?;
head_block_sender
watch_consensus_head_sender
.send(Some(consensus_head_block))
.context("head_block_sender sending consensus_head_block")?;
.context(
"watch_consensus_head_sender failed sending first consensus_head_block",
)?;
}
Some(old_head_block) => {
// TODO: do this log item better
@ -470,7 +470,7 @@ impl Web3Rpcs {
Ordering::Equal => {
// multiple blocks with the same fork!
if consensus_head_block.hash() == old_head_block.hash() {
// no change in hash. no need to use head_block_sender
// no change in hash. no need to use watch_consensus_head_sender
// TODO: trace level if rpc is backup
debug!(
"con {}/{} {}{}/{}/{} con={} rpc={}@{}",
@ -510,9 +510,9 @@ impl Web3Rpcs {
.await
.context("save consensus_head_block as heaviest chain")?;
head_block_sender
watch_consensus_head_sender
.send(Some(consensus_head_block))
.context("head_block_sender sending consensus_head_block")?;
.context("watch_consensus_head_sender failed sending uncled consensus_head_block")?;
}
}
Ordering::Less => {
@ -545,9 +545,9 @@ impl Web3Rpcs {
"save_block sending consensus_head_block as heaviest chain",
)?;
head_block_sender
watch_consensus_head_sender
.send(Some(consensus_head_block))
.context("head_block_sender sending consensus_head_block")?;
.context("watch_consensus_head_sender failed sending rollback consensus_head_block")?;
}
Ordering::Greater => {
debug!(
@ -571,7 +571,7 @@ impl Web3Rpcs {
let consensus_head_block =
self.try_cache_block(consensus_head_block, true).await?;
head_block_sender.send(Some(consensus_head_block))?;
watch_consensus_head_sender.send(Some(consensus_head_block)).context("watch_consensus_head_sender failed sending new consensus_head_block")?;
}
}
}

@ -53,7 +53,7 @@ impl fmt::Debug for ConsensusWeb3Rpcs {
impl Web3Rpcs {
// TODO: return a ref?
pub fn head_block(&self) -> Option<Web3ProxyBlock> {
self.watch_consensus_head_receiver
self.watch_consensus_head_sender
.as_ref()
.and_then(|x| x.borrow().clone())
}
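Dropping the stored receiver works because of a property of tokio's watch channel: the Sender alone is enough, since borrow() reads the latest value and subscribe() mints a fresh Receiver whenever a task actually needs to wait for updates, which is exactly what head_block() and the block fetch above do. A small standalone sketch of that pattern (toy Option<u64> payload, not this repo's Web3ProxyBlock):

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // keep only the Sender; the initial Receiver is dropped immediately
    let (head_sender, _) = watch::channel(Option::<u64>::None);

    // read the current head directly from the Sender
    assert!(head_sender.borrow().is_none());

    // mint a Receiver only when a task wants to wait for changes
    let mut head_receiver = head_sender.subscribe();

    head_sender.send(Some(100)).expect("at least one receiver exists");
    head_receiver.changed().await.expect("sender still alive");
    assert_eq!(*head_receiver.borrow(), Some(100));
}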

@ -1,9 +1,9 @@
///! Load balanced communication with a group of web3 rpc providers
use super::blockchain::{BlockHashesCache, Web3ProxyBlock};
use super::blockchain::{BlocksByHashCache, Web3ProxyBlock};
use super::consensus::ConsensusWeb3Rpcs;
use super::one::Web3Rpc;
use super::request::{OpenRequestHandle, OpenRequestResult, RequestRevertHandler};
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::app::{flatten_handle, AnyhowJoinHandle, Web3ProxyApp};
use crate::config::{BlockAndRpc, TxHashAndRpc, Web3RpcConfig};
use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::frontend::rpc_proxy_ws::ProxyMode;
@ -38,19 +38,27 @@ use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBeh
/// A collection of web3 connections. Sends requests to either the current best server or all servers.
#[derive(From)]
pub struct Web3Rpcs {
/// if watch_consensus_head_sender is some, Web3Rpc inside self will send blocks here when they get them
pub(crate) block_sender: flume::Sender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
/// any requests will be forwarded to one (or more) of these connections
pub(crate) by_name: HashMap<String, Arc<Web3Rpc>>,
pub(crate) http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
/// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender`
/// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
/// TODO: why is watch_consensus_head_sender in an Option, but this one isn't?
/// Geth's subscriptions have the same potential for skipping blocks.
pub(super) watch_consensus_rpcs_sender: watch::Sender<Arc<ConsensusWeb3Rpcs>>,
/// this head receiver makes it easy to wait until there is a new block
pub(super) watch_consensus_head_receiver: Option<watch::Receiver<Option<Web3ProxyBlock>>>,
pub(super) pending_transactions:
pub(super) watch_consensus_head_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
pub(super) pending_transaction_cache:
Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
pub(super) pending_tx_id_receiver: flume::Receiver<TxHashAndRpc>,
pub(super) pending_tx_id_sender: flume::Sender<TxHashAndRpc>,
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
/// all blocks, including orphans
pub(super) block_hashes: BlockHashesCache,
pub(super) blocks_by_hash: BlocksByHashCache,
/// blocks on the heaviest chain
pub(super) block_numbers: Cache<U64, H256, hashbrown::hash_map::DefaultHashBuilder>,
pub(super) blocks_by_number: Cache<U64, H256, hashbrown::hash_map::DefaultHashBuilder>,
/// the number of rpcs required to agree on consensus for the head block (thundering herd protection)
pub(super) min_head_rpcs: usize,
/// the soft limit required to agree on consensus for the head block. (thundering herd protection)
@ -62,95 +70,9 @@ pub struct Web3Rpcs {
}
impl Web3Rpcs {
pub async fn min_head_rpcs(&self) -> usize {
self.min_head_rpcs
}
pub async fn apply_server_configs(
&self,
server_configs: HashMap<String, Web3RpcConfig>,
) -> anyhow::Result<()> {
// turn configs into connections (in parallel)
// TODO: move this into a helper function. then we can use it when configs change (will need a remove function too)
let mut spawn_handles: FuturesUnordered<_> = server_configs
.into_iter()
.filter_map(|(server_name, server_config)| {
if server_config.disabled {
info!("{} is disabled", server_name);
return None;
}
let db_conn = db_conn.clone();
let http_client = http_client.clone();
let redis_pool = redis_pool.clone();
let http_interval_sender = http_interval_sender.clone();
let block_sender = if watch_consensus_head_sender.is_some() {
Some(block_sender.clone())
} else {
None
};
let pending_tx_id_sender = Some(pending_tx_id_sender.clone());
let block_map = block_map.clone();
debug!("spawning {}", server_name);
let handle = tokio::spawn(async move {
server_config
.spawn(
server_name,
db_conn,
redis_pool,
chain_id,
http_client,
http_interval_sender,
block_map,
block_sender,
pending_tx_id_sender,
true,
)
.await
});
Some(handle)
})
.collect();
// map of connection names to their connection
let mut connections = AsyncRwLock::new(HashMap::new());
let mut handles = vec![];
while let Some(x) = spawn_handles.next().await {
match x {
Ok(Ok((connection, _handle))) => {
// web3 connection worked
connections
.write()
.await
.insert(connection.name.clone(), connection);
// TODO: what should we do with the handle? at least log any errors
}
Ok(Err(err)) => {
// if we got an error here, the app can continue on
// TODO: include context about which connection failed
error!("Unable to create connection. err={:?}", err);
}
Err(err) => {
// something actually bad happened. exit with an error
return Err(err.into());
}
}
}
Ok(())
}
/// Spawn durable connections to multiple Web3 providers.
#[allow(clippy::too_many_arguments)]
pub async fn spawn(
block_map: BlockHashesCache,
chain_id: u64,
db_conn: Option<DatabaseConnection>,
http_client: Option<reqwest::Client>,
@ -158,9 +80,8 @@ impl Web3Rpcs {
max_block_lag: Option<U64>,
min_head_rpcs: usize,
min_sum_soft_limit: u32,
pending_transactions: Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
pending_transaction_cache: Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
redis_pool: Option<redis_rate_limiter::RedisPool>,
watch_consensus_head_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
@ -217,30 +138,40 @@ impl Web3Rpcs {
None
};
// TODO: max_capacity and time_to_idle from config
// all block hashes are the same size, so no need for weigher
let block_hashes = Cache::builder()
// these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes
// TODO: how can we do the weigher better? need to know actual allocated size
// TODO: limits from config
let blocks_by_hash: BlocksByHashCache = Cache::builder()
.max_capacity(1024 * 1024 * 1024)
.weigher(|_k, v: &Web3ProxyBlock| {
1 + v.block.transactions.len().try_into().unwrap_or(u32::MAX)
})
.time_to_idle(Duration::from_secs(600))
.max_capacity(10_000)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// all block numbers are the same size, so no need for weigher
let block_numbers = Cache::builder()
// TODO: limits from config
let blocks_by_number = Cache::builder()
.time_to_idle(Duration::from_secs(600))
.max_capacity(10_000)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
let (watch_consensus_connections_sender, _) = watch::channel(Default::default());
let (watch_consensus_rpcs_sender, _) = watch::channel(Default::default());
let watch_consensus_head_receiver =
watch_consensus_head_sender.as_ref().map(|x| x.subscribe());
// by_name starts empty. self.apply_server_configs will add to it
let by_name = Default::default();
let connections = Arc::new(Self {
by_name: connections,
watch_consensus_rpcs_sender: watch_consensus_connections_sender,
watch_consensus_head_receiver,
pending_transactions,
block_hashes,
block_numbers,
block_sender,
by_name,
http_interval_sender,
watch_consensus_rpcs_sender,
watch_consensus_head_sender,
pending_transaction_cache,
pending_tx_id_sender,
pending_tx_id_receiver,
blocks_by_hash,
blocks_by_number,
min_sum_soft_limit,
min_head_rpcs,
max_block_age,
@ -254,13 +185,7 @@ impl Web3Rpcs {
tokio::spawn(async move {
connections
.subscribe(
authorization,
pending_tx_id_receiver,
block_receiver,
watch_consensus_head_sender,
pending_tx_sender,
)
.subscribe(authorization, block_receiver, pending_tx_sender)
.await
})
};
@ -268,19 +193,131 @@ impl Web3Rpcs {
Ok((connections, handle))
}
/// update the rpcs in this group
pub async fn apply_server_configs(
&self,
app: &Web3ProxyApp,
rpc_configs: HashMap<String, Web3RpcConfig>,
) -> anyhow::Result<()> {
// safety checks
if rpc_configs.len() < app.config.min_synced_rpcs {
return Err(anyhow::anyhow!(
"Only {}/{} rpcs! Add more rpcs or reduce min_synced_rpcs.",
rpc_configs.len(),
app.config.min_synced_rpcs
));
}
// safety check on sum soft limit
// TODO: will need to think about this more once sum_soft_limit is dynamic
let sum_soft_limit = rpc_configs.values().fold(0, |acc, x| acc + x.soft_limit);
// TODO: < is a bit dangerous, we should require a buffer
if sum_soft_limit < self.min_sum_soft_limit {
return Err(anyhow::anyhow!(
"Only {}/{} soft limit! Add more rpcs, increase soft limits, or reduce min_sum_soft_limit.",
sum_soft_limit,
self.min_sum_soft_limit
));
}
// turn configs into connections (in parallel)
// TODO: move this into a helper function. then we can use it when configs change (will need a remove function too)
let mut spawn_handles: FuturesUnordered<_> = rpc_configs
.into_iter()
.filter_map(|(server_name, server_config)| {
if server_config.disabled {
info!("{} is disabled", server_name);
return None;
}
let db_conn = app.db_conn();
let http_client = app.http_client.clone();
let vredis_pool = app.vredis_pool.clone();
let block_sender = if self.watch_consensus_head_sender.is_some() {
Some(self.block_sender.clone())
} else {
None
};
let pending_tx_id_sender = Some(self.pending_tx_id_sender.clone());
let blocks_by_hash = self.blocks_by_hash.clone();
let http_interval_sender = self.http_interval_sender.clone();
let chain_id = app.config.chain_id;
debug!("spawning {}", server_name);
let handle = tokio::spawn(async move {
server_config
.spawn(
server_name,
db_conn,
vredis_pool,
chain_id,
http_client,
http_interval_sender,
blocks_by_hash,
block_sender,
pending_tx_id_sender,
true,
)
.await
});
Some(handle)
})
.collect();
// map of connection names to their connection
let connections = AsyncRwLock::new(HashMap::new());
while let Some(x) = spawn_handles.next().await {
match x {
Ok(Ok((connection, _handle))) => {
// web3 connection worked
let old_rpc = connections
.write()
.await
.insert(connection.name.clone(), connection);
if let Some(old_rpc) = old_rpc {
todo!("do something to make the old one shutdown");
}
// TODO: what should we do with the new handle? make sure error logs aren't dropped
}
Ok(Err(err)) => {
// if we got an error here, the app can continue on
// TODO: include context about which connection failed
// TODO: will this retry automatically? i don't think so
error!("Unable to create connection. err={:?}", err);
}
Err(err) => {
// something actually bad happened. exit with an error
return Err(err.into());
}
}
}
Ok(())
}
pub fn get(&self, conn_name: &str) -> Option<&Arc<Web3Rpc>> {
self.by_name.get(conn_name)
}
pub async fn min_head_rpcs(&self) -> usize {
self.min_head_rpcs
}
/// subscribe to blocks and transactions from all the backend rpcs.
/// blocks are processed by all the `Web3Rpc`s and then sent to the `block_receiver`
/// transaction ids from all the `Web3Rpc`s are deduplicated and forwarded to `pending_tx_sender`
async fn subscribe(
self: Arc<Self>,
authorization: Arc<Authorization>,
pending_tx_id_receiver: flume::Receiver<TxHashAndRpc>,
block_receiver: flume::Receiver<BlockAndRpc>,
head_block_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
let mut futures = vec![];
@ -292,6 +329,7 @@ impl Web3Rpcs {
if let Some(pending_tx_sender) = pending_tx_sender.clone() {
let clone = self.clone();
let authorization = authorization.clone();
let pending_tx_id_receiver = self.pending_tx_id_receiver.clone();
let handle = task::spawn(async move {
// TODO: set up this future the same as the block funnel
while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv_async().await {
@ -311,7 +349,7 @@ impl Web3Rpcs {
}
// setup the block funnel
if let Some(head_block_sender) = head_block_sender {
if self.watch_consensus_head_sender.is_some() {
let connections = Arc::clone(&self);
let pending_tx_sender = pending_tx_sender.clone();
@ -319,12 +357,7 @@ impl Web3Rpcs {
.name("process_incoming_blocks")
.spawn(async move {
connections
.process_incoming_blocks(
&authorization,
block_receiver,
head_block_sender,
pending_tx_sender,
)
.process_incoming_blocks(&authorization, block_receiver, pending_tx_sender)
.await
})?;
@ -1148,12 +1181,12 @@ impl Serialize for Web3Rpcs {
state.serialize_field("synced_connections", &consensus_connections)?;
}
self.block_hashes.sync();
self.block_numbers.sync();
state.serialize_field("block_hashes_count", &self.block_hashes.entry_count())?;
state.serialize_field("block_hashes_size", &self.block_hashes.weighted_size())?;
state.serialize_field("block_numbers_count", &self.block_numbers.entry_count())?;
state.serialize_field("block_numbers_size", &self.block_numbers.weighted_size())?;
self.blocks_by_hash.sync();
self.blocks_by_number.sync();
state.serialize_field("block_hashes_count", &self.blocks_by_hash.entry_count())?;
state.serialize_field("block_hashes_size", &self.blocks_by_hash.weighted_size())?;
state.serialize_field("block_numbers_count", &self.blocks_by_number.entry_count())?;
state.serialize_field("block_numbers_size", &self.blocks_by_number.weighted_size())?;
state.end()
}
}
@ -1346,18 +1379,25 @@ mod tests {
(lagged_rpc.name.clone(), lagged_rpc.clone()),
]);
let (block_sender, _) = flume::unbounded();
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (watch_consensus_rpcs_sender, _) = watch::channel(Default::default());
let (watch_consensus_head_sender, _) = watch::channel(Default::default());
// TODO: make a Web3Rpcs::new
let rpcs = Web3Rpcs {
block_sender,
by_name: rpcs_by_name,
watch_consensus_head_receiver: None,
http_interval_sender: None,
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
pending_transactions: Cache::builder()
pending_transaction_cache: Cache::builder()
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
block_hashes: Cache::builder()
pending_tx_id_receiver,
pending_tx_id_sender,
blocks_by_hash: Cache::builder()
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
block_numbers: Cache::builder()
blocks_by_number: Cache::builder()
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
// TODO: test max_block_age?
max_block_age: None,
@ -1369,7 +1409,6 @@ mod tests {
let authorization = Arc::new(Authorization::internal(None).unwrap());
let (head_block_sender, _head_block_receiver) = watch::channel(Default::default());
let mut consensus_finder = ConsensusFinder::new(&[0, 1, 2, 3], None, None);
// process None so that
@ -1378,7 +1417,6 @@ mod tests {
&mut consensus_finder,
None,
lagged_rpc.clone(),
&head_block_sender,
&None,
)
.await
@ -1388,7 +1426,6 @@ mod tests {
&mut consensus_finder,
None,
head_rpc.clone(),
&head_block_sender,
&None,
)
.await
@ -1424,7 +1461,6 @@ mod tests {
&mut consensus_finder,
Some(lagged_block.clone()),
lagged_rpc,
&head_block_sender,
&None,
)
.await
@ -1434,7 +1470,6 @@ mod tests {
&mut consensus_finder,
Some(lagged_block.clone()),
head_rpc.clone(),
&head_block_sender,
&None,
)
.await
@ -1450,7 +1485,6 @@ mod tests {
&mut consensus_finder,
Some(head_block.clone()),
head_rpc,
&head_block_sender,
&None,
)
.await
@ -1545,18 +1579,26 @@ mod tests {
(archive_rpc.name.clone(), archive_rpc.clone()),
]);
let (block_sender, _) = flume::unbounded();
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (watch_consensus_rpcs_sender, _) = watch::channel(Default::default());
let (watch_consensus_head_sender, _watch_consensus_head_receiver) =
watch::channel(Default::default());
// TODO: make a Web3Rpcs::new
let rpcs = Web3Rpcs {
block_sender,
by_name: rpcs_by_name,
watch_consensus_head_receiver: None,
http_interval_sender: None,
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
pending_transactions: Cache::builder()
pending_transaction_cache: Cache::builder()
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
block_hashes: Cache::builder()
pending_tx_id_receiver,
pending_tx_id_sender,
blocks_by_hash: Cache::builder()
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
block_numbers: Cache::builder()
blocks_by_number: Cache::builder()
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
min_head_rpcs: 1,
min_sum_soft_limit: 4_000,
@ -1566,7 +1608,6 @@ mod tests {
let authorization = Arc::new(Authorization::internal(None).unwrap());
let (head_block_sender, _head_block_receiver) = watch::channel(Default::default());
let mut connection_heads = ConsensusFinder::new(&[0, 1, 2, 3], None, None);
// min sum soft limit will require tier 2
@ -1575,7 +1616,6 @@ mod tests {
&mut connection_heads,
Some(head_block.clone()),
pruned_rpc.clone(),
&head_block_sender,
&None,
)
.await
@ -1586,7 +1626,6 @@ mod tests {
&mut connection_heads,
Some(head_block.clone()),
archive_rpc.clone(),
&head_block_sender,
&None,
)
.await
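One more note on the renamed caches: blocks_by_hash and blocks_by_number are moka caches, and the get_with used when caching blocks above is what makes concurrent lookups safe, since only one init future runs per missing key while every other caller waits for its value. A self-contained sketch of the same builder + get_with pattern with toy types (this is not the repo's Web3ProxyBlock, and it assumes moka's "future" feature):

use moka::future::Cache;
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() {
    type BlockHash = [u8; 32];
    type Block = Arc<String>;

    // max_capacity + weigher bound the cache by an approximate byte budget,
    // like the 1GB blocks_by_hash cache above weighing blocks by tx-hash count
    let blocks_by_hash: Cache<BlockHash, Block> = Cache::builder()
        .max_capacity(1024 * 1024)
        .weigher(|_hash, block: &Block| block.len().try_into().unwrap_or(u32::MAX))
        .time_to_idle(Duration::from_secs(600))
        .build();

    let hash = [0u8; 32];

    // if the hash is missing, the init future runs exactly once even when
    // many tasks ask for the same hash concurrently; everyone gets its value
    let block = blocks_by_hash
        .get_with(hash, async { Arc::new("block body".to_string()) })
        .await;

    assert_eq!(block.as_str(), "block body");
}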

@ -1,5 +1,5 @@
///! Rate-limited communication with a web3 provider.
use super::blockchain::{ArcBlock, BlockHashesCache, Web3ProxyBlock};
use super::blockchain::{ArcBlock, BlocksByHashCache, Web3ProxyBlock};
use super::provider::Web3Provider;
use super::request::{OpenRequestHandle, OpenRequestResult};
use crate::app::{flatten_handle, AnyhowJoinHandle};
@ -151,7 +151,7 @@ impl Web3Rpc {
redis_pool: Option<RedisPool>,
// TODO: think more about soft limit. watching ewma of requests is probably better. but what should the random sort be on? maybe group on tier is enough
// soft_limit: u32,
block_map: BlockHashesCache,
block_map: BlocksByHashCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
reconnect: bool,
@ -571,7 +571,7 @@ impl Web3Rpc {
self: &Arc<Self>,
new_head_block: Result<Option<ArcBlock>, ProviderError>,
block_sender: &flume::Sender<BlockAndRpc>,
block_map: BlockHashesCache,
block_map: BlocksByHashCache,
) -> anyhow::Result<()> {
let new_head_block = match new_head_block {
Ok(None) => {
@ -648,7 +648,7 @@ impl Web3Rpc {
async fn subscribe(
self: Arc<Self>,
authorization: &Arc<Authorization>,
block_map: BlockHashesCache,
block_map: BlocksByHashCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
chain_id: u64,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
@ -848,7 +848,7 @@ impl Web3Rpc {
authorization: Arc<Authorization>,
http_interval_receiver: Option<broadcast::Receiver<()>>,
block_sender: flume::Sender<BlockAndRpc>,
block_map: BlockHashesCache,
block_map: BlocksByHashCache,
) -> anyhow::Result<()> {
trace!("watching new heads on {}", self);

@ -82,7 +82,7 @@ impl Web3Rpcs {
}
// trace!(?pending_tx_id, "checking pending_transactions on {}", rpc);
if self.pending_transactions.contains_key(&pending_tx_id) {
if self.pending_transaction_cache.contains_key(&pending_tx_id) {
// this transaction has already been processed
return Ok(());
}