much smarter connection logic

Bryan Stitt 2022-12-05 13:13:36 -08:00
parent f736aeb027
commit 5bec8bb5b9
9 changed files with 429 additions and 266 deletions

@ -70,12 +70,12 @@ impl CostCalculatorCommand {
#[derive(Debug, FromQueryResult)]
struct SelectResult {
pub total_frontend_requests: Decimal,
pub total_backend_retries: Decimal,
pub total_cache_misses: Decimal,
// pub total_backend_retries: Decimal,
// pub total_cache_misses: Decimal,
pub total_cache_hits: Decimal,
pub total_response_bytes: Decimal,
pub total_error_responses: Decimal,
pub total_response_millis: Decimal,
// pub total_response_millis: Decimal,
pub first_period_datetime: DateTimeUtc,
pub last_period_datetime: DateTimeUtc,
}
@ -86,14 +86,14 @@ impl CostCalculatorCommand {
rpc_accounting::Column::FrontendRequests.sum(),
"total_frontend_requests",
)
.column_as(
rpc_accounting::Column::BackendRequests.sum(),
"total_backend_retries",
)
.column_as(
rpc_accounting::Column::CacheMisses.sum(),
"total_cache_misses",
)
// .column_as(
// rpc_accounting::Column::BackendRequests.sum(),
// "total_backend_retries",
// )
// .column_as(
// rpc_accounting::Column::CacheMisses.sum(),
// "total_cache_misses",
// )
.column_as(rpc_accounting::Column::CacheHits.sum(), "total_cache_hits")
.column_as(
rpc_accounting::Column::SumResponseBytes.sum(),
@ -104,10 +104,10 @@ impl CostCalculatorCommand {
rpc_accounting::Column::ErrorResponse.sum(),
"total_error_responses",
)
.column_as(
rpc_accounting::Column::SumResponseMillis.sum(),
"total_response_millis",
)
// .column_as(
// rpc_accounting::Column::SumResponseMillis.sum(),
// "total_response_millis",
// )
.column_as(
rpc_accounting::Column::PeriodDatetime.min(),
"first_period_datetime",

@ -24,6 +24,7 @@ pub type FrontendResult = Result<Response, FrontendErrorResponse>;
// TODO:
#[derive(Debug, From)]
pub enum FrontendErrorResponse {
AccessDenied,
Anyhow(anyhow::Error),
Box(Box<dyn Error>),
Database(DbErr),
@ -45,7 +46,21 @@ pub enum FrontendErrorResponse {
impl IntoResponse for FrontendErrorResponse {
fn into_response(self) -> Response {
// TODO: include the request id in these so that users can give us something that will point to logs
// TODO: status code is in the jsonrpc response and is also the first item in the tuple. DRY
let (status_code, response) = match self {
Self::AccessDenied => {
// TODO: attach something to this trace. probably don't include much in the message though. don't want to leak creds by accident
trace!("access denied");
(
StatusCode::FORBIDDEN,
JsonRpcForwardedResponse::from_string(
// TODO: is it safe to expose all of our anyhow strings?
"FORBIDDEN".to_string(),
Some(StatusCode::FORBIDDEN.as_u16().into()),
None,
),
)
}
Self::Anyhow(err) => {
warn!("anyhow. err={:?}", err);
(

@ -29,11 +29,21 @@ pub type BlockHashesCache = Cache<H256, ArcBlock, hashbrown::hash_map::DefaultHa
pub struct SavedBlock {
pub block: ArcBlock,
/// number of seconds this block was behind the current time when received
lag: u64,
pub lag: u64,
}
impl SavedBlock {
pub fn new(block: ArcBlock) -> Self {
let mut x = Self { block, lag: 0 };
// no need to recalculate lag every time
// if the head block gets too old, a health check restarts this connection
x.lag = x.lag();
x
}
pub fn lag(&self) -> u64 {
// TODO: read this from a global config. different chains should probably have different gaps.
let allowed_lag: u64 = 60;
@ -45,18 +55,15 @@ impl SavedBlock {
// TODO: is this safe enough? what if something about the chain is actually lagged? what if its a chain like BTC with 10 minute blocks?
let oldest_allowed = now - Duration::from_secs(allowed_lag);
let block_timestamp = Duration::from_secs(block.timestamp.as_u64());
let block_timestamp = Duration::from_secs(self.block.timestamp.as_u64());
// TODO: recalculate this every time?
let lag = if block_timestamp < oldest_allowed {
if block_timestamp < oldest_allowed {
// this server is still syncing from too far away to serve requests
// u64 is safe because we checked the ordering above
(oldest_allowed - block_timestamp).as_secs() as u64
} else {
0
};
Self { block, lag }
}
}
pub fn hash(&self) -> H256 {
@ -143,7 +150,7 @@ impl Web3Connections {
// TODO: if error, retry?
let block: ArcBlock = match rpc {
Some(rpc) => {
rpc.wait_for_request_handle(authorization, Duration::from_secs(30))
rpc.wait_for_request_handle(authorization, Duration::from_secs(30), false)
.await?
.request(
"eth_getBlockByHash",
@ -301,9 +308,6 @@ impl Web3Connections {
// add the rpc's block to connection_heads, or remove the rpc from connection_heads
let rpc_head_block = match rpc_head_block {
Some(rpc_head_block) => {
let rpc_head_num = rpc_head_block.number();
let rpc_head_hash = rpc_head_block.hash();
// we don't know if its on the heaviest chain yet
self.save_block(&rpc_head_block.block, false).await?;
@ -314,6 +318,8 @@ impl Web3Connections {
None
} else {
let rpc_head_hash = rpc_head_block.hash();
if let Some(prev_hash) =
connection_heads.insert(rpc.name.to_owned(), rpc_head_hash)
{
@ -490,10 +496,11 @@ impl Web3Connections {
.filter_map(|conn_name| self.conns.get(conn_name).cloned())
.collect();
let consensus_head_hash = maybe_head_block
// TODO: DEBUG only check
let _ = maybe_head_block
.hash
.expect("head blocks always have hashes");
let consensus_head_num = maybe_head_block
let _ = maybe_head_block
.number
.expect("head blocks always have numbers");

@ -7,6 +7,7 @@ use crate::config::BlockAndRpc;
use crate::frontend::authorization::Authorization;
use anyhow::Context;
use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
use ethers::types::{Block, U256};
use futures::future::try_join_all;
use futures::StreamExt;
use log::{debug, error, info, trace, warn, Level};
@ -23,9 +24,38 @@ use std::sync::atomic::{self, AtomicU32, AtomicU64};
use std::{cmp::Ordering, sync::Arc};
use thread_fast_rng::rand::Rng;
use thread_fast_rng::thread_fast_rng;
use tokio::sync::broadcast;
use tokio::sync::RwLock as AsyncRwLock;
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
use tokio::sync::{broadcast, oneshot, RwLock as AsyncRwLock};
use tokio::time::{interval, sleep, sleep_until, timeout, Duration, Instant, MissedTickBehavior};
#[derive(Debug)]
pub enum ProviderState {
None,
NotReady(Arc<Web3Provider>),
Ready(Arc<Web3Provider>),
}
impl ProviderState {
pub async fn provider(&self, allow_not_ready: bool) -> Option<&Arc<Web3Provider>> {
match self {
ProviderState::None => None,
ProviderState::NotReady(x) => {
if allow_not_ready {
Some(x)
} else {
// TODO: do a ready check here?
None
}
}
ProviderState::Ready(x) => {
if x.ready() {
Some(x)
} else {
None
}
}
}
}
}
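A rough sketch of how this state machine is intended to behave (Web3Provider is replaced by a stand-in type here; the real provider() is async and also calls ready() in the Ready arm):

use std::sync::Arc;

struct MockProvider;

enum ProviderState {
    None,
    NotReady(Arc<MockProvider>),
    Ready(Arc<MockProvider>),
}

impl ProviderState {
    // allow_not_ready mirrors the flag used by connect() and the health check,
    // which need a handle before the connection has been promoted to Ready
    fn provider(&self, allow_not_ready: bool) -> Option<&Arc<MockProvider>> {
        match self {
            ProviderState::None => None,
            ProviderState::NotReady(p) => allow_not_ready.then_some(p),
            ProviderState::Ready(p) => Some(p),
        }
    }
}

fn main() {
    let state = ProviderState::NotReady(Arc::new(MockProvider));
    assert!(state.provider(false).is_none()); // normal requests are refused until Ready
    assert!(state.provider(true).is_some()); // startup and health checks may still get a handle
}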
/// An active connection to a Web3 RPC server like geth or erigon.
pub struct Web3Connection {
@ -43,12 +73,14 @@ pub struct Web3Connection {
pub(super) internal_requests: AtomicU64,
/// provider is in a RwLock so that we can replace it if re-connecting
/// it is an async lock because we hold it open across awaits
pub(super) provider: AsyncRwLock<Option<Arc<Web3Provider>>>,
pub(super) provider_state: AsyncRwLock<ProviderState>,
/// rate limits are stored in a central redis so that multiple proxies can share their rate limits
/// We do not use the deferred rate limiter because going over limits would cause errors
pub(super) hard_limit: Option<RedisRateLimiter>,
/// used for load balancing to the least loaded server
pub(super) soft_limit: u32,
/// use web3 queries to find the block data limit for archive/pruned nodes
pub(super) automatic_block_limit: bool,
/// TODO: have an enum for this so that "no limit" prints pretty?
pub(super) block_data_limit: AtomicU64,
/// Lower weight are higher priority when sending requests. 0 to 99.
@ -97,7 +129,10 @@ impl Web3Connection {
// turn weight 0 into 100% and weight 100 into 0%
let weight = (100 - weight) as f64 / 100.0;
let block_data_limit = block_data_limit.unwrap_or_default().into();
// TODO: should we do this even if block_sender is None? then we would know limits on private relays
let block_data_limit: AtomicU64 = block_data_limit.unwrap_or_default().into();
let automatic_block_limit =
(block_data_limit.load(atomic::Ordering::Acquire) == 0) && block_sender.is_some();
let new_connection = Self {
name,
@ -107,9 +142,10 @@ impl Web3Connection {
active_requests: 0.into(),
frontend_requests: 0.into(),
internal_requests: 0.into(),
provider: AsyncRwLock::new(None),
provider_state: AsyncRwLock::new(ProviderState::None),
hard_limit,
soft_limit,
automatic_block_limit,
block_data_limit,
head_block: RwLock::new(Default::default()),
weight,
@ -118,89 +154,27 @@ impl Web3Connection {
let new_connection = Arc::new(new_connection);
// connect to the server (with retries)
// TODO: PROBLEM! THIS RETRIES FOREVER AND BLOCKS THE APP STARTING
new_connection
.retrying_reconnect(block_sender.as_ref(), false)
.await?;
let authorization = Arc::new(Authorization::internal(db_conn)?);
// check the server's chain_id here
// TODO: move this outside the `new` function and into a `start` function or something. that way we can do retries from there
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
// TODO: what should the timeout be?
let found_chain_id: Result<U64, _> = new_connection
.wait_for_request_handle(&authorization, Duration::from_secs(30))
.await?
.request(
"eth_chainId",
&json!(Option::None::<()>),
Level::Error.into(),
)
.await;
match found_chain_id {
Ok(found_chain_id) => {
// TODO: there has to be a cleaner way to do this
if chain_id != found_chain_id.as_u64() {
return Err(anyhow::anyhow!(
"incorrect chain id! Config has {}, but RPC has {}",
chain_id,
found_chain_id
)
.context(format!("failed @ {}", new_connection.name)));
}
}
Err(e) => {
let e = anyhow::Error::from(e).context(format!("failed @ {}", new_connection.name));
return Err(e);
}
}
// TODO: should we do this even if block_sender is None? then we would know limits on private relays
let check_block_limit_needed = (new_connection
.block_data_limit
.load(atomic::Ordering::Acquire)
== 0)
&& block_sender.is_some();
// subscribe to new blocks and new transactions
// subscribing starts the connection (with retries)
// TODO: make transaction subscription optional (just pass None for tx_id_sender)
let handle = {
let new_connection = new_connection.clone();
let authorization = authorization.clone();
let authorization = Arc::new(Authorization::internal(db_conn)?);
tokio::spawn(async move {
new_connection
.subscribe(
&authorization,
http_interval_sender,
block_map,
block_sender,
tx_id_sender,
chain_id,
http_interval_sender,
reconnect,
tx_id_sender,
)
.await
})
};
// we could take "archive" as a parameter, but we would want a safety check on it regardless
// check common archive thresholds
// TODO: would be great if rpcs exposed this
// TODO: move this to a helper function so we can recheck on errors or as the chain grows
// TODO: move this to a helper function that checks
if check_block_limit_needed {
// TODO: make sure the server isn't still syncing
// TODO: don't sleep. wait for new heads subscription instead
// TODO: i think instead of atomics, we could maybe use a watch channel
sleep(Duration::from_millis(250)).await;
new_connection
.check_block_data_limit(&authorization)
.await?;
}
Ok((new_connection, handle))
}
@ -208,32 +182,68 @@ impl Web3Connection {
self: &Arc<Self>,
authorization: &Arc<Authorization>,
) -> anyhow::Result<Option<u64>> {
if !self.automatic_block_limit {
// TODO: is this a good thing to return
return Ok(None);
}
let mut limit = None;
// check if we are synced
let head_block: ArcBlock = self
.wait_for_request_handle(authorization, Duration::from_secs(30), true)
.await?
.request(
"eth_getBlockByNumber",
&json!(("latest", false)),
// errors here are expected, so keep the level low
Level::Debug.into(),
)
.await?;
if SavedBlock::from(head_block).syncing() {
// if the node is syncing, we can't check its block data limit
// TODO: once a node stops syncing, how do we make sure this is run?
self.block_data_limit.store(0, atomic::Ordering::Release);
return Ok(Some(0));
}
// TODO: add SavedBlock to self? probably best not to. we might not get marked Ready
// TODO: binary search between 90k and max?
// TODO: start at 0 or 1
// TODO: start at 0 or 1?
for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] {
let mut head_block_id = self.head_block.read().clone();
let handle = self
.wait_for_request_handle(authorization, Duration::from_secs(30), true)
.await?;
// TODO: subscribe to a channel instead of polling. subscribe to http_interval_sender?
while head_block_id.is_none() {
warn!("no head block yet. retrying rpc {}", self);
let head_block_num_future = handle.request::<Option<()>, U256>(
"eth_blockNumber",
&None,
// errors here are expected, so keep the level low
Level::Debug.into(),
);
// TODO: sleep for the block time, or maybe subscribe to a channel instead of this simple pull
sleep(Duration::from_secs(13)).await;
let head_block_num = timeout(Duration::from_secs(5), head_block_num_future)
.await
.context("timeout fetching eth_blockNumber")?
.context("provider error")?;
head_block_id = self.head_block.read().clone();
}
let head_block_num = head_block_id.expect("is_none was checked above").number();
// TODO: subtract 1 from block_data_limit for safety?
let maybe_archive_block = head_block_num.saturating_sub((block_data_limit).into());
trace!(
"checking maybe_archive_block on {}: {}",
self,
maybe_archive_block
);
// TODO: wait for the handle BEFORE we check the current block number. it might be delayed too!
// TODO: what should the request be?
let archive_result: Result<Bytes, _> = self
.wait_for_request_handle(authorization, Duration::from_secs(30))
.await?
let handle = self
.wait_for_request_handle(authorization, Duration::from_secs(30), true)
.await?;
let archive_result: Result<Bytes, _> = handle
.request(
"eth_getCode",
&json!((
@ -246,9 +256,10 @@ impl Web3Connection {
.await;
trace!(
"archive_result on {} for {}: {:?}",
"archive_result on {} for {} ({}): {:?}",
self,
block_data_limit,
self.name,
maybe_archive_block,
archive_result
);
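The probe loop above can be read as: walk a fixed list of depths behind the head block and remember the deepest one the node still answers (eth_getCode in the real code). A simplified sketch of that idea, with the RPC call replaced by a closure and error handling reduced to a plain bool:

fn probe_block_data_limit(head_block: u64, can_serve: impl Fn(u64) -> bool) -> u64 {
    let mut limit = 0;
    for depth in [0u64, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] {
        let probe_block = head_block.saturating_sub(depth);
        if can_serve(probe_block) {
            // this depth still works; remember it and try a deeper one
            limit = depth;
        } else {
            break;
        }
    }
    limit
}

fn main() {
    // pretend this node only keeps state for the most recent 128 blocks
    let head = 16_000_000u64;
    let limit = probe_block_data_limit(head, |block| head - block <= 128);
    assert_eq!(limit, 128);
}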
@ -304,10 +315,12 @@ impl Web3Connection {
/// reconnect to the provider. errors are retried forever with exponential backoff with jitter.
/// We use the "Decorrelated" jitter from <https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/>
/// TODO: maybe it would be better to use "Full Jitter". The "Full Jitter" approach uses less work, but slightly more time.
pub async fn retrying_reconnect(
pub async fn retrying_connect(
self: &Arc<Self>,
block_sender: Option<&flume::Sender<BlockAndRpc>>,
initial_sleep: bool,
chain_id: u64,
db_conn: Option<&DatabaseConnection>,
delay_start: bool,
) -> anyhow::Result<()> {
// there are several crates that have retry helpers, but they all seem more complex than necessary
// TODO: move this backoff logic into a helper function so we can use it when doing database locking
@ -317,7 +330,7 @@ impl Web3Connection {
// sleep once before the initial retry attempt
// TODO: now that we use this method for our initial connection, do we still want this sleep?
let mut sleep_ms = if initial_sleep {
let mut sleep_ms = if delay_start {
let first_sleep_ms = min(
cap_ms,
thread_fast_rng().gen_range(base_ms..(base_ms * range_multiplier)),
@ -334,8 +347,8 @@ impl Web3Connection {
};
// retry until we succeed
while let Err(err) = self.reconnect(block_sender).await {
// thread_rng is crytographically secure. we don't need that, but we don't need this super efficient so its fine
while let Err(err) = self.connect(block_sender, chain_id, db_conn).await {
// thread_rng is cryptographically secure. we don't need that here
sleep_ms = min(
cap_ms,
thread_fast_rng().gen_range(base_ms..(sleep_ms * range_multiplier)),
@ -352,56 +365,120 @@ impl Web3Connection {
sleep(retry_in).await;
}
info!("connected to {}", self);
Ok(())
}
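The doc comment above points at AWS's "Decorrelated Jitter": each retry sleeps for min(cap, rand(base, previous_sleep * multiplier)), so the delay depends on the previous delay rather than the attempt count. A standalone sketch of that schedule, using the rand crate instead of thread_fast_rng and hypothetical base/cap values:

use rand::Rng;
use std::cmp::min;

fn main() {
    let base_ms: u64 = 100;
    let cap_ms: u64 = 30_000;
    let range_multiplier: u64 = 3;

    let mut sleep_ms = base_ms;
    for attempt in 1..=8 {
        // decorrelated jitter: the next sleep is drawn from [base, previous * multiplier), then capped
        sleep_ms = min(
            cap_ms,
            rand::thread_rng().gen_range(base_ms..(sleep_ms * range_multiplier)),
        );
        println!("attempt {attempt}: sleep {sleep_ms}ms before retrying");
    }
}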
/// reconnect a websocket provider
pub async fn reconnect(
/// connect to the web3 provider
async fn connect(
self: &Arc<Self>,
// websocket doesn't need the http client
block_sender: Option<&flume::Sender<BlockAndRpc>>,
chain_id: u64,
db_conn: Option<&DatabaseConnection>,
) -> anyhow::Result<()> {
// since this lock is held open over an await, we use tokio's locking
// TODO: timeout on this lock. if it's slow, something is wrong
let mut provider_option = self.provider.write().await;
trace!("provider_state {} locking...", self);
let mut provider_state = self.provider_state.write().await;
trace!("provider_state {} locked: {:?}", self, provider_state);
if let Some(provider) = &*provider_option {
match provider.as_ref() {
Web3Provider::Http(_) => {
// http clients don't need to do anything for reconnecting
// they *do* need to run this function to setup the first
match &*provider_state {
ProviderState::None => {
info!("connecting to {}", self);
}
ProviderState::NotReady(provider) | ProviderState::Ready(provider) => {
// disconnect the current provider
if let Web3Provider::Mock = provider.as_ref() {
return Ok(());
}
Web3Provider::Ws(_) => {}
Web3Provider::Mock => return Ok(()),
trace!("Reconnecting to {}", self);
// disconnect the current provider
*provider_state = ProviderState::None;
// reset sync status
trace!("locking head block on {}", self);
{
let mut head_block = self.head_block.write();
*head_block = None;
}
trace!("done with head block on {}", self);
// tell the block subscriber that we don't have any blocks
if let Some(block_sender) = block_sender {
block_sender
.send_async((None, self.clone()))
.await
.context("block_sender during connect")?;
}
}
info!("Reconnecting to {}", self);
// disconnect the current provider
*provider_option = None;
// reset sync status
{
let mut head_block_id = self.head_block.write();
*head_block_id = None;
}
// tell the block subscriber that we don't have any blocks
if let Some(block_sender) = &block_sender {
block_sender
.send_async((None, self.clone()))
.await
.context("block_sender during connect")?;
}
} else {
info!("connecting to {}", self);
}
trace!("Creating new Web3Provider on {}", self);
// TODO: if this fails, keep retrying! otherwise it crashes and doesn't try again!
let new_provider = Web3Provider::from_str(&self.url, self.http_client.clone()).await?;
*provider_option = Some(Arc::new(new_provider));
// TODO: if an error happens,
*provider_state = ProviderState::NotReady(Arc::new(new_provider));
// drop the lock so that we can get a request handle
trace!("provider_state {} unlocked", self);
drop(provider_state);
let authorization = Arc::new(Authorization::internal(db_conn.cloned())?);
// check the server's chain_id here
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
// TODO: what should the timeout be? should there be a request timeout?
trace!("waiting on chain id for {}", self);
let found_chain_id: Result<U64, _> = self
.wait_for_request_handle(&authorization, Duration::from_secs(30), true)
.await?
.request(
"eth_chainId",
&json!(Option::None::<()>),
Level::Trace.into(),
)
.await;
trace!("found_chain_id: {:?}", found_chain_id);
match found_chain_id {
Ok(found_chain_id) => {
// TODO: there has to be a cleaner way to do this
if chain_id != found_chain_id.as_u64() {
return Err(anyhow::anyhow!(
"incorrect chain id! Config has {}, but RPC has {}",
chain_id,
found_chain_id
)
.context(format!("failed @ {}", self)));
}
}
Err(e) => {
return Err(anyhow::Error::from(e));
}
}
// we could take "archive" as a parameter, but we would want a safety check on it regardless
// check common archive thresholds
// TODO: would be great if rpcs exposed this
// TODO: move this to a helper function so we can recheck on errors or as the chain grows
// TODO: move this to a helper function that checks
self.check_block_data_limit(&authorization).await?;
{
let mut provider_state = self.provider_state.write().await;
// TODO: do this without a clone
let ready_provider = provider_state
.provider(true)
.await
.context("provider missing")?
.clone();
*provider_state = ProviderState::Ready(ready_provider);
}
info!("successfully connected to {}", self);
@ -413,11 +490,6 @@ impl Web3Connection {
self.active_requests.load(atomic::Ordering::Acquire)
}
#[inline]
pub async fn has_provider(&self) -> bool {
self.provider.read().await.is_some()
}
async fn send_head_block_result(
self: &Arc<Self>,
new_head_block: Result<Option<ArcBlock>, ProviderError>,
@ -483,22 +555,82 @@ impl Web3Connection {
}
/// subscribe to blocks and transactions with automatic reconnects
/// This should only exit when the program is exiting.
/// TODO: should more of these args be on self?
#[allow(clippy::too_many_arguments)]
async fn subscribe(
self: Arc<Self>,
authorization: &Arc<Authorization>,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
block_map: BlockHashesCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
chain_id: u64,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
reconnect: bool,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
) -> anyhow::Result<()> {
loop {
debug!("subscribing to {}", self);
let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe());
let mut futures = vec![];
{
// health check
// TODO: move this into a proper function
let authorization = authorization.clone();
let block_sender = block_sender.clone();
let conn = self.clone();
let (ready_tx, ready_rx) = oneshot::channel();
let f = async move {
// initial sleep to allow for the initial connection
conn.retrying_connect(
block_sender.as_ref(),
chain_id,
authorization.db_conn.as_ref(),
false,
)
.await?;
// provider is ready
ready_tx.send(()).unwrap();
// wait before doing the initial health check
// TODO: how often?
let health_sleep_seconds = 10;
sleep(Duration::from_secs(health_sleep_seconds)).await;
loop {
// TODO: what if we just happened to have this check line up with another restart?
// TODO: think more about this
trace!("health check on {}", conn);
if conn
.provider_state
.read()
.await
.provider(false)
.await
.is_none()
{
// returning error will trigger a reconnect
return Err(anyhow::anyhow!("{} is not ready", conn));
}
if let Some(x) = &*conn.head_block.read() {
// if this block is too old, return an error so we reconnect
if x.lag() > 0 {
return Err(anyhow::anyhow!("provider is lagged"));
}
}
sleep(Duration::from_secs(health_sleep_seconds)).await;
}
};
futures.push(flatten_handle(tokio::spawn(f)));
// wait on the initial connection
ready_rx.await?;
}
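The block above folds three things into one spawned future: connect with retries, signal readiness over a oneshot so the caller can start the head/transaction subscriptions, then keep running as a periodic health check whose Err ends try_join_all and forces a reconnect. A stripped-down sketch of that shape (assuming tokio and anyhow; connect_with_retries and healthy are stand-ins for retrying_connect and the provider_state/head-block checks):

use std::time::Duration;
use tokio::sync::oneshot;
use tokio::time::sleep;

async fn connect_with_retries() -> anyhow::Result<()> {
    // stand-in for retrying_connect()
    Ok(())
}

async fn healthy() -> bool {
    // stand-in for the provider_state and head-block lag checks
    true
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (ready_tx, ready_rx) = oneshot::channel();

    let health_task = tokio::spawn(async move {
        connect_with_retries().await?;
        // the provider is ready; unblock the caller
        ready_tx
            .send(())
            .map_err(|_| anyhow::anyhow!("ready_rx was dropped"))?;
        // a single health check for the demo; the real task loops forever
        sleep(Duration::from_secs(1)).await;
        if !healthy().await {
            // in the real code this Err ends try_join_all and triggers a reconnect
            return Err(anyhow::anyhow!("provider is not ready"));
        }
        Ok(())
    });

    // subscriptions only start once the initial connection succeeds
    ready_rx.await?;
    health_task.await??;
    Ok(())
}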
if let Some(block_sender) = &block_sender {
let f = self.clone().subscribe_new_heads(
authorization.clone(),
@ -518,32 +650,6 @@ impl Web3Connection {
futures.push(flatten_handle(tokio::spawn(f)));
}
{
// TODO: move this into a proper function
let conn = self.clone();
// health check
let f = async move {
loop {
if let Some(provider) = conn.provider.read().await.as_ref() {
if provider.ready() {
// // trace!(rpc=%conn, "provider is ready");
} else {
warn!("rpc {} is NOT ready", conn);
// returning error will trigger a reconnect
// TODO: what if we just happened to have this check line up with another restart?
return Err(anyhow::anyhow!("provider is not ready"));
}
}
// TODO: how often?
// TODO: also check that the head block has changed recently
sleep(Duration::from_secs(10)).await;
}
};
futures.push(flatten_handle(tokio::spawn(f)));
}
match try_join_all(futures).await {
Ok(_) => {
// futures all exited without error. break instead of restarting subscriptions
@ -551,9 +657,16 @@ impl Web3Connection {
}
Err(err) => {
if reconnect {
warn!("{} subscription exited. err={:?}", self, err);
warn!("{} connected ended. err={:?}", self, err);
self.retrying_reconnect(block_sender.as_ref(), true).await?;
self.clone()
.retrying_connect(
block_sender.as_ref(),
chain_id,
authorization.db_conn.as_ref(),
true,
)
.await?;
} else {
error!("{} subscription exited. err={:?}", self, err);
return Err(err);
@ -575,11 +688,10 @@ impl Web3Connection {
block_sender: flume::Sender<BlockAndRpc>,
block_map: BlockHashesCache,
) -> anyhow::Result<()> {
info!("watching new heads on {}", self);
trace!("watching new heads on {}", self);
// TODO: is a RwLock of an Option<Arc> the right thing here?
if let Some(provider) = self.provider.read().await.clone() {
match &*provider {
if let ProviderState::Ready(provider) = &*self.provider_state.read().await {
match provider.as_ref() {
Web3Provider::Mock => unimplemented!(),
Web3Provider::Http(_provider) => {
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
@ -592,7 +704,7 @@ impl Web3Connection {
loop {
// TODO: what should the max_wait be?
match self
.wait_for_request_handle(&authorization, Duration::from_secs(30))
.wait_for_request_handle(&authorization, Duration::from_secs(30), false)
.await
{
Ok(active_request_handle) => {
@ -673,14 +785,12 @@ impl Web3Connection {
}
}
}
// // trace!(rpc=%self, "ok http interval");
}
}
Web3Provider::Ws(provider) => {
// todo: move subscribe_blocks onto the request handle?
let active_request_handle = self
.wait_for_request_handle(&authorization, Duration::from_secs(30))
.wait_for_request_handle(&authorization, Duration::from_secs(30), false)
.await;
let mut stream = provider.subscribe_blocks().await?;
drop(active_request_handle);
@ -690,7 +800,7 @@ impl Web3Connection {
// all it does is print "new block" for the same block as current block
// TODO: how does this get wrapped in an arc? does ethers handle that?
let block: Result<Option<ArcBlock>, _> = self
.wait_for_request_handle(&authorization, Duration::from_secs(30))
.wait_for_request_handle(&authorization, Duration::from_secs(30), false)
.await?
.request(
"eth_getBlockByNumber",
@ -737,12 +847,14 @@ impl Web3Connection {
// TODO: is this always an error?
// TODO: we probably don't want a warn and to return error
warn!("new_heads subscription to {} ended", self);
return Err(anyhow::anyhow!("new_heads subscription ended"));
Err(anyhow::anyhow!("new_heads subscription ended"))
}
}
} else {
Err(anyhow::anyhow!(
"Provider not ready! Unable to subscribe to heads"
))
}
Ok(())
}
async fn subscribe_pending_transactions(
@ -750,11 +862,9 @@ impl Web3Connection {
authorization: Arc<Authorization>,
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
) -> anyhow::Result<()> {
info!("watching pending transactions on {}", self);
// TODO: is a RwLock of an Option<Arc> the right thing here?
if let Some(provider) = self.provider.read().await.clone() {
match &*provider {
if let ProviderState::Ready(provider) = &*self.provider_state.read().await {
trace!("watching pending transactions on {}", self);
match provider.as_ref() {
Web3Provider::Mock => unimplemented!(),
Web3Provider::Http(provider) => {
// there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints
@ -786,7 +896,7 @@ impl Web3Connection {
Web3Provider::Ws(provider) => {
// TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
let active_request_handle = self
.wait_for_request_handle(&authorization, Duration::from_secs(30))
.wait_for_request_handle(&authorization, Duration::from_secs(30), false)
.await;
let mut stream = provider.subscribe_pending_txs().await?;
@ -808,26 +918,31 @@ impl Web3Connection {
return Err(anyhow::anyhow!("pending_transactions subscription ended"));
}
}
} else {
warn!(
"Provider not ready! Unable to watch pending transactions on {}",
self
);
}
Ok(())
}
/// be careful with this; it might wait forever!
/// `allow_not_ready` is only for use by health checks while starting the provider
pub async fn wait_for_request_handle(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
max_wait: Duration,
allow_not_ready: bool,
) -> anyhow::Result<OpenRequestHandle> {
let max_wait = Instant::now() + max_wait;
loop {
let x = self.try_request_handle(authorization).await;
// // trace!(?x, "try_request_handle");
match x {
match self
.try_request_handle(authorization, allow_not_ready)
.await
{
Ok(OpenRequestResult::Handle(handle)) => return Ok(handle),
Ok(OpenRequestResult::RetryAt(retry_at)) => {
// TODO: emit a stat?
@ -840,7 +955,7 @@ impl Web3Connection {
}
sleep_until(retry_at).await;
}
Ok(OpenRequestResult::NotSynced) => {
Ok(OpenRequestResult::NotReady) => {
// TODO: when can this happen? log? emit a stat?
// TODO: subscribe to the head block on this
// TODO: sleep how long? maybe just error?
@ -855,12 +970,19 @@ impl Web3Connection {
pub async fn try_request_handle(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
// TODO? ready_provider: Option<&Arc<Web3Provider>>,
allow_not_ready: bool,
) -> anyhow::Result<OpenRequestResult> {
// check that we are connected
if !self.has_provider().await {
if self
.provider_state
.read()
.await
.provider(allow_not_ready)
.await
.is_none()
{
// TODO: emit a stat?
// TODO: wait until we have a provider?
return Ok(OpenRequestResult::NotSynced);
return Ok(OpenRequestResult::NotReady);
}
// check rate limits
@ -880,7 +1002,7 @@ impl Web3Connection {
return Ok(OpenRequestResult::RetryAt(retry_at));
}
RedisRateLimitResult::RetryNever => {
return Ok(OpenRequestResult::NotSynced);
return Ok(OpenRequestResult::NotReady);
}
}
};
@ -1027,9 +1149,10 @@ mod tests {
active_requests: 0.into(),
frontend_requests: 0.into(),
internal_requests: 0.into(),
provider: AsyncRwLock::new(None),
provider_state: AsyncRwLock::new(ProviderState::None),
hard_limit: None,
soft_limit: 1_000,
automatic_block_limit: false,
block_data_limit: block_data_limit.into(),
weight: 100.0,
head_block: RwLock::new(Some(head_block.clone())),
@ -1072,9 +1195,10 @@ mod tests {
active_requests: 0.into(),
frontend_requests: 0.into(),
internal_requests: 0.into(),
provider: AsyncRwLock::new(None),
provider_state: AsyncRwLock::new(ProviderState::None),
hard_limit: None,
soft_limit: 1_000,
automatic_block_limit: false,
block_data_limit: block_data_limit.into(),
weight: 100.0,
head_block: RwLock::new(Some(head_block.clone())),
@ -1121,9 +1245,10 @@ mod tests {
active_requests: 0.into(),
frontend_requests: 0.into(),
internal_requests: 0.into(),
provider: AsyncRwLock::new(None),
provider_state: AsyncRwLock::new(ProviderState::None),
hard_limit: None,
soft_limit: 1_000,
automatic_block_limit: false,
block_data_limit: block_data_limit.into(),
weight: 100.0,
head_block: RwLock::new(Some(head_block.clone())),

@ -154,7 +154,7 @@ impl Web3Connections {
let mut connections = HashMap::new();
let mut handles = vec![];
// TODO: futures unordered?
// TODO: do we need to join this?
for x in join_all(spawn_handles).await {
// TODO: how should we handle errors here? one rpc being down shouldn't cause the program to exit
match x {
@ -405,14 +405,14 @@ impl Web3Connections {
);
// TODO: what should happen here? automatic retry?
// TODO: more detailed error
return Ok(OpenRequestResult::NotSynced);
return Ok(OpenRequestResult::NotReady);
}
1 => {
let rpc = usable_rpcs.get(0).expect("len is 1");
// TODO: try or wait for a request handle?
let handle = rpc
.wait_for_request_handle(authorization, Duration::from_secs(60))
.wait_for_request_handle(authorization, Duration::from_secs(60), false)
.await?;
return Ok(OpenRequestResult::Handle(handle));
@ -486,7 +486,7 @@ impl Web3Connections {
// now that the rpcs are sorted, try to get an active request handle for one of them
for rpc in sorted_rpcs.iter() {
// increment our connection counter
match rpc.try_request_handle(authorization).await {
match rpc.try_request_handle(authorization, false).await {
Ok(OpenRequestResult::Handle(handle)) => {
// // trace!("next server on {:?}: {:?}", self, rpc);
return Ok(OpenRequestResult::Handle(handle));
@ -494,7 +494,7 @@ impl Web3Connections {
Ok(OpenRequestResult::RetryAt(retry_at)) => {
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
}
Ok(OpenRequestResult::NotSynced) => {
Ok(OpenRequestResult::NotReady) => {
// TODO: log a warning?
}
Err(err) => {
@ -516,7 +516,7 @@ impl Web3Connections {
let handle = sorted_rpcs
.get(0)
.expect("at least 1 is available")
.wait_for_request_handle(authorization, Duration::from_secs(3))
.wait_for_request_handle(authorization, Duration::from_secs(3), false)
.await?;
Ok(OpenRequestResult::Handle(handle))
@ -553,13 +553,13 @@ impl Web3Connections {
}
// check rate limits and increment our connection counter
match connection.try_request_handle(authorization).await {
match connection.try_request_handle(authorization, false).await {
Ok(OpenRequestResult::RetryAt(retry_at)) => {
// this rpc is not available. skip it
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
}
Ok(OpenRequestResult::Handle(handle)) => selected_rpcs.push(handle),
Ok(OpenRequestResult::NotSynced) => {
Ok(OpenRequestResult::NotReady) => {
warn!("no request handle for {}", connection)
}
Err(err) => {
@ -682,7 +682,7 @@ impl Web3Connections {
// TODO: move this to a helper function
// sleep (TODO: with a lock?) until our rate limits should be available
// TODO: if a server catches up sync while we are waiting, we could stop waiting
warn!("All rate limits exceeded. Sleeping untill {:?}", retry_at);
warn!("All rate limits exceeded. Sleeping until {:?}", retry_at);
// TODO: have a separate column for rate limited?
if let Some(request_metadata) = request_metadata {
@ -693,15 +693,12 @@ impl Web3Connections {
continue;
}
OpenRequestResult::NotSynced => {
OpenRequestResult::NotReady => {
if let Some(request_metadata) = request_metadata {
request_metadata.no_servers.fetch_add(1, Ordering::Release);
}
// TODO: subscribe to something on synced connections. maybe it should just be a watch channel
sleep(Duration::from_millis(200)).await;
continue;
break;
}
}
}
@ -832,7 +829,7 @@ mod tests {
// TODO: why is this allow needed? does tokio::test get in the way somehow?
#![allow(unused_imports)]
use super::*;
use crate::rpcs::{blockchain::SavedBlock, provider::Web3Provider};
use crate::rpcs::{blockchain::SavedBlock, connection::ProviderState, provider::Web3Provider};
use ethers::types::{Block, U256};
use log::{trace, LevelFilter};
use parking_lot::RwLock;
@ -886,9 +883,10 @@ mod tests {
active_requests: 0.into(),
frontend_requests: 0.into(),
internal_requests: 0.into(),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
provider_state: AsyncRwLock::new(ProviderState::Ready(Arc::new(Web3Provider::Mock))),
hard_limit: None,
soft_limit: 1_000,
automatic_block_limit: true,
block_data_limit: block_data_limit.into(),
weight: 100.0,
head_block: RwLock::new(Some(head_block.clone())),
@ -903,9 +901,10 @@ mod tests {
active_requests: 0.into(),
frontend_requests: 0.into(),
internal_requests: 0.into(),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
provider_state: AsyncRwLock::new(ProviderState::Ready(Arc::new(Web3Provider::Mock))),
hard_limit: None,
soft_limit: 1_000,
automatic_block_limit: false,
block_data_limit: block_data_limit.into(),
weight: 100.0,
head_block: RwLock::new(Some(lagged_block.clone())),
@ -993,7 +992,7 @@ mod tests {
dbg!(&x);
assert!(matches!(x, OpenRequestResult::NotSynced));
assert!(matches!(x, OpenRequestResult::NotReady));
// add lagged blocks to the conns. both servers should be allowed
conns.save_block(&lagged_block.block, true).await.unwrap();
@ -1066,7 +1065,7 @@ mod tests {
conns
.best_synced_backend_connection(&authorization, None, &[], Some(&2.into()))
.await,
Ok(OpenRequestResult::NotSynced)
Ok(OpenRequestResult::NotReady)
));
}
@ -1103,9 +1102,10 @@ mod tests {
active_requests: 0.into(),
frontend_requests: 0.into(),
internal_requests: 0.into(),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
provider_state: AsyncRwLock::new(ProviderState::Ready(Arc::new(Web3Provider::Mock))),
hard_limit: None,
soft_limit: 3_000,
automatic_block_limit: false,
block_data_limit: 64.into(),
weight: 1.0,
head_block: RwLock::new(Some(head_block.clone())),
@ -1120,9 +1120,10 @@ mod tests {
active_requests: 0.into(),
frontend_requests: 0.into(),
internal_requests: 0.into(),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
provider_state: AsyncRwLock::new(ProviderState::Ready(Arc::new(Web3Provider::Mock))),
hard_limit: None,
soft_limit: 1_000,
automatic_block_limit: false,
block_data_limit: u64::MAX.into(),
// TODO: does weight = 0 work?
weight: 0.01,

@ -14,7 +14,6 @@ pub enum Web3Provider {
impl Web3Provider {
pub fn ready(&self) -> bool {
// TODO: i'm not sure if this is enough
match self {
Self::Mock => true,
Self::Http(_) => true,

@ -8,7 +8,7 @@ use entities::revert_log;
use entities::sea_orm_active_enums::Method;
use ethers::providers::{HttpClientError, ProviderError, WsClientError};
use ethers::types::{Address, Bytes};
use log::{debug, error, trace, warn, Level};
use log::{debug, error, info, trace, warn, Level};
use metered::metered;
use metered::HitCount;
use metered::ResponseTime;
@ -27,7 +27,7 @@ pub enum OpenRequestResult {
/// Unable to start a request. Retry at the given time.
RetryAt(Instant),
/// Unable to start a request because the server is not synced
NotSynced,
NotReady,
}
/// Make RPC requests through this handle and drop it when you are done.
@ -194,29 +194,47 @@ impl OpenRequestHandle {
// trace!(rpc=%self.conn, %method, "request");
let mut provider = None;
let mut logged = false;
while provider.is_none() {
match self.conn.provider.read().await.clone() {
let ready_provider = self
.conn
.provider_state
.read()
.await
// TODO: hard code true, or take a bool in the `new` function?
.provider(true)
.await
.cloned();
match ready_provider {
None => {
warn!("no provider for {}!", self.conn);
if !logged {
logged = true;
warn!("no provider for {}!", self.conn);
}
// TODO: how should this work? a reconnect should be in progress. but maybe force one now?
// TODO: sleep how long? subscribe to something instead? maybe use a watch handle?
// TODO: this is going to be way too verbose!
sleep(Duration::from_millis(100)).await
}
Some(found_provider) => provider = Some(found_provider),
Some(x) => provider = Some(x),
}
}
let provider = &*provider.expect("provider was checked already");
let provider = provider.expect("provider was checked already");
// trace!("got provider for {:?}", self);
// TODO: really sucks that we have to clone here
let response = match provider {
let response = match &*provider {
Web3Provider::Mock => unimplemented!(),
Web3Provider::Http(provider) => provider.request(method, params).await,
Web3Provider::Ws(provider) => provider.request(method, params).await,
};
// trace!("got response for {:?}: {:?}", self, response);
if let Err(err) = &response {
// only save reverts for some types of calls
// TODO: do something special for eth_sendRawTransaction too
@ -255,7 +273,7 @@ impl OpenRequestHandle {
// check for "execution reverted" here
let is_revert = if let ProviderError::JsonRpcClientError(err) = err {
// Http and Ws errors are very similar, but different types
let msg = match provider {
let msg = match &*provider {
Web3Provider::Mock => unimplemented!(),
Web3Provider::Http(_) => {
if let Some(HttpClientError::JsonRpcError(err)) =

@ -28,7 +28,7 @@ impl Web3Connections {
// TODO: might not be a race. might be a nonce that's higher than the current account nonce. geth discards chains
// TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
// TODO: if one rpc fails, try another?
let tx: Transaction = match rpc.try_request_handle(authorization).await {
let tx: Transaction = match rpc.try_request_handle(authorization, false).await {
Ok(OpenRequestResult::Handle(handle)) => {
handle
.request(

@ -22,7 +22,7 @@ pub async fn get_user_id_from_params(
// this is a long type. should we strip it down?
bearer: Option<TypedHeader<Authorization<Bearer>>>,
params: &HashMap<String, String>,
) -> anyhow::Result<u64> {
) -> Result<u64, FrontendErrorResponse> {
match (bearer, params.get("user_id")) {
(Some(TypedHeader(Authorization(bearer))), Some(user_id)) => {
// check for the bearer cache key
@ -38,8 +38,7 @@ pub async fn get_user_id_from_params(
let user_id: u64 = user_id.parse().context("Parsing user_id param")?;
if bearer_user_id != user_id {
// TODO: proper HTTP Status code
Err(anyhow::anyhow!("permission denied"))
Err(FrontendErrorResponse::AccessDenied)
} else {
Ok(bearer_user_id)
}
@ -49,13 +48,12 @@ pub async fn get_user_id_from_params(
// 0 means all
Ok(0)
}
(None, Some(x)) => {
(None, Some(_)) => {
// they do not have a bearer token, but requested a specific id. block
// TODO: proper error code from a useful error code
// TODO: maybe instead of this sharp edged warn, we have a config value?
// TODO: check config for if we should deny or allow this
Err(anyhow::anyhow!("permission denied"))
Err(FrontendErrorResponse::AccessDenied)
// // TODO: make this a flag
// warn!("allowing without auth during development!");
// Ok(x.parse()?)
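This last change swaps anyhow errors for the typed FrontendErrorResponse so a user_id mismatch (or an id requested without a bearer token) surfaces as AccessDenied / 403 instead of a generic 500. A minimal std-only sketch of that pattern (ApiError and get_user_id are hypothetical stand-ins, not the real types):

#[derive(Debug)]
enum ApiError {
    AccessDenied,
    Other(String),
}

impl ApiError {
    fn status_code(&self) -> u16 {
        match self {
            ApiError::AccessDenied => 403,
            ApiError::Other(_) => 500,
        }
    }
}

// lets `?` convert parse failures, much like the From derive on FrontendErrorResponse
impl From<std::num::ParseIntError> for ApiError {
    fn from(e: std::num::ParseIntError) -> Self {
        ApiError::Other(e.to_string())
    }
}

fn get_user_id(bearer_user_id: Option<u64>, requested: Option<&str>) -> Result<u64, ApiError> {
    match (bearer_user_id, requested) {
        (Some(bearer_id), Some(requested)) => {
            let requested: u64 = requested.parse()?;
            if bearer_id != requested {
                Err(ApiError::AccessDenied)
            } else {
                Ok(bearer_id)
            }
        }
        // authenticated but no id requested: 0 means "all"
        (Some(_), None) => Ok(0),
        // an id was requested without auth: deny
        (None, Some(_)) => Err(ApiError::AccessDenied),
        // not shown in the diff; treated as anonymous here
        (None, None) => Ok(0),
    }
}

fn main() {
    assert_eq!(get_user_id(Some(7), Some("7")).unwrap(), 7);
    assert_eq!(get_user_id(Some(7), Some("8")).unwrap_err().status_code(), 403);
}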