diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 19b07ac7..2d6c4736 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -1,7 +1,7 @@ ///! Rate-limited communication with a web3 provider. use super::provider::Web3Provider; -use super::request::PendingRequestHandle; -use super::request::RequestHandleResult; +use super::request::OpenRequestHandle; +use super::request::OpenRequestResult; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::BlockAndRpc; use anyhow::Context; @@ -18,8 +18,8 @@ use std::sync::atomic::{self, AtomicU32, AtomicU64}; use std::{cmp::Ordering, sync::Arc}; use tokio::sync::broadcast; use tokio::sync::RwLock as AsyncRwLock; -use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; -use tracing::{error, info, info_span, instrument, trace, warn, Instrument}; +use tokio::time::{interval, sleep, sleep_until, Duration, MissedTickBehavior}; +use tracing::{error, info, instrument, trace, warn}; /// An active connection to a Web3Rpc pub struct Web3Connection { @@ -372,7 +372,7 @@ impl Web3Connection { loop { match self.try_request_handle().await { - Ok(RequestHandleResult::ActiveRequest(active_request_handle)) => { + Ok(OpenRequestResult::ActiveRequest(active_request_handle)) => { let block: Result, _> = active_request_handle .request("eth_getBlockByNumber", ("latest", false)) .await; @@ -394,12 +394,12 @@ impl Web3Connection { self.clone().send_block_result(block, &block_sender).await?; } } - Ok(RequestHandleResult::RetryAt(retry_at)) => { + Ok(OpenRequestResult::RetryAt(retry_at)) => { warn!(?retry_at, "Rate limited on latest block from {}", self); sleep_until(retry_at).await; continue; } - Ok(RequestHandleResult::None) => { + Ok(OpenRequestResult::None) => { warn!("No handle for latest block from {}", self); // TODO: what should we do? } @@ -521,17 +521,17 @@ impl Web3Connection { /// be careful with this; it will wait forever! #[instrument(skip_all)] - pub async fn wait_for_request_handle(self: &Arc) -> anyhow::Result { + pub async fn wait_for_request_handle(self: &Arc) -> anyhow::Result { // TODO: maximum wait time? i think timeouts in other parts of the code are probably best loop { match self.try_request_handle().await { - Ok(RequestHandleResult::ActiveRequest(handle)) => return Ok(handle), - Ok(RequestHandleResult::RetryAt(retry_at)) => { + Ok(OpenRequestResult::ActiveRequest(handle)) => return Ok(handle), + Ok(OpenRequestResult::RetryAt(retry_at)) => { // TODO: emit a stat? sleep_until(retry_at).await; } - Ok(RequestHandleResult::None) => { + Ok(OpenRequestResult::None) => { // TODO: when can this happen? emit a stat? // TODO: instead of erroring, subscribe to the head block on this // TODO: sleep how long? maybe just error? @@ -543,11 +543,11 @@ impl Web3Connection { } } - pub async fn try_request_handle(self: &Arc) -> anyhow::Result { + pub async fn try_request_handle(self: &Arc) -> anyhow::Result { // check that we are connected if !self.has_provider().await { // TODO: emit a stat? - return Ok(RequestHandleResult::None); + return Ok(OpenRequestResult::None); } // check rate limits @@ -563,7 +563,7 @@ impl Web3Connection { // TODO: i'm seeing "Exhausted rate limit on moralis: 0ns". How is it getting 0? warn!(?retry_at, ?self, "Exhausted rate limit"); - return Ok(RequestHandleResult::RetryAt(retry_at.into())); + return Ok(OpenRequestResult::RetryAt(retry_at.into())); } Ok(ThrottleResult::RetryNever) => { return Err(anyhow::anyhow!("Rate limit failed.")); @@ -574,9 +574,9 @@ impl Web3Connection { } }; - let handle = PendingRequestHandle::new(self.clone()); + let handle = OpenRequestHandle::new(self.clone()); - Ok(RequestHandleResult::ActiveRequest(handle)) + Ok(OpenRequestResult::ActiveRequest(handle)) } } diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 4ca29c78..12b336a5 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -1,6 +1,6 @@ ///! Load balanced communication with a group of web3 providers use super::connection::Web3Connection; -use super::request::{PendingRequestHandle, RequestHandleResult}; +use super::request::{OpenRequestHandle, OpenRequestResult}; use super::synced_connections::SyncedConnections; use crate::app::{flatten_handle, AnyhowJoinHandle, TxState}; use crate::config::Web3ConnectionConfig; @@ -190,7 +190,7 @@ impl Web3Connections { // TODO: there is a race here on geth. sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!) // TODO: maximum wait time let pending_transaction: Transaction = match rpc.try_request_handle().await { - Ok(RequestHandleResult::ActiveRequest(handle)) => { + Ok(OpenRequestResult::ActiveRequest(handle)) => { handle .request("eth_getTransactionByHash", (pending_tx_id,)) .await? @@ -349,7 +349,7 @@ impl Web3Connections { #[instrument(skip_all)] pub async fn try_send_parallel_requests( &self, - active_request_handles: Vec, + active_request_handles: Vec, method: &str, // TODO: remove this box once i figure out how to do the options params: Option<&serde_json::Value>, @@ -437,7 +437,7 @@ impl Web3Connections { &self, skip: &[Arc], min_block_needed: Option<&U64>, - ) -> anyhow::Result { + ) -> anyhow::Result { let mut earliest_retry_at = None; // filter the synced rpcs @@ -494,14 +494,14 @@ impl Web3Connections { for rpc in synced_rpcs.into_iter() { // increment our connection counter match rpc.try_request_handle().await { - Ok(RequestHandleResult::ActiveRequest(handle)) => { + Ok(OpenRequestResult::ActiveRequest(handle)) => { trace!("next server on {:?}: {:?}", self, rpc); - return Ok(RequestHandleResult::ActiveRequest(handle)); + return Ok(OpenRequestResult::ActiveRequest(handle)); } - Ok(RequestHandleResult::RetryAt(retry_at)) => { + Ok(OpenRequestResult::RetryAt(retry_at)) => { earliest_retry_at = earliest_retry_at.min(Some(retry_at)); } - Ok(RequestHandleResult::None) => { + Ok(OpenRequestResult::None) => { // TODO: log a warning? } Err(err) => { @@ -515,7 +515,7 @@ impl Web3Connections { match earliest_retry_at { None => todo!(), - Some(earliest_retry_at) => Ok(RequestHandleResult::RetryAt(earliest_retry_at)), + Some(earliest_retry_at) => Ok(OpenRequestResult::RetryAt(earliest_retry_at)), } } @@ -525,7 +525,7 @@ impl Web3Connections { pub async fn upstream_servers( &self, min_block_needed: Option<&U64>, - ) -> Result, Option> { + ) -> Result, Option> { let mut earliest_retry_at = None; // TODO: with capacity? let mut selected_rpcs = vec![]; @@ -539,12 +539,12 @@ impl Web3Connections { // check rate limits and increment our connection counter match connection.try_request_handle().await { - Ok(RequestHandleResult::RetryAt(retry_at)) => { + Ok(OpenRequestResult::RetryAt(retry_at)) => { // this rpc is not available. skip it earliest_retry_at = earliest_retry_at.min(Some(retry_at)); } - Ok(RequestHandleResult::ActiveRequest(handle)) => selected_rpcs.push(handle), - Ok(RequestHandleResult::None) => { + Ok(OpenRequestResult::ActiveRequest(handle)) => selected_rpcs.push(handle), + Ok(OpenRequestResult::None) => { warn!("no request handle for {}", connection) } Err(err) => { @@ -578,7 +578,7 @@ impl Web3Connections { .next_upstream_server(&skip_rpcs, min_block_needed) .await { - Ok(RequestHandleResult::ActiveRequest(active_request_handle)) => { + Ok(OpenRequestResult::ActiveRequest(active_request_handle)) => { // save the rpc in case we get an error and want to retry on another server skip_rpcs.push(active_request_handle.clone_connection()); @@ -631,7 +631,7 @@ impl Web3Connections { } } } - Ok(RequestHandleResult::RetryAt(retry_at)) => { + Ok(OpenRequestResult::RetryAt(retry_at)) => { // TODO: move this to a helper function // sleep (TODO: with a lock?) until our rate limits should be available // TODO: if a server catches up sync while we are waiting, we could stop waiting @@ -641,7 +641,7 @@ impl Web3Connections { continue; } - Ok(RequestHandleResult::None) => { + Ok(OpenRequestResult::None) => { warn!(?self, "No server handles!"); // TODO: subscribe to something on synced connections. maybe it should just be a watch channel diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 4fde6ded..d67a2656 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -1,35 +1,22 @@ use super::connection::Web3Connection; use super::provider::Web3Provider; -use crate::app::{flatten_handle, AnyhowJoinHandle}; -use crate::config::BlockAndRpc; -use anyhow::Context; -use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64}; -use futures::future::try_join_all; -use futures::StreamExt; -use parking_lot::RwLock; -use redis_rate_limit::{RedisPool, RedisRateLimit, ThrottleResult}; -use serde::ser::{SerializeStruct, Serializer}; -use serde::Serialize; use std::fmt; -use std::hash::{Hash, Hasher}; -use std::sync::atomic::{self, AtomicU32, AtomicU64}; -use std::{cmp::Ordering, sync::Arc}; -use tokio::sync::broadcast; -use tokio::sync::RwLock as AsyncRwLock; -use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; -use tracing::{error, info, info_span, instrument, trace, warn, Instrument}; +use std::sync::atomic; +use std::sync::Arc; +use tokio::time::Instant; +use tracing::{instrument, trace}; // TODO: rename this -pub enum RequestHandleResult { - ActiveRequest(PendingRequestHandle), +pub enum OpenRequestResult { + ActiveRequest(OpenRequestHandle), RetryAt(Instant), None, } /// Drop this once a connection completes -pub struct PendingRequestHandle(Arc); +pub struct OpenRequestHandle(Arc); -impl PendingRequestHandle { +impl OpenRequestHandle { pub fn new(connection: Arc) -> Self { // TODO: attach a unique id to this? // TODO: what ordering?! @@ -86,7 +73,7 @@ impl PendingRequestHandle { } } -impl Drop for PendingRequestHandle { +impl Drop for OpenRequestHandle { fn drop(&mut self) { self.0 .active_requests