use watch instead of arcswap

This commit is contained in:
Bryan Stitt 2023-01-22 22:02:08 -08:00
parent ec5c28b64b
commit 86e3f2991f
7 changed files with 140 additions and 74 deletions

7
Cargo.lock generated

@ -105,12 +105,6 @@ dependencies = [
"backtrace",
]
[[package]]
name = "arc-swap"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
[[package]]
name = "argh"
version = "0.1.10"
@ -5574,7 +5568,6 @@ name = "web3_proxy"
version = "0.13.0"
dependencies = [
"anyhow",
"arc-swap",
"argh",
"axum",
"axum-client-ip",

@ -20,7 +20,6 @@ redis-rate-limiter = { path = "../redis-rate-limiter" }
thread-fast-rng = { path = "../thread-fast-rng" }
anyhow = { version = "1.0.68", features = ["backtrace"] }
arc-swap = "1.6.0"
argh = "0.1.10"
axum = { version = "0.6.3", features = ["headers", "ws"] }
axum-client-ip = "0.3.1"

@ -188,7 +188,7 @@ pub struct Web3ProxyApp {
response_cache: ResponseCache,
// don't drop this or the sender will stop working
// TODO: broadcast channel instead?
head_block_receiver: watch::Receiver<ArcBlock>,
watch_consensus_head_receiver: watch::Receiver<ArcBlock>,
pending_tx_sender: broadcast::Sender<TxStatus>,
pub config: AppConfig,
pub db_conn: Option<sea_orm::DatabaseConnection>,
@ -533,7 +533,8 @@ impl Web3ProxyApp {
};
// TODO: i don't like doing Block::default here! Change this to "None"?
let (head_block_sender, head_block_receiver) = watch::channel(Arc::new(Block::default()));
let (watch_consensus_head_sender, watch_consensus_head_receiver) =
watch::channel(Arc::new(Block::default()));
// TODO: will one receiver lagging be okay? how big should this be?
let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(256);
@ -570,7 +571,7 @@ impl Web3ProxyApp {
http_client.clone(),
vredis_pool.clone(),
block_map.clone(),
Some(head_block_sender),
Some(watch_consensus_head_sender),
top_config.app.min_sum_soft_limit,
top_config.app.min_synced_rpcs,
Some(pending_tx_sender.clone()),
@ -598,6 +599,8 @@ impl Web3ProxyApp {
vredis_pool.clone(),
block_map,
// subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs
// they also often have low rate limits
// however, they are well connected to miners/validators. so maybe using them as a safety check would be good
None,
0,
0,
@ -706,7 +709,7 @@ impl Web3ProxyApp {
balanced_rpcs,
private_rpcs,
response_cache,
head_block_receiver,
watch_consensus_head_receiver,
pending_tx_sender,
pending_transactions,
frontend_ip_rate_limiter,
@ -730,7 +733,7 @@ impl Web3ProxyApp {
}
pub fn head_block_receiver(&self) -> watch::Receiver<ArcBlock> {
self.head_block_receiver.clone()
self.watch_consensus_head_receiver.clone()
}
pub async fn prometheus_metrics(&self) -> String {
@ -1362,10 +1365,10 @@ impl Web3ProxyApp {
method => {
// emit stats
// TODO: if no servers synced, wait for them to be synced?
let head_block = self
// TODO: if no servers synced, wait for them to be synced? probably better to error and let haproxy retry another server
let head_block_num = self
.balanced_rpcs
.head_block()
.head_block_num()
.context("no servers synced")?;
// we do this check before checking caches because it might modify the request params
@ -1375,7 +1378,7 @@ impl Web3ProxyApp {
authorization,
method,
request.params.as_mut(),
head_block.number(),
head_block_num,
&self.balanced_rpcs,
)
.await?

@ -50,7 +50,7 @@ impl Web3ProxyApp {
match request_json.params.as_ref() {
Some(x) if x == &json!(["newHeads"]) => {
let authorization = authorization.clone();
let head_block_receiver = self.head_block_receiver.clone();
let head_block_receiver = self.watch_consensus_head_receiver.clone();
let stat_sender = self.stat_sender.clone();
trace!("newHeads subscription {:?}", subscription_id);

@ -165,9 +165,15 @@ impl Web3Connections {
let request: JsonRpcRequest = serde_json::from_value(request)?;
// TODO: request_metadata? maybe we should put it in the authorization?
// TODO: don't hard code allowed lag
// TODO: think more about this wait_for_sync
let response = self
.try_send_best_consensus_head_connection(authorization, request, None, None)
.try_send_best_consensus_head_connection(
authorization,
request,
None,
None,
true,
)
.await?;
let block = response.result.context("failed fetching block")?;
@ -199,6 +205,7 @@ impl Web3Connections {
}
/// Get the heaviest chain's block from cache or backend rpc
/// Caution! If a future block is requested, this might wait forever. Be sure to have a timeout outside of this!
pub async fn cannonical_block(
&self,
authorization: &Arc<Authorization>,
@ -208,23 +215,33 @@ impl Web3Connections {
// maybe save them during save_block in a blocks_by_number Cache<U64, Vec<ArcBlock>>
// if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations)
let mut consensus_head_receiver = self
.watch_consensus_head_receiver
.as_ref()
.context("need new head subscriptions to fetch cannonical_block")?
.clone();
// be sure the requested block num exists
let head_block_num = self.head_block_num().context("no servers in sync")?;
let mut head_block_num = consensus_head_receiver.borrow_and_update().number;
loop {
if let Some(head_block_num) = head_block_num {
if num <= &head_block_num {
break;
}
}
consensus_head_receiver.changed().await?;
head_block_num = consensus_head_receiver.borrow_and_update().number;
}
let head_block_num =
head_block_num.expect("we should only get here if we have a head block");
// TODO: geth does 64, erigon does 90k. sometimes we run a mix
let archive_needed = num < &(head_block_num - U64::from(64));
if num > &head_block_num {
// TODO: i'm seeing this a lot when using ethspam. i dont know why though. i thought we delayed publishing
// TODO: instead of error, maybe just sleep and try again?
// TODO: this should be a 401, not a 500
return Err(anyhow::anyhow!(
"Head block is #{}, but #{} was requested",
head_block_num,
num
));
}
// try to get the hash from our cache
// deref to not keep the lock open
if let Some(block_hash) = self.block_numbers.get(num) {
@ -243,7 +260,7 @@ impl Web3Connections {
// TODO: if error, retry?
// TODO: request_metadata or authorization?
let response = self
.try_send_best_consensus_head_connection(authorization, request, None, Some(num))
.try_send_best_consensus_head_connection(authorization, request, None, Some(num), true)
.await?;
let raw_block = response.result.context("no block result")?;
@ -320,6 +337,8 @@ impl Web3Connections {
.best_consensus_connections(authorization, self)
.await;
// TODO: what should we do if the block number of new_synced_connections is < old_synced_connections? wait?
let includes_backups = new_synced_connections.includes_backups;
let consensus_head_block = new_synced_connections.head_block.clone();
let num_consensus_rpcs = new_synced_connections.num_conns();
@ -327,14 +346,14 @@ impl Web3Connections {
let num_active_rpcs = consensus_finder.all.rpc_name_to_hash.len();
let total_rpcs = self.conns.len();
let old_synced_connections = self
.synced_connections
.swap(Arc::new(new_synced_connections));
let old_consensus_head_connections = self
.watch_consensus_connections_sender
.send_replace(Arc::new(new_synced_connections));
let includes_backups_str = if includes_backups { "B " } else { "" };
if let Some(consensus_saved_block) = consensus_head_block {
match &old_synced_connections.head_block {
match &old_consensus_head_connections.head_block {
None => {
debug!(
"first {}{}/{}/{}/{} block={}, rpc={}",
@ -843,7 +862,13 @@ impl ConsensusFinder {
Some(x) => x.number.expect("blocks here should always have a number"),
};
let min_block_num = highest_block_num.saturating_sub(U64::from(5));
// TODO: also needs to be not less than our current head
let mut min_block_num = highest_block_num.saturating_sub(U64::from(5));
// we also want to be sure we don't ever go backwards!
if let Some(current_consensus_head_num) = web3_connections.head_block_num() {
min_block_num = min_block_num.max(current_consensus_head_num);
}
// TODO: pass `min_block_num` to consensus_head_connections?
let consensus_head_for_main = self

@ -11,7 +11,6 @@ use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
use crate::rpcs::transactions::TxStatus;
use arc_swap::ArcSwap;
use counter::Counter;
use derive_more::From;
use ethers::prelude::{ProviderError, TxHash, H256, U64};
@ -38,9 +37,12 @@ use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBeh
/// A collection of web3 connections. Sends requests either the current best server or all servers.
#[derive(From)]
pub struct Web3Connections {
pub(crate) conns: HashMap<String, Arc<Web3Connection>>,
/// any requests will be forwarded to one (or more) of these connections
pub(super) synced_connections: ArcSwap<ConsensusConnections>,
pub(crate) conns: HashMap<String, Arc<Web3Connection>>,
/// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender`
pub(super) watch_consensus_connections_sender: watch::Sender<Arc<ConsensusConnections>>,
/// this head receiver makes it easy to wait until there is a new block
pub(super) watch_consensus_head_receiver: Option<watch::Receiver<ArcBlock>>,
pub(super) pending_transactions:
Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
@ -62,7 +64,7 @@ impl Web3Connections {
http_client: Option<reqwest::Client>,
redis_pool: Option<redis_rate_limiter::RedisPool>,
block_map: BlockHashesCache,
head_block_sender: Option<watch::Sender<ArcBlock>>,
watch_consensus_head_sender: Option<watch::Sender<ArcBlock>>,
min_sum_soft_limit: u32,
min_head_rpcs: usize,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
@ -138,7 +140,7 @@ impl Web3Connections {
let redis_pool = redis_pool.clone();
let http_interval_sender = http_interval_sender.clone();
let block_sender = if head_block_sender.is_some() {
let block_sender = if watch_consensus_head_sender.is_some() {
Some(block_sender.clone())
} else {
None
@ -192,8 +194,6 @@ impl Web3Connections {
}
}
let synced_connections = ConsensusConnections::default();
// TODO: max_capacity and time_to_idle from config
// all block hashes are the same size, so no need for weigher
let block_hashes = Cache::builder()
@ -206,9 +206,15 @@ impl Web3Connections {
.max_capacity(10_000)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
let (watch_consensus_connections_sender, _) = watch::channel(Default::default());
let watch_consensus_head_receiver =
watch_consensus_head_sender.as_ref().map(|x| x.subscribe());
let connections = Arc::new(Self {
conns: connections,
synced_connections: ArcSwap::new(Arc::new(synced_connections)),
watch_consensus_connections_sender,
watch_consensus_head_receiver,
pending_transactions,
block_hashes,
block_numbers,
@ -228,7 +234,7 @@ impl Web3Connections {
authorization,
pending_tx_id_receiver,
block_receiver,
head_block_sender,
watch_consensus_head_sender,
pending_tx_sender,
)
.await
@ -447,11 +453,12 @@ impl Web3Connections {
(Option<U64>, u64),
Vec<Arc<Web3Connection>>,
> = {
let synced_connections = self.synced_connections.load();
let synced_connections = self.watch_consensus_connections_sender.borrow().clone();
let head_block_num = if let Some(head_block) = synced_connections.head_block.as_ref() {
head_block.number()
} else {
// TODO: optionally wait for a head block >= min_block_needed
return Ok(OpenRequestResult::NotReady);
};
@ -495,6 +502,7 @@ impl Web3Connections {
}
}
cmp::Ordering::Greater => {
// TODO? if the blocks is close and wait_for_sync and allow_backups, wait for change on a watch_consensus_connections_receiver().subscribe()
return Ok(OpenRequestResult::NotReady);
}
}
@ -712,18 +720,27 @@ impl Web3Connections {
}
/// be sure there is a timeout on this or it might loop forever
/// TODO: think more about wait_for_sync
pub async fn try_send_best_consensus_head_connection(
&self,
authorization: &Arc<Authorization>,
request: JsonRpcRequest,
request_metadata: Option<&Arc<RequestMetadata>>,
min_block_needed: Option<&U64>,
wait_for_sync: bool,
) -> anyhow::Result<JsonRpcForwardedResponse> {
let mut skip_rpcs = vec![];
let mut watch_consensus_connections = if wait_for_sync {
Some(self.watch_consensus_connections_sender.subscribe())
} else {
None
};
// TODO: maximum retries? right now its the total number of servers
loop {
// TODO: is self.conns still right now that we split main and backup servers?
// TODO: if a new block arrives, we probably want to reset the skip list
if skip_rpcs.len() == self.conns.len() {
// no servers to try
break;
@ -833,9 +850,6 @@ impl Web3Connections {
rpc, err
);
// TODO: sleep how long? until synced_connections changes or rate limits are available
// sleep(Duration::from_millis(100)).await;
continue;
}
}
@ -851,16 +865,38 @@ impl Web3Connections {
request_metadata.no_servers.fetch_add(1, Ordering::Release);
}
sleep_until(retry_at).await;
continue;
if let Some(watch_consensus_connections) = watch_consensus_connections.as_mut()
{
// TODO: if there are other servers in synced_connections, we should continue now
// wait until retry_at OR synced_connections changes
tokio::select! {
_ = sleep_until(retry_at) => {
skip_rpcs.pop();
}
_ = watch_consensus_connections.changed() => {
// TODO: would be nice to save this retry_at so we don't keep hitting limits
let _ = watch_consensus_connections.borrow_and_update();
}
}
continue;
} else {
break;
}
}
OpenRequestResult::NotReady => {
if let Some(request_metadata) = request_metadata {
request_metadata.no_servers.fetch_add(1, Ordering::Release);
}
break;
if wait_for_sync {
// TODO: race here. there might have been a change while we were waiting on the previous server
self.watch_consensus_connections_sender
.subscribe()
.changed()
.await?;
} else {
break;
}
}
}
}
@ -979,6 +1015,7 @@ impl Web3Connections {
request,
request_metadata,
min_block_needed,
true,
)
.await
}
@ -1007,8 +1044,11 @@ impl Serialize for Web3Connections {
let conns: Vec<&Web3Connection> = self.conns.values().map(|x| x.as_ref()).collect();
state.serialize_field("conns", &conns)?;
let synced_connections = &**self.synced_connections.load();
state.serialize_field("synced_connections", synced_connections)?;
{
let consensus_connections = self.watch_consensus_connections_sender.borrow().clone();
// TODO: rename synced_connections to consensus_connections?
state.serialize_field("synced_connections", &consensus_connections)?;
}
self.block_hashes.sync();
self.block_numbers.sync();
@ -1128,9 +1168,13 @@ mod tests {
(lagged_rpc.name.clone(), lagged_rpc.clone()),
]);
let (watch_consensus_connections_sender, _) = watch::channel(Default::default());
// TODO: make a Web3Connections::new
let conns = Web3Connections {
conns,
synced_connections: Default::default(),
watch_consensus_head_receiver: None,
watch_consensus_connections_sender,
pending_transactions: Cache::builder()
.max_capacity(10_000)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
@ -1350,9 +1394,13 @@ mod tests {
(archive_rpc.name.clone(), archive_rpc.clone()),
]);
let (watch_consensus_connections_sender, _) = watch::channel(Default::default());
// TODO: make a Web3Connections::new
let conns = Web3Connections {
conns,
synced_connections: Default::default(),
watch_consensus_head_receiver: None,
watch_consensus_connections_sender,
pending_transactions: Cache::builder()
.max_capacity(10)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),

@ -1,4 +1,4 @@
use super::blockchain::SavedBlock;
use super::blockchain::{ArcBlock, SavedBlock};
use super::connection::Web3Connection;
use super::connections::Web3Connections;
use ethers::prelude::{H256, U64};
@ -43,31 +43,29 @@ impl fmt::Debug for ConsensusConnections {
}
impl Web3Connections {
pub fn head_block(&self) -> Option<SavedBlock> {
self.synced_connections.load().head_block.clone()
pub fn head_block(&self) -> Option<ArcBlock> {
self.watch_consensus_head_receiver
.as_ref()
.map(|x| x.borrow().clone())
}
pub fn head_block_hash(&self) -> Option<H256> {
self.synced_connections
.load()
.head_block
.as_ref()
.map(|head_block| head_block.hash())
self.head_block().and_then(|x| x.hash)
}
pub fn head_block_num(&self) -> Option<U64> {
self.synced_connections
.load()
.head_block
.as_ref()
.map(|head_block| head_block.number())
self.head_block().and_then(|x| x.number)
}
pub fn synced(&self) -> bool {
!self.synced_connections.load().conns.is_empty()
!self
.watch_consensus_connections_sender
.borrow()
.conns
.is_empty()
}
pub fn num_synced_rpcs(&self) -> usize {
self.synced_connections.load().conns.len()
self.watch_consensus_connections_sender.borrow().conns.len()
}
}