we need our own Reconnect (#124)

* first pass at handling different return data limits

* put ws_provider in an arcswap

* add min max_latency

* add min max_latency

* subscribe with reconnect

* better logging around reconnect

* select on both watches

* subscribe to the correct watch
This commit is contained in:
Bryan Stitt 2023-06-12 18:58:51 -07:00 committed by GitHub
parent e26aef4313
commit 1a8f799969
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 157 additions and 58 deletions

@ -465,7 +465,7 @@ impl ConsensusFinder {
// // histogram requires high to be at least 2 x low
// // using min_latency for low does not work how we want it though
// max_latency = max_latency.max(2 * min_latency);
max_latency = max_latency.max(1000);
// create the histogram
let mut hist = Histogram::<u32>::new_with_bounds(1, max_latency, 3).unwrap();

@ -923,12 +923,30 @@ impl Web3Rpcs {
let rate_limit_substrings = ["limit", "exceeded", "quota usage"];
for rate_limit_substr in rate_limit_substrings {
if error_msg.contains(rate_limit_substr) {
warn!(
"rate limited ({}) by {}",
error_msg,
skip_rpcs.last().unwrap()
);
continue;
if rate_limit_substr.contains("result on length") {
// this error contains "limit" but is not a rate limit error
// TODO: make the expected limit configurable
// TODO: parse the rate_limit_substr and only continue if it is < expected limit
if rate_limit_substr.contains("exceeding limit 2000000") {
// they hit our expected limit. return the error now
return Err(error.into());
} else {
// they hit a limit lower than what we expect
warn!(
"unexpected result limit ({}) by {}",
error_msg,
skip_rpcs.last().unwrap()
);
continue;
}
} else {
warn!(
"rate limited ({}) by {}",
error_msg,
skip_rpcs.last().unwrap()
);
continue;
}
}
}

@ -9,6 +9,7 @@ use crate::frontend::authorization::Authorization;
use crate::jsonrpc::{JsonRpcParams, JsonRpcResultData};
use crate::rpcs::request::RequestErrorHandler;
use anyhow::{anyhow, Context};
use arc_swap::ArcSwapOption;
use ethers::prelude::{Bytes, Middleware, TxHash, U64};
use ethers::types::{Address, Transaction, U256};
use futures::future::try_join_all;
@ -28,6 +29,7 @@ use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicU64, AtomicU8, AtomicUsize};
use std::{cmp::Ordering, sync::Arc};
use thread_fast_rng::rand::Rng;
use tokio::select;
use tokio::sync::watch;
use tokio::time::{sleep, sleep_until, timeout, Duration, Instant};
use url::Url;
@ -40,10 +42,12 @@ pub struct Web3Rpc {
pub db_conn: Option<DatabaseConnection>,
/// most all requests prefer use the http_provider
pub(super) http_provider: Option<EthersHttpProvider>,
/// the websocket url is only used for subscriptions
pub(super) ws_url: Option<Url>,
/// the websocket provider is only used for subscriptions
pub(super) ws_provider: Option<EthersWsProvider>,
pub(super) ws_provider: ArcSwapOption<EthersWsProvider>,
/// keep track of hard limits
/// this is only inside an Option so that the "Default" derive works. it will always be set.
/// hard_limit_until is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) hard_limit_until: Option<watch::Sender<Instant>>,
/// rate limits are stored in a central redis so that multiple proxies can share their rate limits
/// We do not use the deferred rate limiter because going over limits would cause errors
@ -56,13 +60,12 @@ pub struct Web3Rpc {
pub backup: bool,
/// TODO: have an enum for this so that "no limit" prints pretty?
pub(super) block_data_limit: AtomicU64,
/// TODO: change this to a watch channel so that http providers can subscribe and take action on change.
/// this is only inside an Option so that the "Default" derive works. it will always be set.
/// head_block is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) head_block: Option<watch::Sender<Option<Web3ProxyBlock>>>,
/// Track head block latency
pub(super) head_latency: RwLock<EwmaLatency>,
/// Track peak request latency
/// This is only inside an Option so that the "Default" derive works. it will always be set.
/// peak_latency is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) peak_latency: Option<PeakEwmaLatency>,
/// Automatically set priority
pub(super) tier: AtomicU8,
@ -70,8 +73,9 @@ pub struct Web3Rpc {
/// TODO: maybe move this to graphana
pub(super) total_requests: AtomicUsize,
pub(super) active_requests: AtomicUsize,
/// this is only inside an Option so that the "Default" derive works. it will always be set.
/// disconnect_watch is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) disconnect_watch: Option<watch::Sender<bool>>,
/// created_at is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) created_at: Option<Instant>,
}
@ -172,12 +176,10 @@ impl Web3Rpc {
None
};
let ws_provider = if let Some(ws_url) = config.ws_url {
let ws_url = if let Some(ws_url) = config.ws_url {
let ws_url = ws_url.parse::<Url>()?;
Some(connect_ws(ws_url, usize::MAX).await?)
// TODO: check the provider is on the right chain
Some(ws_url)
} else {
None
};
@ -189,7 +191,7 @@ impl Web3Rpc {
backup,
block_data_limit,
created_at: Some(created_at),
db_conn: db_conn.clone(),
db_conn,
display_name: config.display_name,
hard_limit,
hard_limit_until: Some(hard_limit_until),
@ -198,7 +200,7 @@ impl Web3Rpc {
name,
peak_latency: Some(peak_latency),
soft_limit: config.soft_limit,
ws_provider,
ws_url,
disconnect_watch: Some(disconnect_watch),
..Default::default()
};
@ -211,8 +213,9 @@ impl Web3Rpc {
let handle = {
let new_connection = new_connection.clone();
tokio::spawn(async move {
// TODO: this needs to be a subscribe_with_reconnect that does a retry with jitter and exponential backoff
new_connection
.subscribe(block_map, block_sender, chain_id, tx_id_sender)
.subscribe_with_reconnect(block_map, block_sender, chain_id, tx_id_sender)
.await
})
};
@ -410,6 +413,7 @@ impl Web3Rpc {
}
/// query the web3 provider to confirm it is on the expected chain with the expected data available
/// TODO: this currently checks only the http if both http and ws are set. it should check both and make sure they match
async fn check_provider(self: &Arc<Self>, chain_id: u64) -> Web3ProxyResult<()> {
// check the server's chain_id here
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
@ -563,10 +567,50 @@ impl Web3Rpc {
Ok(())
}
#[allow(clippy::too_many_arguments)]
async fn subscribe_with_reconnect(
self: Arc<Self>,
block_map: BlocksByHashCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
chain_id: u64,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
) -> Web3ProxyResult<()> {
loop {
if let Err(err) = self
.clone()
.subscribe(
block_map.clone(),
block_sender.clone(),
chain_id,
tx_id_sender.clone(),
)
.await
{
if self.should_disconnect() {
break;
}
warn!("{} subscribe err: {:#?}", self, err)
} else if self.should_disconnect() {
break;
}
if self.backup {
debug!("reconnecting to {} in 30 seconds", self);
} else {
info!("reconnecting to {} in 30 seconds", self);
}
// TODO: exponential backoff with jitter
sleep(Duration::from_secs(30)).await;
}
Ok(())
}
/// subscribe to blocks and transactions
/// This should only exit when the program is exiting.
/// TODO: should more of these args be on self? chain_id for sure
#[allow(clippy::too_many_arguments)]
async fn subscribe(
self: Arc<Self>,
block_map: BlocksByHashCache,
@ -580,12 +624,45 @@ impl Web3Rpc {
Some(RequestErrorHandler::ErrorLevel)
};
if let Some(url) = self.ws_url.clone() {
debug!("starting websocket provider on {}", self);
let x = connect_ws(url, usize::MAX).await?;
let x = Arc::new(x);
self.ws_provider.store(Some(x));
}
debug!("starting subscriptions on {}", self);
self.check_provider(chain_id).await?;
let mut futures = vec![];
// TODO: use this channel instead of self.disconnect_watch
let (subscribe_stop_tx, mut subscribe_stop_rx) = watch::channel(false);
// subscribe to the disconnect watch. the app uses this when shutting down
if let Some(disconnect_watch_tx) = self.disconnect_watch.as_ref() {
let mut disconnect_watch_rx = disconnect_watch_tx.subscribe();
let f = async move {
// TODO: make sure it changed to "true"
select! {
x = disconnect_watch_rx.changed() => {
x?;
},
x = subscribe_stop_rx.changed() => {
x?;
},
}
Ok(())
};
futures.push(flatten_handle(tokio::spawn(f)));
}
// health check that runs if there haven't been any recent requests
{
// TODO: move this into a proper function
@ -595,14 +672,16 @@ impl Web3Rpc {
// TODO: reset this timeout when a new block is seen? we need to keep request_latency updated though
let health_sleep_seconds = 5;
let subscribe_stop_rx = subscribe_stop_tx.subscribe();
// health check loop
let f = async move {
// TODO: benchmark this and lock contention
let mut old_total_requests = 0;
let mut new_total_requests;
// TODO: errors here should not cause the loop to exit!
while !rpc.should_disconnect() {
// errors here should not cause the loop to exit!
while !(*subscribe_stop_rx.borrow()) {
new_total_requests = rpc.total_requests.load(atomic::Ordering::Relaxed);
if new_total_requests - old_total_requests < 5 {
@ -629,11 +708,24 @@ impl Web3Rpc {
}
// subscribe to new heads
if let Some(block_sender) = &block_sender {
// TODO: do we need this to be abortable?
let f = self
.clone()
.subscribe_new_heads(block_sender.clone(), block_map.clone());
if let Some(block_sender) = block_sender.clone() {
let clone = self.clone();
let subscribe_stop_rx = subscribe_stop_tx.subscribe();
let f = async move {
let x = clone
.subscribe_new_heads(block_sender.clone(), block_map.clone(), subscribe_stop_rx)
.await;
// error or success, we clear the block when subscribe_new_heads exits
clone
.send_head_block_result(Ok(None), &block_sender, &block_map)
.await?;
x
};
// TODO: if
futures.push(flatten_handle(tokio::spawn(f)));
}
@ -641,8 +733,11 @@ impl Web3Rpc {
// subscribe pending transactions
// TODO: make this opt-in. its a lot of bandwidth
if let Some(tx_id_sender) = tx_id_sender {
// TODO: do we need this to be abortable?
let f = self.clone().subscribe_pending_transactions(tx_id_sender);
let subscribe_stop_rx = subscribe_stop_tx.subscribe();
let f = self
.clone()
.subscribe_pending_transactions(tx_id_sender, subscribe_stop_rx);
futures.push(flatten_handle(tokio::spawn(f)));
}
@ -654,19 +749,19 @@ impl Web3Rpc {
debug!("subscriptions on {} exited", self);
self.disconnect_watch
.as_ref()
.expect("disconnect_watch should always be set")
.send_replace(true);
subscribe_stop_tx.send_replace(true);
// TODO: wait for all of the futures to exit?
Ok(())
}
/// Subscribe to new blocks.
async fn subscribe_new_heads(
self: Arc<Self>,
self: &Arc<Self>,
block_sender: flume::Sender<BlockAndRpc>,
block_map: BlocksByHashCache,
subscribe_stop_rx: watch::Receiver<bool>,
) -> Web3ProxyResult<()> {
debug!("subscribing to new heads on {}", self);
@ -674,7 +769,7 @@ impl Web3Rpc {
let error_handler = None;
let authorization = Default::default();
if let Some(ws_provider) = self.ws_provider.as_ref() {
if let Some(ws_provider) = self.ws_provider.load().as_ref() {
// todo: move subscribe_blocks onto the request handle
let active_request_handle = self
.wait_for_request_handle(&authorization, None, error_handler)
@ -686,7 +781,7 @@ impl Web3Rpc {
// there is a very small race condition here where the stream could send us a new block right now
// but all seeing the same block twice won't break anything
// TODO: how does this get wrapped in an arc? does ethers handle that?
// TODO: can we force this to use the websocket?
// TODO: send this request to the ws_provider instead of the http_provider
let latest_block: Result<Option<ArcBlock>, _> = self
.authorized_request(
"eth_getBlockByNumber",
@ -700,7 +795,7 @@ impl Web3Rpc {
.await?;
while let Some(block) = blocks.next().await {
if self.should_disconnect() {
if *subscribe_stop_rx.borrow() {
break;
}
@ -714,7 +809,7 @@ impl Web3Rpc {
let mut blocks = http_provider.watch_blocks().await?;
while let Some(block_hash) = blocks.next().await {
if self.should_disconnect() {
if *subscribe_stop_rx.borrow() {
break;
}
@ -737,7 +832,7 @@ impl Web3Rpc {
self.send_head_block_result(Ok(None), &block_sender, &block_map)
.await?;
if self.should_disconnect() {
if *subscribe_stop_rx.borrow() {
Ok(())
} else {
Err(anyhow!("new_heads subscription exited. reconnect needed").into())
@ -748,9 +843,9 @@ impl Web3Rpc {
async fn subscribe_pending_transactions(
self: Arc<Self>,
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
mut subscribe_stop_rx: watch::Receiver<bool>,
) -> Web3ProxyResult<()> {
// TODO: make this subscription optional
self.wait_for_disconnect().await?;
subscribe_stop_rx.changed().await?;
/*
trace!("watching pending transactions on {}", self);
@ -795,7 +890,7 @@ impl Web3Rpc {
}
*/
if self.should_disconnect() {
if *subscribe_stop_rx.borrow() {
Ok(())
} else {
Err(anyhow!("pending_transactions subscription exited. reconnect needed").into())
@ -909,20 +1004,6 @@ impl Web3Rpc {
Ok(handle.into())
}
async fn wait_for_disconnect(&self) -> Result<(), tokio::sync::watch::error::RecvError> {
let mut disconnect_subscription = self.disconnect_watch.as_ref().unwrap().subscribe();
loop {
if *disconnect_subscription.borrow_and_update() {
// disconnect watch is set to "true"
return Ok(());
}
// wait for disconnect_subscription to change
disconnect_subscription.changed().await?;
}
}
pub async fn internal_request<P: JsonRpcParams, R: JsonRpcResultData>(
self: &Arc<Self>,
method: &str,

@ -193,7 +193,7 @@ impl OpenRequestHandle {
// TODO: replace ethers-rs providers with our own that handles "id" being null
let response: Result<R, _> = if let Some(ref p) = self.rpc.http_provider {
p.request(method, params).await
} else if let Some(ref p) = self.rpc.ws_provider {
} else if let Some(p) = self.rpc.ws_provider.load().as_ref() {
p.request(method, params).await
} else {
return Err(ProviderError::CustomError(