even more files
This commit is contained in:
parent
4d357977e9
commit
2cc1b8e297
@ -41,7 +41,7 @@ use crate::jsonrpc::JsonRpcForwardedResponse;
|
|||||||
use crate::jsonrpc::JsonRpcForwardedResponseEnum;
|
use crate::jsonrpc::JsonRpcForwardedResponseEnum;
|
||||||
use crate::jsonrpc::JsonRpcRequest;
|
use crate::jsonrpc::JsonRpcRequest;
|
||||||
use crate::jsonrpc::JsonRpcRequestEnum;
|
use crate::jsonrpc::JsonRpcRequestEnum;
|
||||||
use crate::rpcs::Web3Connections;
|
use crate::rpcs::connections::Web3Connections;
|
||||||
use crate::stats::AppStats;
|
use crate::stats::AppStats;
|
||||||
|
|
||||||
// TODO: make this customizable?
|
// TODO: make this customizable?
|
||||||
|
@ -7,7 +7,7 @@ use std::sync::Arc;
|
|||||||
use tokio::sync::broadcast;
|
use tokio::sync::broadcast;
|
||||||
|
|
||||||
use crate::app::AnyhowJoinHandle;
|
use crate::app::AnyhowJoinHandle;
|
||||||
use crate::rpcs::Web3Connection;
|
use crate::rpcs::connection::Web3Connection;
|
||||||
|
|
||||||
pub type BlockAndRpc = (Arc<Block<TxHash>>, Arc<Web3Connection>);
|
pub type BlockAndRpc = (Arc<Block<TxHash>>, Arc<Web3Connection>);
|
||||||
|
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
///! Keep track of the blockchain as seen by a Web3Connections.
|
///! Keep track of the blockchain as seen by a Web3Connections.
|
||||||
use super::SyncedConnections;
|
use super::connection::Web3Connection;
|
||||||
use super::Web3Connection;
|
use super::connections::Web3Connections;
|
||||||
use super::Web3Connections;
|
use super::synced_connections::SyncedConnections;
|
||||||
use crate::app::TxState;
|
use crate::app::TxState;
|
||||||
use crate::jsonrpc::JsonRpcRequest;
|
use crate::jsonrpc::JsonRpcRequest;
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
///! Rate-limited communication with a web3 provider
|
///! Rate-limited communication with a web3 provider.
|
||||||
use super::provider::Web3Provider;
|
use super::provider::Web3Provider;
|
||||||
|
use super::request::PendingRequestHandle;
|
||||||
|
use super::request::RequestHandleResult;
|
||||||
use crate::app::{flatten_handle, AnyhowJoinHandle};
|
use crate::app::{flatten_handle, AnyhowJoinHandle};
|
||||||
use crate::config::BlockAndRpc;
|
use crate::config::BlockAndRpc;
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
@ -19,23 +21,16 @@ use tokio::sync::RwLock as AsyncRwLock;
|
|||||||
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
|
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
|
||||||
use tracing::{error, info, info_span, instrument, trace, warn, Instrument};
|
use tracing::{error, info, info_span, instrument, trace, warn, Instrument};
|
||||||
|
|
||||||
// TODO: rename this
|
|
||||||
pub enum HandleResult {
|
|
||||||
ActiveRequest(ActiveRequestHandle),
|
|
||||||
RetryAt(Instant),
|
|
||||||
None,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// An active connection to a Web3Rpc
|
/// An active connection to a Web3Rpc
|
||||||
pub struct Web3Connection {
|
pub struct Web3Connection {
|
||||||
name: String,
|
pub name: String,
|
||||||
/// TODO: can we get this from the provider? do we even need it?
|
/// TODO: can we get this from the provider? do we even need it?
|
||||||
pub url: String,
|
pub url: String,
|
||||||
/// keep track of currently open requests. We sort on this
|
/// keep track of currently open requests. We sort on this
|
||||||
active_requests: AtomicU32,
|
pub(super) active_requests: AtomicU32,
|
||||||
/// provider is in a RwLock so that we can replace it if re-connecting
|
/// provider is in a RwLock so that we can replace it if re-connecting
|
||||||
/// it is an async lock because we hold it open across awaits
|
/// it is an async lock because we hold it open across awaits
|
||||||
provider: AsyncRwLock<Option<Arc<Web3Provider>>>,
|
pub(super) provider: AsyncRwLock<Option<Arc<Web3Provider>>>,
|
||||||
/// rate limits are stored in a central redis so that multiple proxies can share their rate limits
|
/// rate limits are stored in a central redis so that multiple proxies can share their rate limits
|
||||||
hard_limit: Option<RedisRateLimit>,
|
hard_limit: Option<RedisRateLimit>,
|
||||||
/// used for load balancing to the least loaded server
|
/// used for load balancing to the least loaded server
|
||||||
@ -45,47 +40,6 @@ pub struct Web3Connection {
|
|||||||
head_block: RwLock<(H256, U64)>,
|
head_block: RwLock<(H256, U64)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Drop this once a connection completes
|
|
||||||
pub struct ActiveRequestHandle(Arc<Web3Connection>);
|
|
||||||
|
|
||||||
impl Web3Provider {
|
|
||||||
#[instrument]
|
|
||||||
async fn from_str(url_str: &str, http_client: Option<reqwest::Client>) -> anyhow::Result<Self> {
|
|
||||||
let provider = if url_str.starts_with("http") {
|
|
||||||
let url: url::Url = url_str.parse()?;
|
|
||||||
|
|
||||||
let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?;
|
|
||||||
|
|
||||||
let provider = ethers::providers::Http::new_with_client(url, http_client);
|
|
||||||
|
|
||||||
// TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
|
|
||||||
// TODO: i don't think this interval matters for our uses, but we should probably set it to like `block time / 2`
|
|
||||||
ethers::providers::Provider::new(provider)
|
|
||||||
.interval(Duration::from_secs(13))
|
|
||||||
.into()
|
|
||||||
} else if url_str.starts_with("ws") {
|
|
||||||
let provider = ethers::providers::Ws::connect(url_str)
|
|
||||||
.instrument(info_span!("Web3Provider", url_str = url_str))
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
// TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
|
|
||||||
// TODO: i don't think this interval matters
|
|
||||||
ethers::providers::Provider::new(provider).into()
|
|
||||||
} else {
|
|
||||||
return Err(anyhow::anyhow!("only http and ws servers are supported"));
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(provider)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Debug for Web3Provider {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
// TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url
|
|
||||||
f.debug_struct("Web3Provider").finish_non_exhaustive()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Web3Connection {
|
impl Web3Connection {
|
||||||
/// Connect to a web3 rpc
|
/// Connect to a web3 rpc
|
||||||
// #[instrument(name = "spawn_Web3Connection", skip(hard_limit, http_client))]
|
// #[instrument(name = "spawn_Web3Connection", skip(hard_limit, http_client))]
|
||||||
@ -418,7 +372,7 @@ impl Web3Connection {
|
|||||||
|
|
||||||
loop {
|
loop {
|
||||||
match self.try_request_handle().await {
|
match self.try_request_handle().await {
|
||||||
Ok(HandleResult::ActiveRequest(active_request_handle)) => {
|
Ok(RequestHandleResult::ActiveRequest(active_request_handle)) => {
|
||||||
let block: Result<Block<TxHash>, _> = active_request_handle
|
let block: Result<Block<TxHash>, _> = active_request_handle
|
||||||
.request("eth_getBlockByNumber", ("latest", false))
|
.request("eth_getBlockByNumber", ("latest", false))
|
||||||
.await;
|
.await;
|
||||||
@ -440,12 +394,12 @@ impl Web3Connection {
|
|||||||
self.clone().send_block_result(block, &block_sender).await?;
|
self.clone().send_block_result(block, &block_sender).await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(HandleResult::RetryAt(retry_at)) => {
|
Ok(RequestHandleResult::RetryAt(retry_at)) => {
|
||||||
warn!(?retry_at, "Rate limited on latest block from {}", self);
|
warn!(?retry_at, "Rate limited on latest block from {}", self);
|
||||||
sleep_until(retry_at).await;
|
sleep_until(retry_at).await;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
Ok(HandleResult::None) => {
|
Ok(RequestHandleResult::None) => {
|
||||||
warn!("No handle for latest block from {}", self);
|
warn!("No handle for latest block from {}", self);
|
||||||
// TODO: what should we do?
|
// TODO: what should we do?
|
||||||
}
|
}
|
||||||
@ -567,17 +521,17 @@ impl Web3Connection {
|
|||||||
|
|
||||||
/// be careful with this; it will wait forever!
|
/// be careful with this; it will wait forever!
|
||||||
#[instrument(skip_all)]
|
#[instrument(skip_all)]
|
||||||
pub async fn wait_for_request_handle(self: &Arc<Self>) -> anyhow::Result<ActiveRequestHandle> {
|
pub async fn wait_for_request_handle(self: &Arc<Self>) -> anyhow::Result<PendingRequestHandle> {
|
||||||
// TODO: maximum wait time? i think timeouts in other parts of the code are probably best
|
// TODO: maximum wait time? i think timeouts in other parts of the code are probably best
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match self.try_request_handle().await {
|
match self.try_request_handle().await {
|
||||||
Ok(HandleResult::ActiveRequest(handle)) => return Ok(handle),
|
Ok(RequestHandleResult::ActiveRequest(handle)) => return Ok(handle),
|
||||||
Ok(HandleResult::RetryAt(retry_at)) => {
|
Ok(RequestHandleResult::RetryAt(retry_at)) => {
|
||||||
// TODO: emit a stat?
|
// TODO: emit a stat?
|
||||||
sleep_until(retry_at).await;
|
sleep_until(retry_at).await;
|
||||||
}
|
}
|
||||||
Ok(HandleResult::None) => {
|
Ok(RequestHandleResult::None) => {
|
||||||
// TODO: when can this happen? emit a stat?
|
// TODO: when can this happen? emit a stat?
|
||||||
// TODO: instead of erroring, subscribe to the head block on this
|
// TODO: instead of erroring, subscribe to the head block on this
|
||||||
// TODO: sleep how long? maybe just error?
|
// TODO: sleep how long? maybe just error?
|
||||||
@ -589,11 +543,11 @@ impl Web3Connection {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn try_request_handle(self: &Arc<Self>) -> anyhow::Result<HandleResult> {
|
pub async fn try_request_handle(self: &Arc<Self>) -> anyhow::Result<RequestHandleResult> {
|
||||||
// check that we are connected
|
// check that we are connected
|
||||||
if !self.has_provider().await {
|
if !self.has_provider().await {
|
||||||
// TODO: emit a stat?
|
// TODO: emit a stat?
|
||||||
return Ok(HandleResult::None);
|
return Ok(RequestHandleResult::None);
|
||||||
}
|
}
|
||||||
|
|
||||||
// check rate limits
|
// check rate limits
|
||||||
@ -609,7 +563,7 @@ impl Web3Connection {
|
|||||||
// TODO: i'm seeing "Exhausted rate limit on moralis: 0ns". How is it getting 0?
|
// TODO: i'm seeing "Exhausted rate limit on moralis: 0ns". How is it getting 0?
|
||||||
warn!(?retry_at, ?self, "Exhausted rate limit");
|
warn!(?retry_at, ?self, "Exhausted rate limit");
|
||||||
|
|
||||||
return Ok(HandleResult::RetryAt(retry_at.into()));
|
return Ok(RequestHandleResult::RetryAt(retry_at.into()));
|
||||||
}
|
}
|
||||||
Ok(ThrottleResult::RetryNever) => {
|
Ok(ThrottleResult::RetryNever) => {
|
||||||
return Err(anyhow::anyhow!("Rate limit failed."));
|
return Err(anyhow::anyhow!("Rate limit failed."));
|
||||||
@ -620,9 +574,16 @@ impl Web3Connection {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let handle = ActiveRequestHandle::new(self.clone());
|
let handle = PendingRequestHandle::new(self.clone());
|
||||||
|
|
||||||
Ok(HandleResult::ActiveRequest(handle))
|
Ok(RequestHandleResult::ActiveRequest(handle))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for Web3Provider {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
// TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url
|
||||||
|
f.debug_struct("Web3Provider").finish_non_exhaustive()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -633,71 +594,6 @@ impl Hash for Web3Connection {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ActiveRequestHandle {
|
|
||||||
fn new(connection: Arc<Web3Connection>) -> Self {
|
|
||||||
// TODO: attach a unique id to this?
|
|
||||||
// TODO: what ordering?!
|
|
||||||
connection
|
|
||||||
.active_requests
|
|
||||||
.fetch_add(1, atomic::Ordering::AcqRel);
|
|
||||||
|
|
||||||
Self(connection)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn clone_connection(&self) -> Arc<Web3Connection> {
|
|
||||||
self.0.clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Send a web3 request
|
|
||||||
/// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
|
|
||||||
/// By taking self here, we ensure that this is dropped after the request is complete
|
|
||||||
#[instrument(skip_all)]
|
|
||||||
pub async fn request<T, R>(
|
|
||||||
&self,
|
|
||||||
method: &str,
|
|
||||||
params: T,
|
|
||||||
) -> Result<R, ethers::prelude::ProviderError>
|
|
||||||
where
|
|
||||||
T: fmt::Debug + serde::Serialize + Send + Sync,
|
|
||||||
R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug,
|
|
||||||
{
|
|
||||||
// TODO: use tracing spans properly
|
|
||||||
// TODO: it would be nice to have the request id on this
|
|
||||||
// TODO: including params in this is way too verbose
|
|
||||||
trace!("Sending {} to {}", method, self.0);
|
|
||||||
|
|
||||||
let mut provider = None;
|
|
||||||
|
|
||||||
while provider.is_none() {
|
|
||||||
// TODO: if no provider, don't unwrap. wait until there is one.
|
|
||||||
match self.0.provider.read().await.as_ref() {
|
|
||||||
None => {}
|
|
||||||
Some(found_provider) => provider = Some(found_provider.clone()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let response = match &*provider.unwrap() {
|
|
||||||
Web3Provider::Http(provider) => provider.request(method, params).await,
|
|
||||||
Web3Provider::Ws(provider) => provider.request(method, params).await,
|
|
||||||
};
|
|
||||||
|
|
||||||
// TODO: i think ethers already has trace logging (and does it much more fancy)
|
|
||||||
// TODO: at least instrument this with more useful information
|
|
||||||
// trace!("Reply from {}: {:?}", self.0, response);
|
|
||||||
trace!("Reply from {}", self.0);
|
|
||||||
|
|
||||||
response
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for ActiveRequestHandle {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
self.0
|
|
||||||
.active_requests
|
|
||||||
.fetch_sub(1, atomic::Ordering::AcqRel);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Eq for Web3Connection {}
|
impl Eq for Web3Connection {}
|
||||||
|
|
||||||
impl Ord for Web3Connection {
|
impl Ord for Web3Connection {
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
///! Load balanced communication with a group of web3 providers
|
///! Load balanced communication with a group of web3 providers
|
||||||
use super::SyncedConnections;
|
use super::connection::Web3Connection;
|
||||||
use super::{ActiveRequestHandle, HandleResult, Web3Connection};
|
use super::request::{PendingRequestHandle, RequestHandleResult};
|
||||||
|
use super::synced_connections::SyncedConnections;
|
||||||
use crate::app::{flatten_handle, AnyhowJoinHandle, TxState};
|
use crate::app::{flatten_handle, AnyhowJoinHandle, TxState};
|
||||||
use crate::config::Web3ConnectionConfig;
|
use crate::config::Web3ConnectionConfig;
|
||||||
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
|
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
|
||||||
@ -189,7 +190,7 @@ impl Web3Connections {
|
|||||||
// TODO: there is a race here on geth. sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!)
|
// TODO: there is a race here on geth. sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!)
|
||||||
// TODO: maximum wait time
|
// TODO: maximum wait time
|
||||||
let pending_transaction: Transaction = match rpc.try_request_handle().await {
|
let pending_transaction: Transaction = match rpc.try_request_handle().await {
|
||||||
Ok(HandleResult::ActiveRequest(handle)) => {
|
Ok(RequestHandleResult::ActiveRequest(handle)) => {
|
||||||
handle
|
handle
|
||||||
.request("eth_getTransactionByHash", (pending_tx_id,))
|
.request("eth_getTransactionByHash", (pending_tx_id,))
|
||||||
.await?
|
.await?
|
||||||
@ -348,7 +349,7 @@ impl Web3Connections {
|
|||||||
#[instrument(skip_all)]
|
#[instrument(skip_all)]
|
||||||
pub async fn try_send_parallel_requests(
|
pub async fn try_send_parallel_requests(
|
||||||
&self,
|
&self,
|
||||||
active_request_handles: Vec<ActiveRequestHandle>,
|
active_request_handles: Vec<PendingRequestHandle>,
|
||||||
method: &str,
|
method: &str,
|
||||||
// TODO: remove this box once i figure out how to do the options
|
// TODO: remove this box once i figure out how to do the options
|
||||||
params: Option<&serde_json::Value>,
|
params: Option<&serde_json::Value>,
|
||||||
@ -436,7 +437,7 @@ impl Web3Connections {
|
|||||||
&self,
|
&self,
|
||||||
skip: &[Arc<Web3Connection>],
|
skip: &[Arc<Web3Connection>],
|
||||||
min_block_needed: Option<&U64>,
|
min_block_needed: Option<&U64>,
|
||||||
) -> anyhow::Result<HandleResult> {
|
) -> anyhow::Result<RequestHandleResult> {
|
||||||
let mut earliest_retry_at = None;
|
let mut earliest_retry_at = None;
|
||||||
|
|
||||||
// filter the synced rpcs
|
// filter the synced rpcs
|
||||||
@ -493,14 +494,14 @@ impl Web3Connections {
|
|||||||
for rpc in synced_rpcs.into_iter() {
|
for rpc in synced_rpcs.into_iter() {
|
||||||
// increment our connection counter
|
// increment our connection counter
|
||||||
match rpc.try_request_handle().await {
|
match rpc.try_request_handle().await {
|
||||||
Ok(HandleResult::ActiveRequest(handle)) => {
|
Ok(RequestHandleResult::ActiveRequest(handle)) => {
|
||||||
trace!("next server on {:?}: {:?}", self, rpc);
|
trace!("next server on {:?}: {:?}", self, rpc);
|
||||||
return Ok(HandleResult::ActiveRequest(handle));
|
return Ok(RequestHandleResult::ActiveRequest(handle));
|
||||||
}
|
}
|
||||||
Ok(HandleResult::RetryAt(retry_at)) => {
|
Ok(RequestHandleResult::RetryAt(retry_at)) => {
|
||||||
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
|
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
|
||||||
}
|
}
|
||||||
Ok(HandleResult::None) => {
|
Ok(RequestHandleResult::None) => {
|
||||||
// TODO: log a warning?
|
// TODO: log a warning?
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
@ -514,7 +515,7 @@ impl Web3Connections {
|
|||||||
|
|
||||||
match earliest_retry_at {
|
match earliest_retry_at {
|
||||||
None => todo!(),
|
None => todo!(),
|
||||||
Some(earliest_retry_at) => Ok(HandleResult::RetryAt(earliest_retry_at)),
|
Some(earliest_retry_at) => Ok(RequestHandleResult::RetryAt(earliest_retry_at)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -524,7 +525,7 @@ impl Web3Connections {
|
|||||||
pub async fn upstream_servers(
|
pub async fn upstream_servers(
|
||||||
&self,
|
&self,
|
||||||
min_block_needed: Option<&U64>,
|
min_block_needed: Option<&U64>,
|
||||||
) -> Result<Vec<ActiveRequestHandle>, Option<Instant>> {
|
) -> Result<Vec<PendingRequestHandle>, Option<Instant>> {
|
||||||
let mut earliest_retry_at = None;
|
let mut earliest_retry_at = None;
|
||||||
// TODO: with capacity?
|
// TODO: with capacity?
|
||||||
let mut selected_rpcs = vec![];
|
let mut selected_rpcs = vec![];
|
||||||
@ -538,12 +539,12 @@ impl Web3Connections {
|
|||||||
|
|
||||||
// check rate limits and increment our connection counter
|
// check rate limits and increment our connection counter
|
||||||
match connection.try_request_handle().await {
|
match connection.try_request_handle().await {
|
||||||
Ok(HandleResult::RetryAt(retry_at)) => {
|
Ok(RequestHandleResult::RetryAt(retry_at)) => {
|
||||||
// this rpc is not available. skip it
|
// this rpc is not available. skip it
|
||||||
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
|
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
|
||||||
}
|
}
|
||||||
Ok(HandleResult::ActiveRequest(handle)) => selected_rpcs.push(handle),
|
Ok(RequestHandleResult::ActiveRequest(handle)) => selected_rpcs.push(handle),
|
||||||
Ok(HandleResult::None) => {
|
Ok(RequestHandleResult::None) => {
|
||||||
warn!("no request handle for {}", connection)
|
warn!("no request handle for {}", connection)
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
@ -577,7 +578,7 @@ impl Web3Connections {
|
|||||||
.next_upstream_server(&skip_rpcs, min_block_needed)
|
.next_upstream_server(&skip_rpcs, min_block_needed)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(HandleResult::ActiveRequest(active_request_handle)) => {
|
Ok(RequestHandleResult::ActiveRequest(active_request_handle)) => {
|
||||||
// save the rpc in case we get an error and want to retry on another server
|
// save the rpc in case we get an error and want to retry on another server
|
||||||
skip_rpcs.push(active_request_handle.clone_connection());
|
skip_rpcs.push(active_request_handle.clone_connection());
|
||||||
|
|
||||||
@ -630,7 +631,7 @@ impl Web3Connections {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(HandleResult::RetryAt(retry_at)) => {
|
Ok(RequestHandleResult::RetryAt(retry_at)) => {
|
||||||
// TODO: move this to a helper function
|
// TODO: move this to a helper function
|
||||||
// sleep (TODO: with a lock?) until our rate limits should be available
|
// sleep (TODO: with a lock?) until our rate limits should be available
|
||||||
// TODO: if a server catches up sync while we are waiting, we could stop waiting
|
// TODO: if a server catches up sync while we are waiting, we could stop waiting
|
||||||
@ -640,7 +641,7 @@ impl Web3Connections {
|
|||||||
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
Ok(HandleResult::None) => {
|
Ok(RequestHandleResult::None) => {
|
||||||
warn!(?self, "No server handles!");
|
warn!(?self, "No server handles!");
|
||||||
|
|
||||||
// TODO: subscribe to something on synced connections. maybe it should just be a watch channel
|
// TODO: subscribe to something on synced connections. maybe it should just be a watch channel
|
||||||
|
@ -1,9 +1,6 @@
|
|||||||
mod blockchain;
|
pub mod blockchain;
|
||||||
mod connection;
|
pub mod connection;
|
||||||
mod connections;
|
pub mod connections;
|
||||||
mod provider;
|
pub mod provider;
|
||||||
mod synced_connections;
|
pub mod request;
|
||||||
|
pub mod synced_connections;
|
||||||
pub use connection::{ActiveRequestHandle, HandleResult, Web3Connection};
|
|
||||||
pub use connections::Web3Connections;
|
|
||||||
pub use synced_connections::SyncedConnections;
|
|
||||||
|
@ -1,4 +1,7 @@
|
|||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use derive_more::From;
|
use derive_more::From;
|
||||||
|
use tracing::{info_span, instrument, Instrument};
|
||||||
|
|
||||||
/// Use HTTP and WS providers.
|
/// Use HTTP and WS providers.
|
||||||
// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit <https://github.com/gakonst/ethers-rs/issues/592>
|
// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit <https://github.com/gakonst/ethers-rs/issues/592>
|
||||||
@ -7,3 +10,37 @@ pub enum Web3Provider {
|
|||||||
Http(ethers::providers::Provider<ethers::providers::Http>),
|
Http(ethers::providers::Provider<ethers::providers::Http>),
|
||||||
Ws(ethers::providers::Provider<ethers::providers::Ws>),
|
Ws(ethers::providers::Provider<ethers::providers::Ws>),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Web3Provider {
|
||||||
|
#[instrument]
|
||||||
|
pub async fn from_str(
|
||||||
|
url_str: &str,
|
||||||
|
http_client: Option<reqwest::Client>,
|
||||||
|
) -> anyhow::Result<Self> {
|
||||||
|
let provider = if url_str.starts_with("http") {
|
||||||
|
let url: url::Url = url_str.parse()?;
|
||||||
|
|
||||||
|
let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?;
|
||||||
|
|
||||||
|
let provider = ethers::providers::Http::new_with_client(url, http_client);
|
||||||
|
|
||||||
|
// TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
|
||||||
|
// TODO: i don't think this interval matters for our uses, but we should probably set it to like `block time / 2`
|
||||||
|
ethers::providers::Provider::new(provider)
|
||||||
|
.interval(Duration::from_secs(13))
|
||||||
|
.into()
|
||||||
|
} else if url_str.starts_with("ws") {
|
||||||
|
let provider = ethers::providers::Ws::connect(url_str)
|
||||||
|
.instrument(info_span!("Web3Provider", %url_str))
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
|
||||||
|
// TODO: i don't think this interval matters
|
||||||
|
ethers::providers::Provider::new(provider).into()
|
||||||
|
} else {
|
||||||
|
return Err(anyhow::anyhow!("only http and ws servers are supported"));
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(provider)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
95
web3_proxy/src/rpcs/request.rs
Normal file
95
web3_proxy/src/rpcs/request.rs
Normal file
@ -0,0 +1,95 @@
|
|||||||
|
use super::connection::Web3Connection;
|
||||||
|
use super::provider::Web3Provider;
|
||||||
|
use crate::app::{flatten_handle, AnyhowJoinHandle};
|
||||||
|
use crate::config::BlockAndRpc;
|
||||||
|
use anyhow::Context;
|
||||||
|
use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64};
|
||||||
|
use futures::future::try_join_all;
|
||||||
|
use futures::StreamExt;
|
||||||
|
use parking_lot::RwLock;
|
||||||
|
use redis_rate_limit::{RedisPool, RedisRateLimit, ThrottleResult};
|
||||||
|
use serde::ser::{SerializeStruct, Serializer};
|
||||||
|
use serde::Serialize;
|
||||||
|
use std::fmt;
|
||||||
|
use std::hash::{Hash, Hasher};
|
||||||
|
use std::sync::atomic::{self, AtomicU32, AtomicU64};
|
||||||
|
use std::{cmp::Ordering, sync::Arc};
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
use tokio::sync::RwLock as AsyncRwLock;
|
||||||
|
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
|
||||||
|
use tracing::{error, info, info_span, instrument, trace, warn, Instrument};
|
||||||
|
|
||||||
|
// TODO: rename this
|
||||||
|
pub enum RequestHandleResult {
|
||||||
|
ActiveRequest(PendingRequestHandle),
|
||||||
|
RetryAt(Instant),
|
||||||
|
None,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Drop this once a connection completes
|
||||||
|
pub struct PendingRequestHandle(Arc<Web3Connection>);
|
||||||
|
|
||||||
|
impl PendingRequestHandle {
|
||||||
|
pub fn new(connection: Arc<Web3Connection>) -> Self {
|
||||||
|
// TODO: attach a unique id to this?
|
||||||
|
// TODO: what ordering?!
|
||||||
|
connection
|
||||||
|
.active_requests
|
||||||
|
.fetch_add(1, atomic::Ordering::AcqRel);
|
||||||
|
|
||||||
|
Self(connection)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn clone_connection(&self) -> Arc<Web3Connection> {
|
||||||
|
self.0.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a web3 request
|
||||||
|
/// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
|
||||||
|
/// By taking self here, we ensure that this is dropped after the request is complete
|
||||||
|
#[instrument(skip_all)]
|
||||||
|
pub async fn request<T, R>(
|
||||||
|
&self,
|
||||||
|
method: &str,
|
||||||
|
params: T,
|
||||||
|
) -> Result<R, ethers::prelude::ProviderError>
|
||||||
|
where
|
||||||
|
T: fmt::Debug + serde::Serialize + Send + Sync,
|
||||||
|
R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug,
|
||||||
|
{
|
||||||
|
// TODO: use tracing spans properly
|
||||||
|
// TODO: it would be nice to have the request id on this
|
||||||
|
// TODO: including params in this is way too verbose
|
||||||
|
trace!("Sending {} to {}", method, self.0);
|
||||||
|
|
||||||
|
let mut provider = None;
|
||||||
|
|
||||||
|
while provider.is_none() {
|
||||||
|
// TODO: if no provider, don't unwrap. wait until there is one.
|
||||||
|
match self.0.provider.read().await.as_ref() {
|
||||||
|
None => {}
|
||||||
|
Some(found_provider) => provider = Some(found_provider.clone()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let response = match &*provider.unwrap() {
|
||||||
|
Web3Provider::Http(provider) => provider.request(method, params).await,
|
||||||
|
Web3Provider::Ws(provider) => provider.request(method, params).await,
|
||||||
|
};
|
||||||
|
|
||||||
|
// TODO: i think ethers already has trace logging (and does it much more fancy)
|
||||||
|
// TODO: at least instrument this with more useful information
|
||||||
|
// trace!("Reply from {}: {:?}", self.0, response);
|
||||||
|
trace!("Reply from {}", self.0);
|
||||||
|
|
||||||
|
response
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for PendingRequestHandle {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.0
|
||||||
|
.active_requests
|
||||||
|
.fetch_sub(1, atomic::Ordering::AcqRel);
|
||||||
|
}
|
||||||
|
}
|
@ -1,4 +1,5 @@
|
|||||||
use super::{Web3Connection, Web3Connections};
|
use super::connection::Web3Connection;
|
||||||
|
use super::connections::Web3Connections;
|
||||||
use ethers::prelude::{H256, U64};
|
use ethers::prelude::{H256, U64};
|
||||||
use indexmap::IndexSet;
|
use indexmap::IndexSet;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
|
Loading…
Reference in New Issue
Block a user