rename and lints
This commit is contained in:
parent
2cc1b8e297
commit
59eb9a889f
@ -1,7 +1,7 @@
|
||||
///! Rate-limited communication with a web3 provider.
|
||||
use super::provider::Web3Provider;
|
||||
use super::request::PendingRequestHandle;
|
||||
use super::request::RequestHandleResult;
|
||||
use super::request::OpenRequestHandle;
|
||||
use super::request::OpenRequestResult;
|
||||
use crate::app::{flatten_handle, AnyhowJoinHandle};
|
||||
use crate::config::BlockAndRpc;
|
||||
use anyhow::Context;
|
||||
@ -18,8 +18,8 @@ use std::sync::atomic::{self, AtomicU32, AtomicU64};
|
||||
use std::{cmp::Ordering, sync::Arc};
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::RwLock as AsyncRwLock;
|
||||
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
|
||||
use tracing::{error, info, info_span, instrument, trace, warn, Instrument};
|
||||
use tokio::time::{interval, sleep, sleep_until, Duration, MissedTickBehavior};
|
||||
use tracing::{error, info, instrument, trace, warn};
|
||||
|
||||
/// An active connection to a Web3Rpc
|
||||
pub struct Web3Connection {
|
||||
@ -372,7 +372,7 @@ impl Web3Connection {
|
||||
|
||||
loop {
|
||||
match self.try_request_handle().await {
|
||||
Ok(RequestHandleResult::ActiveRequest(active_request_handle)) => {
|
||||
Ok(OpenRequestResult::ActiveRequest(active_request_handle)) => {
|
||||
let block: Result<Block<TxHash>, _> = active_request_handle
|
||||
.request("eth_getBlockByNumber", ("latest", false))
|
||||
.await;
|
||||
@ -394,12 +394,12 @@ impl Web3Connection {
|
||||
self.clone().send_block_result(block, &block_sender).await?;
|
||||
}
|
||||
}
|
||||
Ok(RequestHandleResult::RetryAt(retry_at)) => {
|
||||
Ok(OpenRequestResult::RetryAt(retry_at)) => {
|
||||
warn!(?retry_at, "Rate limited on latest block from {}", self);
|
||||
sleep_until(retry_at).await;
|
||||
continue;
|
||||
}
|
||||
Ok(RequestHandleResult::None) => {
|
||||
Ok(OpenRequestResult::None) => {
|
||||
warn!("No handle for latest block from {}", self);
|
||||
// TODO: what should we do?
|
||||
}
|
||||
@ -521,17 +521,17 @@ impl Web3Connection {
|
||||
|
||||
/// be careful with this; it will wait forever!
|
||||
#[instrument(skip_all)]
|
||||
pub async fn wait_for_request_handle(self: &Arc<Self>) -> anyhow::Result<PendingRequestHandle> {
|
||||
pub async fn wait_for_request_handle(self: &Arc<Self>) -> anyhow::Result<OpenRequestHandle> {
|
||||
// TODO: maximum wait time? i think timeouts in other parts of the code are probably best
|
||||
|
||||
loop {
|
||||
match self.try_request_handle().await {
|
||||
Ok(RequestHandleResult::ActiveRequest(handle)) => return Ok(handle),
|
||||
Ok(RequestHandleResult::RetryAt(retry_at)) => {
|
||||
Ok(OpenRequestResult::ActiveRequest(handle)) => return Ok(handle),
|
||||
Ok(OpenRequestResult::RetryAt(retry_at)) => {
|
||||
// TODO: emit a stat?
|
||||
sleep_until(retry_at).await;
|
||||
}
|
||||
Ok(RequestHandleResult::None) => {
|
||||
Ok(OpenRequestResult::None) => {
|
||||
// TODO: when can this happen? emit a stat?
|
||||
// TODO: instead of erroring, subscribe to the head block on this
|
||||
// TODO: sleep how long? maybe just error?
|
||||
@ -543,11 +543,11 @@ impl Web3Connection {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn try_request_handle(self: &Arc<Self>) -> anyhow::Result<RequestHandleResult> {
|
||||
pub async fn try_request_handle(self: &Arc<Self>) -> anyhow::Result<OpenRequestResult> {
|
||||
// check that we are connected
|
||||
if !self.has_provider().await {
|
||||
// TODO: emit a stat?
|
||||
return Ok(RequestHandleResult::None);
|
||||
return Ok(OpenRequestResult::None);
|
||||
}
|
||||
|
||||
// check rate limits
|
||||
@ -563,7 +563,7 @@ impl Web3Connection {
|
||||
// TODO: i'm seeing "Exhausted rate limit on moralis: 0ns". How is it getting 0?
|
||||
warn!(?retry_at, ?self, "Exhausted rate limit");
|
||||
|
||||
return Ok(RequestHandleResult::RetryAt(retry_at.into()));
|
||||
return Ok(OpenRequestResult::RetryAt(retry_at.into()));
|
||||
}
|
||||
Ok(ThrottleResult::RetryNever) => {
|
||||
return Err(anyhow::anyhow!("Rate limit failed."));
|
||||
@ -574,9 +574,9 @@ impl Web3Connection {
|
||||
}
|
||||
};
|
||||
|
||||
let handle = PendingRequestHandle::new(self.clone());
|
||||
let handle = OpenRequestHandle::new(self.clone());
|
||||
|
||||
Ok(RequestHandleResult::ActiveRequest(handle))
|
||||
Ok(OpenRequestResult::ActiveRequest(handle))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,6 @@
|
||||
///! Load balanced communication with a group of web3 providers
|
||||
use super::connection::Web3Connection;
|
||||
use super::request::{PendingRequestHandle, RequestHandleResult};
|
||||
use super::request::{OpenRequestHandle, OpenRequestResult};
|
||||
use super::synced_connections::SyncedConnections;
|
||||
use crate::app::{flatten_handle, AnyhowJoinHandle, TxState};
|
||||
use crate::config::Web3ConnectionConfig;
|
||||
@ -190,7 +190,7 @@ impl Web3Connections {
|
||||
// TODO: there is a race here on geth. sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!)
|
||||
// TODO: maximum wait time
|
||||
let pending_transaction: Transaction = match rpc.try_request_handle().await {
|
||||
Ok(RequestHandleResult::ActiveRequest(handle)) => {
|
||||
Ok(OpenRequestResult::ActiveRequest(handle)) => {
|
||||
handle
|
||||
.request("eth_getTransactionByHash", (pending_tx_id,))
|
||||
.await?
|
||||
@ -349,7 +349,7 @@ impl Web3Connections {
|
||||
#[instrument(skip_all)]
|
||||
pub async fn try_send_parallel_requests(
|
||||
&self,
|
||||
active_request_handles: Vec<PendingRequestHandle>,
|
||||
active_request_handles: Vec<OpenRequestHandle>,
|
||||
method: &str,
|
||||
// TODO: remove this box once i figure out how to do the options
|
||||
params: Option<&serde_json::Value>,
|
||||
@ -437,7 +437,7 @@ impl Web3Connections {
|
||||
&self,
|
||||
skip: &[Arc<Web3Connection>],
|
||||
min_block_needed: Option<&U64>,
|
||||
) -> anyhow::Result<RequestHandleResult> {
|
||||
) -> anyhow::Result<OpenRequestResult> {
|
||||
let mut earliest_retry_at = None;
|
||||
|
||||
// filter the synced rpcs
|
||||
@ -494,14 +494,14 @@ impl Web3Connections {
|
||||
for rpc in synced_rpcs.into_iter() {
|
||||
// increment our connection counter
|
||||
match rpc.try_request_handle().await {
|
||||
Ok(RequestHandleResult::ActiveRequest(handle)) => {
|
||||
Ok(OpenRequestResult::ActiveRequest(handle)) => {
|
||||
trace!("next server on {:?}: {:?}", self, rpc);
|
||||
return Ok(RequestHandleResult::ActiveRequest(handle));
|
||||
return Ok(OpenRequestResult::ActiveRequest(handle));
|
||||
}
|
||||
Ok(RequestHandleResult::RetryAt(retry_at)) => {
|
||||
Ok(OpenRequestResult::RetryAt(retry_at)) => {
|
||||
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
|
||||
}
|
||||
Ok(RequestHandleResult::None) => {
|
||||
Ok(OpenRequestResult::None) => {
|
||||
// TODO: log a warning?
|
||||
}
|
||||
Err(err) => {
|
||||
@ -515,7 +515,7 @@ impl Web3Connections {
|
||||
|
||||
match earliest_retry_at {
|
||||
None => todo!(),
|
||||
Some(earliest_retry_at) => Ok(RequestHandleResult::RetryAt(earliest_retry_at)),
|
||||
Some(earliest_retry_at) => Ok(OpenRequestResult::RetryAt(earliest_retry_at)),
|
||||
}
|
||||
}
|
||||
|
||||
@ -525,7 +525,7 @@ impl Web3Connections {
|
||||
pub async fn upstream_servers(
|
||||
&self,
|
||||
min_block_needed: Option<&U64>,
|
||||
) -> Result<Vec<PendingRequestHandle>, Option<Instant>> {
|
||||
) -> Result<Vec<OpenRequestHandle>, Option<Instant>> {
|
||||
let mut earliest_retry_at = None;
|
||||
// TODO: with capacity?
|
||||
let mut selected_rpcs = vec![];
|
||||
@ -539,12 +539,12 @@ impl Web3Connections {
|
||||
|
||||
// check rate limits and increment our connection counter
|
||||
match connection.try_request_handle().await {
|
||||
Ok(RequestHandleResult::RetryAt(retry_at)) => {
|
||||
Ok(OpenRequestResult::RetryAt(retry_at)) => {
|
||||
// this rpc is not available. skip it
|
||||
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
|
||||
}
|
||||
Ok(RequestHandleResult::ActiveRequest(handle)) => selected_rpcs.push(handle),
|
||||
Ok(RequestHandleResult::None) => {
|
||||
Ok(OpenRequestResult::ActiveRequest(handle)) => selected_rpcs.push(handle),
|
||||
Ok(OpenRequestResult::None) => {
|
||||
warn!("no request handle for {}", connection)
|
||||
}
|
||||
Err(err) => {
|
||||
@ -578,7 +578,7 @@ impl Web3Connections {
|
||||
.next_upstream_server(&skip_rpcs, min_block_needed)
|
||||
.await
|
||||
{
|
||||
Ok(RequestHandleResult::ActiveRequest(active_request_handle)) => {
|
||||
Ok(OpenRequestResult::ActiveRequest(active_request_handle)) => {
|
||||
// save the rpc in case we get an error and want to retry on another server
|
||||
skip_rpcs.push(active_request_handle.clone_connection());
|
||||
|
||||
@ -631,7 +631,7 @@ impl Web3Connections {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(RequestHandleResult::RetryAt(retry_at)) => {
|
||||
Ok(OpenRequestResult::RetryAt(retry_at)) => {
|
||||
// TODO: move this to a helper function
|
||||
// sleep (TODO: with a lock?) until our rate limits should be available
|
||||
// TODO: if a server catches up sync while we are waiting, we could stop waiting
|
||||
@ -641,7 +641,7 @@ impl Web3Connections {
|
||||
|
||||
continue;
|
||||
}
|
||||
Ok(RequestHandleResult::None) => {
|
||||
Ok(OpenRequestResult::None) => {
|
||||
warn!(?self, "No server handles!");
|
||||
|
||||
// TODO: subscribe to something on synced connections. maybe it should just be a watch channel
|
||||
|
@ -1,35 +1,22 @@
|
||||
use super::connection::Web3Connection;
|
||||
use super::provider::Web3Provider;
|
||||
use crate::app::{flatten_handle, AnyhowJoinHandle};
|
||||
use crate::config::BlockAndRpc;
|
||||
use anyhow::Context;
|
||||
use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64};
|
||||
use futures::future::try_join_all;
|
||||
use futures::StreamExt;
|
||||
use parking_lot::RwLock;
|
||||
use redis_rate_limit::{RedisPool, RedisRateLimit, ThrottleResult};
|
||||
use serde::ser::{SerializeStruct, Serializer};
|
||||
use serde::Serialize;
|
||||
use std::fmt;
|
||||
use std::hash::{Hash, Hasher};
|
||||
use std::sync::atomic::{self, AtomicU32, AtomicU64};
|
||||
use std::{cmp::Ordering, sync::Arc};
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::RwLock as AsyncRwLock;
|
||||
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
|
||||
use tracing::{error, info, info_span, instrument, trace, warn, Instrument};
|
||||
use std::sync::atomic;
|
||||
use std::sync::Arc;
|
||||
use tokio::time::Instant;
|
||||
use tracing::{instrument, trace};
|
||||
|
||||
// TODO: rename this
|
||||
pub enum RequestHandleResult {
|
||||
ActiveRequest(PendingRequestHandle),
|
||||
pub enum OpenRequestResult {
|
||||
ActiveRequest(OpenRequestHandle),
|
||||
RetryAt(Instant),
|
||||
None,
|
||||
}
|
||||
|
||||
/// Drop this once a connection completes
|
||||
pub struct PendingRequestHandle(Arc<Web3Connection>);
|
||||
pub struct OpenRequestHandle(Arc<Web3Connection>);
|
||||
|
||||
impl PendingRequestHandle {
|
||||
impl OpenRequestHandle {
|
||||
pub fn new(connection: Arc<Web3Connection>) -> Self {
|
||||
// TODO: attach a unique id to this?
|
||||
// TODO: what ordering?!
|
||||
@ -86,7 +73,7 @@ impl PendingRequestHandle {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PendingRequestHandle {
|
||||
impl Drop for OpenRequestHandle {
|
||||
fn drop(&mut self) {
|
||||
self.0
|
||||
.active_requests
|
||||
|
Loading…
Reference in New Issue
Block a user