shared interval for http

This commit is contained in:
Bryan Stitt 2022-06-29 19:15:05 +00:00
parent c5fee63b31
commit fe53ffdb40
4 changed files with 58 additions and 14 deletions

@ -61,6 +61,7 @@
- [ ] emit stats for successes, retries, failures, with the types of requests, account, chain, rpc
- [ ] automated soft limit
- [ ] if we send a transaction to private rpcs and then people query it on public rpcs things, some interfaces might think the transaction is dropped (i saw this happen in a brownie script of mine). how should we handle this?
- [ ] don't "unwrap" anywhere. give proper errors
## V2

@ -5,6 +5,7 @@ use serde::Deserialize;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::broadcast;
use crate::app::AnyhowJoinHandle;
use crate::connection::Web3Connection;
@ -83,9 +84,9 @@ impl Web3ConnectionConfig {
rate_limiter: Option<&redis_cell_client::MultiplexedConnection>,
chain_id: usize,
http_client: Option<&reqwest::Client>,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
block_sender: Option<flume::Sender<(Block<TxHash>, Arc<Web3Connection>)>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Web3Connection>)>>,
reconnect: bool,
) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
let hard_rate_limit = self.hard_limit.map(|x| (x, rate_limiter.unwrap()));
@ -93,11 +94,12 @@ impl Web3ConnectionConfig {
chain_id,
self.url,
http_client,
http_interval_sender,
hard_rate_limit,
self.soft_limit,
block_sender,
tx_id_sender,
reconnect,
true,
)
.await
}

@ -11,6 +11,7 @@ use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicU32};
use std::{cmp::Ordering, sync::Arc};
use tokio::sync::broadcast;
use tokio::sync::RwLock;
use tokio::task;
use tokio::time::{interval, sleep, Duration, MissedTickBehavior};
@ -129,6 +130,7 @@ impl Web3Connection {
url_str: String,
// optional because this is only used for http providers. websocket providers don't use it
http_client: Option<&reqwest::Client>,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
hard_limit: Option<(u32, &redis_cell_client::MultiplexedConnection)>,
// TODO: think more about this type
soft_limit: u32,
@ -150,7 +152,7 @@ impl Web3Connection {
let provider = Web3Provider::from_str(&url_str, http_client).await?;
let connection = Web3Connection {
let connection = Self {
url: url_str.clone(),
active_requests: 0.into(),
provider: RwLock::new(Some(Arc::new(provider))),
@ -172,6 +174,7 @@ impl Web3Connection {
match found_chain_id {
Ok(found_chain_id) => {
// TODO: there has to be a cleaner way to do this
let found_chain_id =
usize::from_str_radix(found_chain_id.trim_start_matches("0x"), 16).unwrap();
@ -195,7 +198,7 @@ impl Web3Connection {
let connection = connection.clone();
tokio::spawn(async move {
connection
.subscribe(block_sender, tx_id_sender, reconnect)
.subscribe(http_interval_sender, block_sender, tx_id_sender, reconnect)
.await
})
};
@ -271,15 +274,20 @@ impl Web3Connection {
async fn subscribe(
self: Arc<Self>,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
block_sender: Option<flume::Sender<(Block<TxHash>, Arc<Self>)>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
reconnect: bool,
) -> anyhow::Result<()> {
loop {
let http_interval_receiver = http_interval_sender.clone().map(|x| x.subscribe());
let mut futures = vec![];
if let Some(block_sender) = &block_sender {
let f = self.clone().subscribe_new_heads(block_sender.clone());
let f = self
.clone()
.subscribe_new_heads(http_interval_receiver, block_sender.clone());
futures.push(flatten_handle(tokio::spawn(f)));
}
@ -331,6 +339,7 @@ impl Web3Connection {
#[instrument(skip_all)]
async fn subscribe_new_heads(
self: Arc<Self>,
http_interval_receiver: Option<broadcast::Receiver<()>>,
block_sender: flume::Sender<(Block<TxHash>, Arc<Self>)>,
) -> anyhow::Result<()> {
info!("watching {}", self);
@ -340,18 +349,16 @@ impl Web3Connection {
match &*provider {
Web3Provider::Http(_provider) => {
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: what should this interval be? probably some fraction of block time. set automatically?
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
// TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though
let mut interval = interval(Duration::from_secs(13));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
// TODO: try watch_blocks and fall back to this?
let mut http_interval_receiver = http_interval_receiver.unwrap();
let mut last_hash = Default::default();
loop {
// wait for the interval
// TODO: if error or rate limit, increase interval?
interval.tick().await;
http_interval_receiver.recv().await.unwrap();
match self.try_request_handle().await {
Ok(active_request_handle) => {
@ -433,6 +440,7 @@ impl Web3Connection {
// TODO: what should this interval be? probably automatically set to some fraction of block time
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
// TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though
// TODO:
let mut interval = interval(Duration::from_secs(60));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

@ -19,7 +19,7 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, watch};
use tokio::task;
use tokio::time::sleep;
use tokio::time::{interval, sleep, MissedTickBehavior};
use tracing::{debug, error, info, info_span, instrument, trace, warn};
use crate::app::{flatten_handle, AnyhowJoinHandle, TxState};
@ -103,6 +103,39 @@ impl Web3Connections {
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, block_receiver) = flume::unbounded();
let http_interval_sender = if http_client.is_some() {
let (sender, receiver) = broadcast::channel(1);
drop(receiver);
// TODO: what interval?
let mut interval = interval(Duration::from_secs(13));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
let sender = Arc::new(sender);
let f = {
let sender = sender.clone();
async move {
loop {
// TODO: every time a head_block arrives (maybe with a small delay), or on the interval.
interval.tick().await;
// errors are okay. they mean that all receivers have been dropped
let _ = sender.send(());
}
}
};
// TODO: do something with this handle?
tokio::spawn(f);
Some(sender)
} else {
None
};
// turn configs into connections
let mut connections = Vec::with_capacity(num_connections);
for server_config in server_configs.into_iter() {
@ -111,9 +144,9 @@ impl Web3Connections {
rate_limiter,
chain_id,
http_client,
http_interval_sender.clone(),
Some(block_sender.clone()),
Some(pending_tx_id_sender.clone()),
true,
)
.await
{
@ -316,7 +349,7 @@ impl Web3Connections {
}
pub fn has_synced_rpcs(&self) -> bool {
self.synced_connections.load().inner.len() > 0
!self.synced_connections.load().inner.is_empty()
}
/// Send the same request to all the handles. Returning the most common success or most common error.