pass more handles around

This commit is contained in:
Bryan Stitt 2022-06-16 17:51:49 +00:00
parent 4ca292c3e1
commit 36bfd4bdcc
4 changed files with 129 additions and 72 deletions

@ -3,14 +3,16 @@ use dashmap::mapref::entry::Entry as DashMapEntry;
use dashmap::DashMap;
use ethers::prelude::Transaction;
use ethers::prelude::{Block, TxHash, H256};
use futures::future::Abortable;
use futures::future::{join_all, AbortHandle};
use futures::future::{try_join_all, Abortable};
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::Future;
use linkedhashmap::LinkedHashMap;
use parking_lot::RwLock;
use serde_json::json;
use std::fmt;
use std::pin::Pin;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::Arc;
use std::time::Duration;
@ -54,6 +56,20 @@ pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T>
}
}
pub async fn flatten_handles(
mut handles: FuturesUnordered<AnyhowJoinHandle<()>>,
) -> anyhow::Result<()> {
while let Some(x) = handles.next().await {
match x {
Err(e) => return Err(e.into()),
Ok(Err(e)) => return Err(e),
Ok(Ok(())) => {}
}
}
Ok(())
}
#[derive(Clone)]
pub enum TxState {
Known(TxHash),
@ -65,6 +81,7 @@ pub enum TxState {
/// The application
// TODO: this debug impl is way too verbose. make something smaller
// TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs
// TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard
pub struct Web3ProxyApp {
/// Send requests to the best server available
balanced_rpcs: Arc<Web3Connections>,
@ -76,6 +93,8 @@ pub struct Web3ProxyApp {
// TODO: broadcast channel instead?
head_block_receiver: watch::Receiver<Block<TxHash>>,
pending_tx_sender: broadcast::Sender<TxState>,
pending_tx_receiver: broadcast::Receiver<TxState>,
pending_transactions: Arc<DashMap<TxHash, TxState>>,
next_subscription_id: AtomicUsize,
}
@ -92,9 +111,12 @@ impl Web3ProxyApp {
redis_address: Option<String>,
balanced_rpcs: Vec<Web3ConnectionConfig>,
private_rpcs: Vec<Web3ConnectionConfig>,
) -> anyhow::Result<(Arc<Web3ProxyApp>, AnyhowJoinHandle<()>)> {
) -> anyhow::Result<(
Arc<Web3ProxyApp>,
Pin<Box<dyn Future<Output = anyhow::Result<()>>>>,
)> {
// TODO: try_join_all instead
let mut handles = FuturesUnordered::new();
let handles = FuturesUnordered::new();
// make a http shared client
// TODO: how should we configure the connection pool?
@ -127,7 +149,12 @@ impl Web3ProxyApp {
let (head_block_sender, head_block_receiver) = watch::channel(Block::default());
// TODO: will one receiver lagging be okay?
let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(16);
drop(pending_tx_receiver);
let pending_transactions = Arc::new(DashMap::new());
// TODO: don't drop the pending_tx_receiver. instead, read it to mark transactions as "seen". once seen, we won't re-send them
// TODO: once a transaction is "Confirmed" we remove it from the map. this should prevent major memory leaks.
// TODO: we should still have some sort of expiration or maximum size limit for the map
// TODO: attach context to this error
let (balanced_rpcs, balanced_handle) = Web3Connections::spawn(
@ -137,6 +164,7 @@ impl Web3ProxyApp {
rate_limiter.as_ref(),
Some(head_block_sender),
Some(pending_tx_sender.clone()),
pending_transactions.clone(),
)
.await?;
@ -155,7 +183,8 @@ impl Web3ProxyApp {
// subscribing to new heads here won't work well
None,
// TODO: subscribe to pending transactions on the private rpcs?
None,
Some(pending_tx_sender.clone()),
pending_transactions.clone(),
)
.await?;
@ -171,23 +200,16 @@ impl Web3ProxyApp {
response_cache: Default::default(),
head_block_receiver,
pending_tx_sender,
pending_tx_receiver,
pending_transactions,
next_subscription_id: 1.into(),
};
let app = Arc::new(app);
// create a handle that returns on the first error
let handle = tokio::spawn(async move {
while let Some(x) = handles.next().await {
match x {
Err(e) => return Err(e.into()),
Ok(Err(e)) => return Err(e),
Ok(Ok(())) => {}
}
}
Ok(())
});
// TODO: move this to a helper. i think Web3Connections needs it too
let handle = Box::pin(flatten_handles(handles));
Ok((app, handle))
}

@ -1,7 +1,9 @@
use argh::FromArgs;
use ethers::prelude::{Block, TxHash};
use futures::Future;
use serde::Deserialize;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use crate::app::AnyhowJoinHandle;
@ -49,7 +51,12 @@ pub struct Web3ConnectionConfig {
impl RpcConfig {
/// Create a Web3ProxyApp from config
// #[instrument(name = "try_build_RpcConfig", skip_all)]
pub async fn spawn(self) -> anyhow::Result<(Arc<Web3ProxyApp>, AnyhowJoinHandle<()>)> {
pub async fn spawn(
self,
) -> anyhow::Result<(
Arc<Web3ProxyApp>,
Pin<Box<dyn Future<Output = anyhow::Result<()>>>>,
)> {
let balanced_rpcs = self.balanced_rpcs.into_values().collect();
let private_rpcs = if let Some(private_rpcs) = self.private_rpcs {

@ -21,7 +21,7 @@ use std::time::Duration;
use tokio::sync::{broadcast, watch};
use tokio::task;
use tokio::time::sleep;
use tracing::{debug, info, info_span, instrument, trace, warn};
use tracing::{debug, error, info, info_span, instrument, trace, warn};
use crate::app::{flatten_handle, AnyhowJoinHandle, TxState};
use crate::config::Web3ConnectionConfig;
@ -57,7 +57,7 @@ impl SyncedConnections {
pub struct Web3Connections {
inner: Vec<Arc<Web3Connection>>,
synced_connections: ArcSwap<SyncedConnections>,
pending_transactions: DashMap<TxHash, TxState>,
pending_transactions: Arc<DashMap<TxHash, TxState>>,
}
impl Serialize for Web3Connections {
@ -93,11 +93,12 @@ impl Web3Connections {
rate_limiter: Option<&redis_cell_client::MultiplexedConnection>,
head_block_sender: Option<watch::Sender<Block<TxHash>>>,
pending_tx_sender: Option<broadcast::Sender<TxState>>,
pending_transactions: Arc<DashMap<TxHash, TxState>>,
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
let num_connections = server_configs.len();
// TODO: try_join_all
let handles = FuturesUnordered::new();
let mut handles = vec![];
// TODO: only create these if head_block_sender and pending_tx_sender are set
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
@ -118,7 +119,7 @@ impl Web3Connections {
.await
{
Ok((connection, connection_handle)) => {
handles.push(connection_handle);
handles.push(flatten_handle(connection_handle));
connections.push(connection)
}
Err(e) => warn!("Unable to connect to a server! {:?}", e),
@ -135,13 +136,14 @@ impl Web3Connections {
let connections = Arc::new(Self {
inner: connections,
synced_connections: ArcSwap::new(Arc::new(synced_connections)),
pending_transactions: Default::default(),
pending_transactions,
});
let handle = {
let connections = connections.clone();
tokio::spawn(async move {
// TODO: try_join_all with the other handles here
connections
.subscribe(
pending_tx_id_receiver,
@ -156,71 +158,96 @@ impl Web3Connections {
Ok((connections, handle))
}
async fn send_transaction(
async fn _funnel_transaction(
&self,
rpc: Arc<Web3Connection>,
pending_tx_id: TxHash,
) -> Result<TxState, ProviderError> {
// TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
// TODO: there is a race here sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!)
let pending_transaction: Transaction = rpc
.wait_for_request_handle()
.await
.request("eth_getTransactionByHash", (pending_tx_id,))
.await?;
trace!(?pending_transaction, "pending");
// TODO: do not unwrap. orphans might make this unsafe
match &pending_transaction.block_hash {
Some(_block_hash) => {
// the transaction is already confirmed. no need to save in the pending_transactions map
Ok(TxState::Confirmed(pending_transaction))
}
None => Ok(TxState::Pending(pending_transaction)),
}
}
async fn funnel_transaction(
self: Arc<Self>,
rpc: Arc<Web3Connection>,
pending_tx_id: TxHash,
pending_tx_sender: broadcast::Sender<TxState>,
) -> anyhow::Result<()> {
// TODO: how many retries? until some timestamp is hit is probably better. maybe just loop and call this with a timeout
for i in 0..30 {
// TODO: also check the "confirmed transactions" mapping? maybe one shared mapping with TxState in it?
match self.pending_transactions.entry(pending_tx_id) {
DashMapEntry::Occupied(_entry) => {
// TODO: if its occupied, but still only "Known", multiple nodes have this transaction. ask both
return Ok(());
info!(?pending_tx_id, "checking pending_transactions");
let tx_state = match self.pending_transactions.entry(pending_tx_id) {
DashMapEntry::Occupied(mut entry) => {
if let TxState::Known(_) = entry.get() {
// TODO: if its occupied, but still only "Known", multiple nodes have this transaction. ask both
match self._funnel_transaction(rpc.clone(), pending_tx_id).await {
Ok(tx_state) => {
entry.insert(tx_state.clone());
Some(tx_state)
}
Err(err) => {
debug!(
?i,
?err,
?pending_tx_id,
"failed sending transaction (retry/race)"
);
None
}
}
} else {
None
}
}
DashMapEntry::Vacant(entry) => {
let request_handle = rpc.wait_for_request_handle().await;
// TODO: how many retries?
// TODO: use a generic retry provider instead?
let tx_result = request_handle
.request("eth_getTransactionByHash", (pending_tx_id,))
.await;
match self._funnel_transaction(rpc.clone(), pending_tx_id).await {
Ok(tx_state) => {
entry.insert(tx_state.clone());
// TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
// TODO: there is a race here sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!)
let pending_transaction = match tx_result {
Ok(tx) => Some(tx),
Some(tx_state)
}
Err(err) => {
trace!(
?i,
?err,
?pending_tx_id,
"error getting transaction by hash"
);
debug!(?i, ?err, ?pending_tx_id, "failed sending transaction");
// TODO: how long? exponential backoff?
sleep(Duration::from_millis(100)).await;
continue;
None
}
};
trace!(?pending_transaction, "pending");
let pending_transaction: Transaction = pending_transaction.unwrap();
// TODO: do not unwrap. orphans might make this unsafe
let tx_state = match &pending_transaction.block_hash {
Some(_block_hash) => {
// the transaction is already confirmed. no need to save in the pending_transactions map
TxState::Confirmed(pending_transaction)
}
None => {
let state = TxState::Pending(pending_transaction);
entry.insert(state.clone());
state
}
};
// TODO: maybe we should just send the txid and they can get it from the dashmap?
let _ = pending_tx_sender.send(tx_state);
info!(?pending_tx_id, "sent");
return Ok(());
}
}
};
if let Some(tx_state) = tx_state {
let _ = pending_tx_sender.send(tx_state);
info!(?pending_tx_id, "sent");
// since we sent a transaction, we should return
return Ok(());
}
// unable to update the entry. sleep and try again soon
// TODO: exponential backoff with jitter starting from a much smaller time
sleep(Duration::from_millis(3000)).await;
}
info!(?pending_tx_id, "not found");
@ -247,7 +274,7 @@ impl Web3Connections {
let handle = task::spawn(async move {
while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv_async().await {
// TODO: spawn this
let f = clone.clone().send_transaction(
let f = clone.clone().funnel_transaction(
rpc,
pending_tx_id,
pending_tx_sender.clone(),
@ -285,6 +312,7 @@ impl Web3Connections {
}
if let Err(e) = try_join_all(futures).await {
error!("subscriptions over: {:?}", self);
return Err(e);
}

@ -89,7 +89,7 @@ fn main() -> anyhow::Result<()> {
// if everything is working, these should both run forever
tokio::select! {
x = flatten_handle(app_handle) => {
x = app_handle => {
// TODO: error log if error
info!(?x, "app_handle exited");
}