add thread for config file watching and run clippy lint

Bryan Stitt 2023-02-26 22:44:09 -08:00
parent bf79d677b0
commit f8f5e7a1c8
12 changed files with 356 additions and 306 deletions

Cargo.lock (generated, 11 lines changed)

@ -2972,6 +2972,15 @@ dependencies = [
"windows-sys 0.42.0",
]
[[package]]
name = "notify-debouncer-mini"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e23e9fa24f094b143c1eb61f90ac6457de87be6987bc70746e0179f7dbc9007b"
dependencies = [
"notify",
]
[[package]]
name = "num"
version = "0.4.0"
@ -5826,7 +5835,7 @@ dependencies = [
"log",
"migration",
"moka",
"notify",
"notify-debouncer-mini",
"num",
"num-traits",
"once_cell",

@ -49,7 +49,7 @@ ipnet = "2.7.1"
itertools = "0.10.5"
log = "0.4.17"
moka = { version = "0.10.0", default-features = false, features = ["future"] }
notify = "5.1.0"
notify-debouncer-mini = { version = "0.2.0", default-features = false }
num = "0.4.0"
num-traits = "0.2.15"
once_cell = { version = "1.17.1" }

@ -10,7 +10,7 @@ use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::jsonrpc::{
JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum,
};
use crate::rpcs::blockchain::{Web3ProxyBlock};
use crate::rpcs::blockchain::Web3ProxyBlock;
use crate::rpcs::many::Web3Rpcs;
use crate::rpcs::one::Web3Rpc;
use crate::rpcs::transactions::TxStatus;
@ -26,7 +26,7 @@ use ethers::core::utils::keccak256;
use ethers::prelude::{Address, Bytes, Transaction, TxHash, H256, U64};
use ethers::types::U256;
use ethers::utils::rlp::{Decodable, Rlp};
use futures::future::join_all;
use futures::future::{join_all, pending};
use futures::stream::{FuturesUnordered, StreamExt};
use hashbrown::{HashMap, HashSet};
use ipnet::IpNet;
@ -37,18 +37,20 @@ use migration::sea_orm::{
use migration::sea_query::table::ColumnDef;
use migration::{Alias, DbErr, Migrator, MigratorTrait, Table};
use moka::future::Cache;
use notify_debouncer_mini::{new_debouncer, notify, DebounceEventResult};
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
use serde::Serialize;
use serde_json::json;
use serde_json::value::to_raw_value;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::net::IpAddr;
use std::num::NonZeroU64;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{atomic, Arc};
use std::time::Duration;
use std::{fmt, fs};
use tokio::sync::{broadcast, watch, Semaphore};
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};
@ -365,11 +367,13 @@ impl Web3ProxyApp {
/// The main entrypoint.
pub async fn spawn(
top_config: TopConfig,
top_config_path: Option<PathBuf>,
num_workers: usize,
shutdown_receiver: broadcast::Receiver<()>,
) -> anyhow::Result<Web3ProxyAppSpawn> {
// safety checks on the config
// while i would prefer this to be in a "apply_top_config" function, that is a larger refactor
// TODO: maybe don't spawn with a config at all. have all config updates come through an apply_top_config call
if let Some(redirect) = &top_config.app.redirect_rpc_key_url {
assert!(
redirect.contains("{{rpc_key_id}}"),
@ -391,13 +395,15 @@ impl Web3ProxyApp {
);
}
// these futures are key parts of the app. if they stop running, the app has encountered an irrecoverable error
let app_handles = FuturesUnordered::new();
// we must wait for these to end on their own (and they need to subscribe to shutdown_sender)
let important_background_handles = FuturesUnordered::new();
// connect to the database and make sure the latest migrations have run
let mut db_conn = None::<DatabaseConnection>;
let mut db_replica = None::<DatabaseReplica>;
// connect to mysql and make sure the latest migrations have run
if let Some(db_url) = top_config.app.db_url.clone() {
let db_min_connections = top_config
.app
@ -620,8 +626,6 @@ impl Web3ProxyApp {
.time_to_idle(Duration::from_secs(120))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
let app_handles = FuturesUnordered::new();
// prepare a Web3Rpcs to hold all our balanced connections
let (balanced_rpcs, balanced_rpcs_handle) = Web3Rpcs::spawn(
top_config.app.chain_id,
@ -697,9 +701,64 @@ impl Web3ProxyApp {
let app = Arc::new(app);
app.apply_top_config(top_config).await?;
// TODO: use channel for receiving new top_configs
// TODO: return a channel for sending new top_configs
// watch for config changes
// TODO: initial config reload should be from this channel. not from the call to spawn
if let Some(top_config_path) = top_config_path {
let (top_config_sender, mut top_config_receiver) = watch::channel(top_config);
// TODO: i think the debouncer is exiting
let mut debouncer = new_debouncer(
Duration::from_secs(2),
None,
move |res: DebounceEventResult| match res {
Ok(events) => events.iter().for_each(|e| {
debug!("Event {:?} for {:?}", e.kind, e.path);
// TODO: use tokio::fs here?
let new_top_config: String = fs::read_to_string(&e.path).unwrap();
let new_top_config: TopConfig = toml::from_str(&new_top_config).unwrap();
top_config_sender.send_replace(new_top_config);
}),
Err(errors) => errors
.iter()
.for_each(|e| error!("config watcher error {:#?}", e)),
},
)
.context("failed starting debouncer config watcher")?;
// Add a path to be watched. All files and directories at that path and below will be monitored for changes.
info!("watching config @ {}", top_config_path.display());
debouncer
.watcher()
.watch(top_config_path.as_path(), notify::RecursiveMode::Recursive)
.context("failed starting config watcher")?;
let app = app.clone();
let config_handle = tokio::spawn(async move {
loop {
let new_top_config = top_config_receiver.borrow_and_update().to_owned();
app.apply_top_config(new_top_config)
.await
.context("failed applying new top_config")?;
top_config_receiver
.changed()
.await
.context("failed awaiting top_config change")?;
info!("config changed");
}
});
app_handles.push(config_handle);
} else {
// no path to config, so we don't know what to watch
// this isn't an error. the config might just be in memory
app.apply_top_config(top_config).await?;
}
Ok((app, app_handles, important_background_handles).into())
}
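Note on the hunk above: it carries "TODO: i think the debouncer is exiting". One common cause with notify-debouncer-mini is that the Debouncer stops its watcher thread when it is dropped, so it has to outlive the function that creates it. The following is a minimal, self-contained sketch of the same watch-the-config-file pattern under that assumption; Config and chain_id are placeholders, not the project's TopConfig:

    use std::{fs, path::PathBuf, time::Duration};

    use anyhow::Context;
    use notify_debouncer_mini::{new_debouncer, notify, DebounceEventResult};
    use serde::Deserialize;
    use tokio::sync::watch;

    // placeholder config type for the sketch; the real code parses TopConfig
    #[derive(Clone, Debug, Deserialize)]
    struct Config {
        chain_id: u64,
    }

    async fn watch_config(path: PathBuf) -> anyhow::Result<()> {
        let initial: Config = toml::from_str(&fs::read_to_string(&path)?)?;
        let (tx, mut rx) = watch::channel(initial);

        // re-read and re-parse the file after a 2 second debounce window
        let mut debouncer = new_debouncer(
            Duration::from_secs(2),
            None,
            move |res: DebounceEventResult| {
                if let Ok(events) = res {
                    for event in events {
                        if let Ok(raw) = fs::read_to_string(&event.path) {
                            match toml::from_str::<Config>(&raw) {
                                Ok(parsed) => {
                                    tx.send_replace(parsed);
                                }
                                Err(err) => eprintln!("bad config: {:?}", err),
                            }
                        }
                    }
                }
            },
        )
        .context("failed starting debouncer")?;

        debouncer
            .watcher()
            .watch(path.as_path(), notify::RecursiveMode::Recursive)
            .context("failed starting config watcher")?;

        tokio::spawn(async move {
            // move the debouncer into the long-lived task so it is not dropped
            // (dropping it stops the underlying watcher)
            let _debouncer = debouncer;

            loop {
                // apply whatever is current, then wait for the next change
                let current = rx.borrow_and_update().clone();
                println!("applying config for chain {}", current.chain_id);

                if rx.changed().await.is_err() {
                    break; // sender dropped; no more updates will arrive
                }
            }
        });

        Ok(())
    }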

@ -6,11 +6,11 @@ mod check_config;
mod count_users;
mod create_key;
mod create_user;
mod daemon;
mod drop_migration_lock;
mod list_user_tier;
mod pagerduty;
mod popularity_contest;
mod proxyd;
mod rpc_accounting;
mod sentryd;
mod transfer_key;
@ -80,7 +80,7 @@ enum SubCommand {
DropMigrationLock(drop_migration_lock::DropMigrationLockSubCommand),
Pagerduty(pagerduty::PagerdutySubCommand),
PopularityContest(popularity_contest::PopularityContestSubCommand),
Proxyd(daemon::ProxydSubCommand),
Proxyd(proxyd::ProxydSubCommand),
RpcAccounting(rpc_accounting::RpcAccountingSubCommand),
Sentryd(sentryd::SentrydSubCommand),
TransferKey(transfer_key::TransferKeySubCommand),
@ -92,6 +92,9 @@ enum SubCommand {
}
fn main() -> anyhow::Result<()> {
// this probably won't matter for us in docker, but better safe than sorry
fdlimit::raise_fd_limit();
#[cfg(feature = "deadlock")]
{
// spawn a thread for deadlock detection
@ -142,9 +145,6 @@ fn main() -> anyhow::Result<()> {
.join(","),
};
// this probably won't matter for us in docker, but better safe than sorry
fdlimit::raise_fd_limit();
let mut cli_config: Web3ProxyCli = argh::from_env();
if cli_config.config.is_none() && cli_config.db_url.is_none() && cli_config.sentry_url.is_none()
@ -154,12 +154,13 @@ fn main() -> anyhow::Result<()> {
cli_config.config = Some("./config/development.toml".to_string());
}
let top_config = if let Some(top_config_path) = cli_config.config.clone() {
let (top_config, top_config_path) = if let Some(top_config_path) = cli_config.config.clone() {
let top_config_path = Path::new(&top_config_path)
.canonicalize()
.context(format!("checking for config at {}", top_config_path))?;
let top_config: String = fs::read_to_string(top_config_path)?;
let top_config: String = fs::read_to_string(top_config_path.clone())?;
let mut top_config: TopConfig = toml::from_str(&top_config)?;
// TODO: this doesn't seem to do anything
@ -184,9 +185,9 @@ fn main() -> anyhow::Result<()> {
}
}
Some(top_config)
(Some(top_config), Some(top_config_path))
} else {
None
(None, None)
};
let logger = env_logger::builder().parse_filters(&rust_log).build();
@ -343,8 +344,10 @@ fn main() -> anyhow::Result<()> {
}
SubCommand::Proxyd(x) => {
let top_config = top_config.expect("--config is required to run proxyd");
let top_config_path =
top_config_path.expect("path must be set if top_config exists");
x.main(top_config, num_workers).await
x.main(top_config, top_config_path, num_workers).await
}
SubCommand::DropMigrationLock(x) => {
let db_url = cli_config

@ -1,5 +1,7 @@
#![forbid(unsafe_code)]
use std::path::PathBuf;
use argh::FromArgs;
use futures::StreamExt;
use log::{error, info, warn};
@ -24,11 +26,19 @@ pub struct ProxydSubCommand {
}
impl ProxydSubCommand {
pub async fn main(self, top_config: TopConfig, num_workers: usize) -> anyhow::Result<()> {
pub async fn main(
self,
top_config: TopConfig,
top_config_path: PathBuf,
num_workers: usize,
) -> anyhow::Result<()> {
let (shutdown_sender, _) = broadcast::channel(1);
// TODO: i think there is a small race. if config_path changes
run(
top_config,
Some(top_config_path),
self.port,
self.prometheus_port,
num_workers,
@ -40,6 +50,7 @@ impl ProxydSubCommand {
async fn run(
top_config: TopConfig,
top_config_path: Option<PathBuf>,
frontend_port: u16,
prometheus_port: u16,
num_workers: usize,
@ -54,8 +65,13 @@ async fn run(
let mut shutdown_receiver = shutdown_sender.subscribe();
// start the main app
let mut spawned_app =
Web3ProxyApp::spawn(top_config, num_workers, shutdown_sender.subscribe()).await?;
let mut spawned_app = Web3ProxyApp::spawn(
top_config,
top_config_path,
num_workers,
shutdown_sender.subscribe(),
)
.await?;
// start the prometheus metrics port
let prometheus_handle = tokio::spawn(metrics_frontend::serve(
@ -246,6 +262,7 @@ mod tests {
tokio::spawn(async move {
run(
top_config,
None,
frontend_port,
prometheus_port,
2,

@ -97,18 +97,18 @@ impl SentrydSubCommand {
.or_else(|| top_config.map(|x| x.app.chain_id))
.context("--config or --chain-id required")?;
let primary_proxy = self.web3_proxy.trim_end_matches("/").to_string();
let primary_proxy = self.web3_proxy.trim_end_matches('/').to_string();
let other_proxy: Vec<_> = self
.other_proxy
.into_iter()
.map(|x| x.trim_end_matches("/").to_string())
.map(|x| x.trim_end_matches('/').to_string())
.collect();
let other_rpc: Vec<_> = self
.other_rpc
.into_iter()
.map(|x| x.trim_end_matches("/").to_string())
.map(|x| x.trim_end_matches('/').to_string())
.collect();
let seconds = self.seconds.unwrap_or(60);

@ -211,6 +211,7 @@ impl JsonRpcForwardedResponse {
error: Some(JsonRpcErrorData {
code: code.unwrap_or(-32099),
message,
// TODO: accept data as an argument
data: None,
}),
}

@ -124,7 +124,7 @@ pub fn pagerduty_alert_for_config<T: Serialize>(
) -> AlertTrigger<T> {
let chain_id = top_config.app.chain_id;
let client_url = top_config.app.redirect_public_url.clone();
let client_url = top_config.app.redirect_public_url;
pagerduty_alert(
Some(chain_id),
@ -140,6 +140,7 @@ pub fn pagerduty_alert_for_config<T: Serialize>(
)
}
#[allow(clippy::too_many_arguments)]
pub fn pagerduty_alert<T: Serialize>(
chain_id: Option<u64>,
class: Option<String>,
@ -186,6 +187,6 @@ pub fn pagerduty_alert<T: Serialize>(
images: None,
links: None,
client: Some(client),
client_url: client_url,
client_url,
}
}

@ -8,7 +8,7 @@ use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest};
use anyhow::{anyhow, Context};
use derive_more::From;
use ethers::prelude::{Block, TxHash, H256, U64};
use log::{debug, error, trace, warn, Level};
use log::{debug, trace, warn, Level};
use moka::future::Cache;
use serde::Serialize;
use serde_json::json;
@ -72,7 +72,7 @@ impl Web3ProxyBlock {
if block_timestamp < now {
// this server is still syncing from too far away to serve requests
// u64 is safe because we checked equality above
(now - block_timestamp).as_secs() as u64
(now - block_timestamp).as_secs()
} else {
0
}
@ -392,8 +392,7 @@ impl Web3Rpcs {
.await
.context("no consensus head block!")
.map_err(|err| {
self.watch_consensus_rpcs_sender
.send_replace(Arc::new(Default::default()));
self.watch_consensus_rpcs_sender.send_replace(None);
err
})?;
@ -414,100 +413,73 @@ impl Web3Rpcs {
let old_consensus_head_connections = self
.watch_consensus_rpcs_sender
.send_replace(Arc::new(new_synced_connections));
.send_replace(Some(Arc::new(new_synced_connections)));
let backups_voted_str = if backups_needed { "B " } else { "" };
if let Some(consensus_head_block) = consensus_head_block {
match &old_consensus_head_connections.head_block {
None => {
debug!(
"first {}/{} {}{}/{}/{} block={}, rpc={}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
rpc,
);
match old_consensus_head_connections.as_ref() {
None => {
debug!(
"first {}/{} {}{}/{}/{} block={}, rpc={}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
rpc,
);
if backups_needed {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
// this should already be cached
let consensus_head_block =
self.try_cache_block(consensus_head_block, true).await?;
watch_consensus_head_sender
.send(Some(consensus_head_block))
.context(
"watch_consensus_head_sender failed sending first consensus_head_block",
)?;
if backups_needed {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
Some(old_head_block) => {
// TODO: do this log item better
let rpc_head_str = new_block
.map(|x| x.to_string())
.unwrap_or_else(|| "None".to_string());
match consensus_head_block.number().cmp(&old_head_block.number()) {
Ordering::Equal => {
// multiple blocks with the same fork!
if consensus_head_block.hash() == old_head_block.hash() {
// no change in hash. no need to use watch_consensus_head_sender
// TODO: trace level if rpc is backup
debug!(
"con {}/{} {}{}/{}/{} con={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
rpc,
rpc_head_str,
)
} else {
// hash changed
if backups_needed {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
// this should already be cached
let consensus_head_block = self.try_cache_block(consensus_head_block, true).await?;
debug!(
"unc {}/{} {}{}/{}/{} con_head={} old={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
old_head_block,
rpc,
rpc_head_str,
);
watch_consensus_head_sender
.send(Some(consensus_head_block))
.context(
"watch_consensus_head_sender failed sending first consensus_head_block",
)?;
}
Some(old_consensus_connections) => {
let old_head_block = &old_consensus_connections.head_block;
let consensus_head_block = self
.try_cache_block(consensus_head_block, true)
.await
.context("save consensus_head_block as heaviest chain")?;
// TODO: do this log item better
let rpc_head_str = new_block
.map(|x| x.to_string())
.unwrap_or_else(|| "None".to_string());
watch_consensus_head_sender
.send(Some(consensus_head_block))
.context("watch_consensus_head_sender failed sending uncled consensus_head_block")?;
match consensus_head_block.number().cmp(old_head_block.number()) {
Ordering::Equal => {
// multiple blocks with the same fork!
if consensus_head_block.hash() == old_head_block.hash() {
// no change in hash. no need to use watch_consensus_head_sender
// TODO: trace level if rpc is backup
debug!(
"con {}/{} {}{}/{}/{} con={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
rpc,
rpc_head_str,
)
} else {
// hash changed
if backups_needed {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
}
Ordering::Less => {
// this is unlikely but possible
// TODO: better log
warn!(
"chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}",
debug!(
"unc {}/{} {}{}/{}/{} con_head={} old={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
@ -520,82 +492,73 @@ impl Web3Rpcs {
rpc_head_str,
);
if backups_needed {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
// TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems like a good idea
let consensus_head_block = self
.try_cache_block(consensus_head_block, true)
.await
.context(
"save_block sending consensus_head_block as heaviest chain",
)?;
.context("save consensus_head_block as heaviest chain")?;
watch_consensus_head_sender
.send(Some(consensus_head_block))
.context("watch_consensus_head_sender failed sending rollback consensus_head_block")?;
}
Ordering::Greater => {
debug!(
"new {}/{} {}{}/{}/{} con={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
rpc,
rpc_head_str,
);
if backups_needed {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
let consensus_head_block =
self.try_cache_block(consensus_head_block, true).await?;
watch_consensus_head_sender.send(Some(consensus_head_block)).context("watch_consensus_head_sender failed sending new consensus_head_block")?;
.context("watch_consensus_head_sender failed sending uncled consensus_head_block")?;
}
}
}
}
} else {
// TODO: do this log item better
let rpc_head_str = new_block
.map(|x| x.to_string())
.unwrap_or_else(|| "None".to_string());
Ordering::Less => {
// this is unlikely but possible
// TODO: better log
warn!(
"chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
old_head_block,
rpc,
rpc_head_str,
);
if num_active_rpcs >= self.min_head_rpcs {
// no consensus!!!
error!(
"non {}/{} {}{}/{}/{} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
rpc,
rpc_head_str,
);
} else {
// no consensus, but we do not have enough rpcs connected yet to panic
debug!(
"non {}/{} {}{}/{}/{} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
rpc,
rpc_head_str,
);
if backups_needed {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
// TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems like a good idea
let consensus_head_block = self
.try_cache_block(consensus_head_block, true)
.await
.context("save_block sending consensus_head_block as heaviest chain")?;
watch_consensus_head_sender
.send(Some(consensus_head_block))
.context("watch_consensus_head_sender failed sending rollback consensus_head_block")?;
}
Ordering::Greater => {
debug!(
"new {}/{} {}{}/{}/{} con={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
rpc,
rpc_head_str,
);
if backups_needed {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
let consensus_head_block =
self.try_cache_block(consensus_head_block, true).await?;
watch_consensus_head_sender.send(Some(consensus_head_block)).context("watch_consensus_head_sender failed sending new consensus_head_block")?;
}
}
}
}

@ -16,10 +16,11 @@ use tokio::time::Instant;
/// A collection of Web3Rpcs that are on the same block.
/// Serialize is so we can print it on our debug endpoint
#[derive(Clone, Default, Serialize)]
#[derive(Clone, Serialize)]
pub struct ConsensusWeb3Rpcs {
// TODO: tier should be an option, or we should have consensus be stored as an Option<ConsensusWeb3Rpcs>
pub(super) tier: u64,
pub(super) head_block: Option<Web3ProxyBlock>,
pub(super) head_block: Web3ProxyBlock,
// TODO: this should be able to serialize, but it isn't
#[serde(skip_serializing)]
pub(super) rpcs: Vec<Arc<Web3Rpc>>,
@ -69,11 +70,23 @@ impl Web3Rpcs {
}
pub fn synced(&self) -> bool {
!self.watch_consensus_rpcs_sender.borrow().rpcs.is_empty()
let consensus = self.watch_consensus_rpcs_sender.borrow();
if let Some(consensus) = consensus.as_ref() {
!consensus.rpcs.is_empty()
} else {
false
}
}
pub fn num_synced_rpcs(&self) -> usize {
self.watch_consensus_rpcs_sender.borrow().rpcs.len()
let consensus = self.watch_consensus_rpcs_sender.borrow();
if let Some(consensus) = consensus.as_ref() {
consensus.rpcs.len()
} else {
0
}
}
}
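These accessors show the new convention introduced by this commit: the consensus watch channel now carries an Option, so "no consensus yet" is an explicit None instead of an empty Default. A small sketch of that pattern in isolation (ConsensusSnapshot is a stand-in type, not the project's ConsensusWeb3Rpcs):

    use std::sync::Arc;
    use tokio::sync::watch;

    struct ConsensusSnapshot {
        rpc_names: Vec<String>, // placeholder for the set of synced rpcs
    }

    fn main() {
        // start with no consensus at all, rather than an empty Default
        let (tx, rx) = watch::channel(None::<Arc<ConsensusSnapshot>>);

        // readers must handle the None case explicitly
        let synced = rx
            .borrow()
            .as_ref()
            .map(|c| !c.rpc_names.is_empty())
            .unwrap_or(false);
        assert!(!synced);

        // publishing a new consensus replaces the value for all receivers
        tx.send_replace(Some(Arc::new(ConsensusSnapshot {
            rpc_names: vec!["rpc-a".into(), "rpc-b".into()],
        })));

        assert!(rx
            .borrow()
            .as_ref()
            .map(|c| !c.rpc_names.is_empty())
            .unwrap_or(false));
    }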
@ -100,6 +113,10 @@ impl ConnectionsGroup {
self.rpc_name_to_block.len()
}
pub fn is_empty(&self) -> bool {
self.rpc_name_to_block.is_empty()
}
fn remove(&mut self, rpc_name: &str) -> Option<Web3ProxyBlock> {
if let Some(removed_block) = self.rpc_name_to_block.remove(rpc_name) {
match self.highest_block.as_mut() {
@ -255,14 +272,14 @@ impl ConnectionsGroup {
// not enough rpcs on this block. check the parent block
match web3_rpcs
.block(authorization, &maybe_head_block.parent_hash(), None)
.block(authorization, maybe_head_block.parent_hash(), None)
.await
{
Ok(parent_block) => {
// trace!(
// child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd. checking consensus on parent block",
// );
maybe_head_block = parent_block.into();
maybe_head_block = parent_block;
continue;
}
Err(err) => {
@ -325,7 +342,7 @@ impl ConnectionsGroup {
Ok(ConsensusWeb3Rpcs {
tier: *tier,
head_block: Some(maybe_head_block),
head_block: maybe_head_block,
rpcs,
backups_voted: backup_rpcs_voted,
backups_needed: primary_rpcs_voted.is_none(),
@ -371,6 +388,10 @@ impl ConsensusFinder {
self.tiers.len()
}
pub fn is_empty(&self) -> bool {
self.tiers.is_empty()
}
/// get the ConnectionsGroup that contains all rpcs
/// panics if there are no tiers
pub fn all_rpcs_group(&self) -> Option<&ConnectionsGroup> {
@ -457,26 +478,18 @@ impl ConsensusFinder {
}
if let Some(prev_block) = self.insert(&rpc, rpc_head_block.clone()).await {
if prev_block.hash() == rpc_head_block.hash() {
// this block was already sent by this rpc. return early
false
} else {
// new block for this rpc
true
}
// false if this block was already sent by this rpc. return early
// true if new block for this rpc
prev_block.hash() != rpc_head_block.hash()
} else {
// first block for this rpc
true
}
}
None => {
if self.remove(&rpc).is_none() {
// this rpc was already removed
false
} else {
// rpc head changed from being synced to not
true
}
// false if this rpc was already removed
// true if rpc head changed from being synced to not
self.remove(&rpc).is_some()
}
};

@ -50,7 +50,7 @@ pub struct Web3Rpcs {
/// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
/// TODO: why is watch_consensus_head_sender in an Option, but this one isn't?
/// Geth's subscriptions have the same potential for skipping blocks.
pub(super) watch_consensus_rpcs_sender: watch::Sender<Arc<ConsensusWeb3Rpcs>>,
pub(super) watch_consensus_rpcs_sender: watch::Sender<Option<Arc<ConsensusWeb3Rpcs>>>,
/// this head receiver makes it easy to wait until there is a new block
pub(super) watch_consensus_head_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
pub(super) pending_transaction_cache:
@ -125,7 +125,7 @@ impl Web3Rpcs {
// trace!("http interval ready");
if let Err(_) = sender.send(()) {
if sender.send(()).is_err() {
// errors are okay. they mean that all receivers have been dropped, or the rpcs just haven't started yet
trace!("no http receivers");
};
@ -181,7 +181,7 @@ impl Web3Rpcs {
max_block_lag,
});
let authorization = Arc::new(Authorization::internal(db_conn.clone())?);
let authorization = Arc::new(Authorization::internal(db_conn)?);
let handle = {
let connections = connections.clone();
@ -478,12 +478,14 @@ impl Web3Rpcs {
let usable_rpcs_by_tier_and_head_number: BTreeMap<(u64, Option<U64>), Vec<Arc<Web3Rpc>>> = {
let synced_connections = self.watch_consensus_rpcs_sender.borrow().clone();
let (head_block_num, head_block_age) =
if let Some(head_block) = synced_connections.head_block.as_ref() {
(head_block.number(), head_block.age())
} else {
return Ok(OpenRequestResult::NotReady);
};
if synced_connections.is_none() {
return Ok(OpenRequestResult::NotReady);
}
let synced_connections =
synced_connections.expect("synced_connections can't be None here");
let head_block_num = synced_connections.head_block.number();
let head_block_age = synced_connections.head_block.age();
let needed_blocks_comparison = match (min_block_needed, max_block_needed) {
(None, None) => {
@ -530,28 +532,20 @@ impl Web3Rpcs {
.values()
.filter(|x| {
// TODO: move a bunch of this onto a rpc.is_synced function
#[allow(clippy::if_same_then_else)]
if skip.contains(x) {
// we've already tried this server or have some other reason to skip it
false
} else if max_block_needed
.and_then(|max_block_needed| {
Some(!x.has_block_data(max_block_needed))
})
.map(|max_block_needed| !x.has_block_data(max_block_needed))
.unwrap_or(false)
{
// server does not have the max block
false
} else if min_block_needed
.and_then(|min_block_needed| {
Some(!x.has_block_data(min_block_needed))
})
.unwrap_or(false)
{
// server does not have the min block
false
} else {
// server has the block we need!
true
!min_block_needed
.map(|min_block_needed| !x.has_block_data(min_block_needed))
.unwrap_or(false)
}
})
.cloned()
@ -599,9 +593,7 @@ impl Web3Rpcs {
}
trace!("not skipped!");
m.entry(key.clone())
.or_insert_with(Vec::new)
.push(x.clone());
m.entry(key).or_insert_with(Vec::new).push(x.clone());
}
}
cmp::Ordering::Greater => {
@ -703,44 +695,9 @@ impl Web3Rpcs {
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
max_count: Option<usize>,
always_include_backups: bool,
) -> Result<Vec<OpenRequestHandle>, Option<Instant>> {
if !always_include_backups {
if let Ok(without_backups) = self
._all_connections(
false,
authorization,
min_block_needed,
max_block_needed,
max_count,
)
.await
{
return Ok(without_backups);
}
}
self._all_connections(
true,
authorization,
min_block_needed,
max_block_needed,
max_count,
)
.await
}
async fn _all_connections(
&self,
allow_backups: bool,
authorization: &Arc<Authorization>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
max_count: Option<usize>,
) -> Result<Vec<OpenRequestHandle>, Option<Instant>> {
let mut earliest_retry_at = None;
// TODO: with capacity?
let mut selected_rpcs = vec![];
let mut max_count = if let Some(max_count) = max_count {
max_count
@ -748,63 +705,83 @@ impl Web3Rpcs {
self.by_name.read().len()
};
trace!("max_count: {}", max_count);
let mut selected_rpcs = Vec::with_capacity(max_count);
let mut tried = HashSet::new();
let mut synced_conns = self.watch_consensus_rpcs_sender.borrow().rpcs.clone();
let mut synced_rpcs = {
let synced_rpcs = self.watch_consensus_rpcs_sender.borrow();
if let Some(synced_rpcs) = synced_rpcs.as_ref() {
synced_rpcs.rpcs.clone()
} else {
vec![]
}
};
// synced connections are all on the same block. sort them by tier with higher soft limits first
synced_conns.sort_by_cached_key(rpc_sync_status_sort_key);
synced_rpcs.sort_by_cached_key(rpc_sync_status_sort_key);
trace!("synced_rpcs: {:#?}", synced_rpcs);
// if there aren't enough synced connections, include more connections
// TODO: only do this sorting if the synced_conns isn't enough
let mut all_conns: Vec<_> = self.by_name.read().values().cloned().collect();
all_conns.sort_by_cached_key(rpc_sync_status_sort_key);
// TODO: only do this sorting if the synced_rpcs isn't enough
let mut all_rpcs: Vec<_> = self.by_name.read().values().cloned().collect();
all_rpcs.sort_by_cached_key(rpc_sync_status_sort_key);
for connection in itertools::chain(synced_conns, all_conns) {
trace!("all_rpcs: {:#?}", all_rpcs);
for rpc in itertools::chain(synced_rpcs, all_rpcs) {
if max_count == 0 {
break;
}
if tried.contains(&connection.name) {
if tried.contains(&rpc.name) {
continue;
}
tried.insert(connection.name.clone());
trace!("trying {}", rpc);
if !allow_backups && connection.backup {
tried.insert(rpc.name.clone());
if !allow_backups && rpc.backup {
trace!("{} is a backup. skipping", rpc);
continue;
}
if let Some(block_needed) = min_block_needed {
if !connection.has_block_data(block_needed) {
if !rpc.has_block_data(block_needed) {
trace!("{} is missing min_block_needed. skipping", rpc);
continue;
}
}
if let Some(block_needed) = max_block_needed {
if !connection.has_block_data(block_needed) {
if !rpc.has_block_data(block_needed) {
trace!("{} is missing max_block_needed. skipping", rpc);
continue;
}
}
// check rate limits and increment our connection counter
match connection.try_request_handle(authorization, None).await {
match rpc.try_request_handle(authorization, None).await {
Ok(OpenRequestResult::RetryAt(retry_at)) => {
// this rpc is not available. skip it
trace!("{} is rate limited. skipping", rpc);
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
}
Ok(OpenRequestResult::Handle(handle)) => {
trace!("{} is available", rpc);
max_count -= 1;
selected_rpcs.push(handle)
}
Ok(OpenRequestResult::NotReady) => {
warn!("no request handle for {}", connection)
warn!("no request handle for {}", rpc)
}
Err(err) => {
warn!(
"error getting request handle for {}. err={:?}",
connection, err
)
warn!("error getting request handle for {}. err={:?}", rpc, err)
}
}
}
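The renamed loop above walks the consensus members first and then the full, sorted rpc list, skipping anything already tried and skipping backups unless allowed. A toy sketch of that selection order, with Rpc as a stand-in struct and no rate limiting:

    use std::collections::HashSet;

    struct Rpc {
        name: String,
        backup: bool,
    }

    // pick up to `max_count` rpcs, trying consensus members first and
    // falling back to the full sorted list without retrying duplicates
    fn pick_rpcs(
        synced: Vec<Rpc>,
        all: Vec<Rpc>,
        mut max_count: usize,
        allow_backups: bool,
    ) -> Vec<Rpc> {
        let mut tried = HashSet::new();
        let mut selected = Vec::with_capacity(max_count);

        for rpc in itertools::chain(synced, all) {
            if max_count == 0 {
                break;
            }
            if !tried.insert(rpc.name.clone()) {
                continue; // already considered this server
            }
            if !allow_backups && rpc.backup {
                continue; // backups only when explicitly allowed
            }
            max_count -= 1;
            selected.push(rpc);
        }

        selected
    }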
@ -1015,6 +992,7 @@ impl Web3Rpcs {
if num_skipped == 0 {
error!("No servers synced ({} known). None skipped", num_conns);
// TODO: what error code?
Ok(JsonRpcForwardedResponse::from_str(
"No servers synced",
Some(-32000),
@ -1038,6 +1016,7 @@ impl Web3Rpcs {
}
/// be sure there is a timeout on this or it might loop forever
#[allow(clippy::too_many_arguments)]
pub async fn try_send_all_synced_connections(
&self,
authorization: &Arc<Authorization>,
@ -1180,9 +1159,14 @@ impl Serialize for Web3Rpcs {
}
{
let consensus_connections = self.watch_consensus_rpcs_sender.borrow().clone();
// TODO: rename synced_connections to consensus_connections?
state.serialize_field("synced_connections", &consensus_connections)?;
let consensus_rpcs = self.watch_consensus_rpcs_sender.borrow();
// TODO: rename synced_connections to consensus_rpcs
if let Some(consensus_rpcs) = consensus_rpcs.as_ref() {
state.serialize_field("synced_connections", consensus_rpcs)?;
} else {
state.serialize_field("synced_connections", &None::<()>)?;
}
}
self.blocks_by_hash.sync();
@ -1369,11 +1353,11 @@ mod tests {
..Default::default()
};
assert!(head_rpc.has_block_data(&lagged_block.number()));
assert!(head_rpc.has_block_data(&head_block.number()));
assert!(head_rpc.has_block_data(lagged_block.number()));
assert!(head_rpc.has_block_data(head_block.number()));
assert!(lagged_rpc.has_block_data(&lagged_block.number()));
assert!(!lagged_rpc.has_block_data(&head_block.number()));
assert!(lagged_rpc.has_block_data(lagged_block.number()));
assert!(!lagged_rpc.has_block_data(head_block.number()));
let head_rpc = Arc::new(head_rpc);
let lagged_rpc = Arc::new(lagged_rpc);
@ -1383,10 +1367,12 @@ mod tests {
(lagged_rpc.name.clone(), lagged_rpc.clone()),
]);
let (block_sender, _) = flume::unbounded();
let (block_sender, _block_receiver) = flume::unbounded();
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (watch_consensus_rpcs_sender, _) = watch::channel(Default::default());
let (watch_consensus_head_sender, _) = watch::channel(Default::default());
let (watch_consensus_rpcs_sender, _watch_consensus_rpcs_receiver) =
watch::channel(Default::default());
let (watch_consensus_head_sender, _watch_consensus_head_receiver) =
watch::channel(Default::default());
// TODO: make a Web3Rpcs::new
let rpcs = Web3Rpcs {
@ -1570,8 +1556,8 @@ mod tests {
..Default::default()
};
assert!(pruned_rpc.has_block_data(&head_block.number()));
assert!(archive_rpc.has_block_data(&head_block.number()));
assert!(pruned_rpc.has_block_data(head_block.number()));
assert!(archive_rpc.has_block_data(head_block.number()));
assert!(!pruned_rpc.has_block_data(&1.into()));
assert!(archive_rpc.has_block_data(&1.into()));
@ -1640,7 +1626,7 @@ mod tests {
// best_synced_backend_connection requires servers to be synced with the head block
// TODO: test with and without passing the head_block.number?
let best_available_server = rpcs
.best_available_rpc(&authorization, None, &[], Some(&head_block.number()), None)
.best_available_rpc(&authorization, None, &[], Some(head_block.number()), None)
.await;
debug!("best_available_server: {:#?}", best_available_server);

@ -558,10 +558,8 @@ impl Web3Rpc {
drop(unlocked_provider);
info!("successfully connected to {}", self);
} else {
if self.provider.read().await.is_none() {
return Err(anyhow!("failed waiting for client"));
}
} else if self.provider.read().await.is_none() {
return Err(anyhow!("failed waiting for client"));
};
Ok(())
@ -604,7 +602,7 @@ impl Web3Rpc {
{
let mut head_block = self.head_block.write();
let _ = head_block.insert(new_head_block.clone().into());
let _ = head_block.insert(new_head_block.clone());
}
if self.block_data_limit() == U64::zero() {
@ -712,7 +710,7 @@ impl Web3Rpc {
let head_block = conn.head_block.read().clone();
if let Some((block_number, txid)) = head_block.and_then(|x| {
let block = x.block.clone();
let block = x.block;
let block_number = block.number?;
let txid = block.transactions.last().cloned()?;
@ -1146,7 +1144,7 @@ impl Web3Rpc {
}
if let Some(hard_limit_until) = self.hard_limit_until.as_ref() {
let hard_limit_ready = hard_limit_until.borrow().clone();
let hard_limit_ready = *hard_limit_until.borrow();
let now = Instant::now();
@ -1178,7 +1176,7 @@ impl Web3Rpc {
}
if let Some(hard_limit_until) = self.hard_limit_until.as_ref() {
hard_limit_until.send_replace(retry_at.clone());
hard_limit_until.send_replace(retry_at);
}
return Ok(OpenRequestResult::RetryAt(retry_at));
@ -1355,7 +1353,7 @@ mod tests {
assert!(x.has_block_data(&0.into()));
assert!(x.has_block_data(&1.into()));
assert!(x.has_block_data(&head_block.number()));
assert!(x.has_block_data(head_block.number()));
assert!(!x.has_block_data(&(head_block.number() + 1)));
assert!(!x.has_block_data(&(head_block.number() + 1000)));
}
@ -1394,7 +1392,7 @@ mod tests {
assert!(!x.has_block_data(&1.into()));
assert!(!x.has_block_data(&(head_block.number() - block_data_limit - 1)));
assert!(x.has_block_data(&(head_block.number() - block_data_limit)));
assert!(x.has_block_data(&head_block.number()));
assert!(x.has_block_data(head_block.number()));
assert!(!x.has_block_data(&(head_block.number() + 1)));
assert!(!x.has_block_data(&(head_block.number() + 1000)));
}