only subscribe when someone is listening

This commit is contained in:
Bryan Stitt 2022-06-16 20:57:48 +00:00
parent 36bfd4bdcc
commit dd674bb900
4 changed files with 171 additions and 174 deletions

@ -3,8 +3,8 @@ use dashmap::mapref::entry::Entry as DashMapEntry;
use dashmap::DashMap;
use ethers::prelude::Transaction;
use ethers::prelude::{Block, TxHash, H256};
use futures::future::Abortable;
use futures::future::{join_all, AbortHandle};
use futures::future::{try_join_all, Abortable};
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::Future;
@ -19,7 +19,7 @@ use std::time::Duration;
use tokio::sync::{broadcast, watch};
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tokio_stream::wrappers::WatchStream;
use tokio_stream::wrappers::{BroadcastStream, WatchStream};
use tracing::{debug, info, info_span, instrument, trace, warn, Instrument};
use crate::config::Web3ConnectionConfig;
@ -56,23 +56,23 @@ pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T>
}
}
pub async fn flatten_handles(
mut handles: FuturesUnordered<AnyhowJoinHandle<()>>,
pub async fn flatten_handles<T>(
mut handles: FuturesUnordered<AnyhowJoinHandle<T>>,
) -> anyhow::Result<()> {
while let Some(x) = handles.next().await {
match x {
Err(e) => return Err(e.into()),
Ok(Err(e)) => return Err(e),
Ok(Ok(())) => {}
Ok(Ok(_)) => {}
}
}
Ok(())
}
// TODO: think more about TxState. d
#[derive(Clone)]
pub enum TxState {
Known(TxHash),
Pending(Transaction),
Confirmed(Transaction),
Orphaned(Transaction),
@ -93,7 +93,6 @@ pub struct Web3ProxyApp {
// TODO: broadcast channel instead?
head_block_receiver: watch::Receiver<Block<TxHash>>,
pending_tx_sender: broadcast::Sender<TxState>,
pending_tx_receiver: broadcast::Receiver<TxState>,
pending_transactions: Arc<DashMap<TxHash, TxState>>,
next_subscription_id: AtomicUsize,
}
@ -106,6 +105,10 @@ impl fmt::Debug for Web3ProxyApp {
}
impl Web3ProxyApp {
pub fn get_pending_transactions(&self) -> &DashMap<TxHash, TxState> {
&self.pending_transactions
}
pub async fn spawn(
chain_id: usize,
redis_address: Option<String>,
@ -193,6 +196,9 @@ impl Web3ProxyApp {
private_rpcs
};
// TODO: use this? it could listen for confirmed transactions and then clear pending_transactions, but the head_block_sender is doing that
drop(pending_tx_receiver);
let app = Web3ProxyApp {
balanced_rpcs,
private_rpcs,
@ -200,7 +206,6 @@ impl Web3ProxyApp {
response_cache: Default::default(),
head_block_receiver,
pending_tx_sender,
pending_tx_receiver,
pending_transactions,
next_subscription_id: 1.into(),
};
@ -231,115 +236,127 @@ impl Web3ProxyApp {
// save the id so we can use it in the response
let id = payload.id.clone();
let subscription_join_handle = {
let subscription_id = subscription_id.clone();
match payload.params.as_deref().unwrap().get() {
r#"["newHeads"]"# => {
let head_block_receiver = self.head_block_receiver.clone();
match payload.params.as_deref().unwrap().get() {
r#"["newHeads"]"# => {
let head_block_receiver = self.head_block_receiver.clone();
let subscription_id = subscription_id.clone();
trace!(?subscription_id, "new heads subscription");
tokio::spawn(async move {
let mut head_block_receiver = Abortable::new(
WatchStream::new(head_block_receiver),
subscription_registration,
);
trace!(?subscription_id, "new heads subscription");
tokio::spawn(async move {
let mut head_block_receiver = Abortable::new(
WatchStream::new(head_block_receiver),
subscription_registration,
);
while let Some(new_head) = head_block_receiver.next().await {
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json!({
"jsonrpc": "2.0",
"method":"eth_subscription",
"params": {
"subscription": subscription_id,
"result": new_head,
},
});
while let Some(new_head) = head_block_receiver.next().await {
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json!({
"jsonrpc": "2.0",
"method":"eth_subscription",
"params": {
"subscription": subscription_id,
"result": new_head,
},
});
let msg = Message::Text(serde_json::to_string(&msg).unwrap());
let msg = Message::Text(serde_json::to_string(&msg).unwrap());
if subscription_tx.send_async(msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
}
if subscription_tx.send_async(msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
}
trace!(?subscription_id, "closed new heads subscription");
})
}
r#"["newPendingTransactions"]"# => {
let mut pending_tx_receiver = self.pending_tx_sender.subscribe();
trace!(?subscription_id, "pending transactions subscription");
tokio::spawn(async move {
while let Ok(new_tx_state) = pending_tx_receiver.recv().await {
let new_tx = match new_tx_state {
TxState::Known(..) => continue,
TxState::Confirmed(..) => continue,
TxState::Orphaned(tx) => tx,
TxState::Pending(tx) => tx,
};
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json!({
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"subscription": subscription_id,
"result": new_tx.hash,
},
});
let msg = Message::Text(serde_json::to_string(&msg).unwrap());
if subscription_tx.send_async(msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
}
trace!(?subscription_id, "closed new heads subscription");
})
}
r#"["newPendingFullTransactions"]"# => {
// TODO: too much copy/pasta with newPendingTransactions
let mut pending_tx_receiver = self.pending_tx_sender.subscribe();
trace!(?subscription_id, "pending transactions subscription");
tokio::spawn(async move {
while let Ok(new_tx_state) = pending_tx_receiver.recv().await {
let new_tx = match new_tx_state {
TxState::Known(..) => continue,
TxState::Confirmed(..) => continue,
TxState::Orphaned(tx) => tx,
TxState::Pending(tx) => tx,
};
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json!({
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"subscription": subscription_id,
// upstream just sends the txid, but we want to send the whole transaction
"result": new_tx,
},
});
let msg = Message::Text(serde_json::to_string(&msg).unwrap());
if subscription_tx.send_async(msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
}
trace!(?subscription_id, "closed new heads subscription");
})
}
_ => return Err(anyhow::anyhow!("unimplemented")),
trace!(?subscription_id, "closed new heads subscription");
});
}
};
r#"["newPendingTransactions"]"# => {
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let mut pending_tx_receiver = Abortable::new(
BroadcastStream::new(pending_tx_receiver),
subscription_registration,
);
let subscription_id = subscription_id.clone();
trace!(?subscription_id, "pending transactions subscription");
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let new_tx = match new_tx_state {
TxState::Pending(tx) => tx,
TxState::Confirmed(..) => continue,
TxState::Orphaned(tx) => tx,
};
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json!({
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"subscription": subscription_id,
"result": new_tx.hash,
},
});
let msg = Message::Text(serde_json::to_string(&msg).unwrap());
if subscription_tx.send_async(msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
}
trace!(?subscription_id, "closed new heads subscription");
});
}
r#"["newPendingFullTransactions"]"# => {
// TODO: too much copy/pasta with newPendingTransactions
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let mut pending_tx_receiver = Abortable::new(
BroadcastStream::new(pending_tx_receiver),
subscription_registration,
);
let subscription_id = subscription_id.clone();
trace!(?subscription_id, "pending transactions subscription");
// TODO: do something with this handle?
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let new_tx = match new_tx_state {
TxState::Pending(tx) => tx,
TxState::Confirmed(..) => continue,
TxState::Orphaned(tx) => tx,
};
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json!({
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"subscription": subscription_id,
// upstream just sends the txid, but we want to send the whole transaction
"result": new_tx,
},
});
let msg = Message::Text(serde_json::to_string(&msg).unwrap());
if subscription_tx.send_async(msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
}
trace!(?subscription_id, "closed new heads subscription");
});
}
_ => return Err(anyhow::anyhow!("unimplemented")),
}
// TODO: do something with subscription_join_handle?

@ -2,7 +2,6 @@
use anyhow::Context;
use arc_swap::ArcSwap;
use counter::Counter;
use dashmap::mapref::entry::Entry as DashMapEntry;
use dashmap::DashMap;
use derive_more::From;
use ethers::prelude::{Block, ProviderError, Transaction, TxHash, H256};
@ -165,6 +164,7 @@ impl Web3Connections {
) -> Result<TxState, ProviderError> {
// TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
// TODO: there is a race here sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!)
// TODO: maximum wait time
let pending_transaction: Transaction = rpc
.wait_for_request_handle()
.await
@ -190,67 +190,44 @@ impl Web3Connections {
pending_tx_sender: broadcast::Sender<TxState>,
) -> anyhow::Result<()> {
// TODO: how many retries? until some timestamp is hit is probably better. maybe just loop and call this with a timeout
for i in 0..30 {
// TODO: also check the "confirmed transactions" mapping? maybe one shared mapping with TxState in it?
info!(?pending_tx_id, "checking pending_transactions");
let tx_state = match self.pending_transactions.entry(pending_tx_id) {
DashMapEntry::Occupied(mut entry) => {
if let TxState::Known(_) = entry.get() {
// TODO: if its occupied, but still only "Known", multiple nodes have this transaction. ask both
match self._funnel_transaction(rpc.clone(), pending_tx_id).await {
Ok(tx_state) => {
entry.insert(tx_state.clone());
// TODO: after more investigation, i don't think retries will help. i think this is because chains of transactions get dropped from memory
// TODO: also check the "confirmed transactions" mapping? maybe one shared mapping with TxState in it?
trace!(?pending_tx_id, "checking pending_transactions on {}", rpc);
Some(tx_state)
}
Err(err) => {
debug!(
?i,
?err,
?pending_tx_id,
"failed sending transaction (retry/race)"
);
None
}
}
} else {
None
}
}
DashMapEntry::Vacant(entry) => {
// TODO: how many retries?
// TODO: use a generic retry provider instead?
match self._funnel_transaction(rpc.clone(), pending_tx_id).await {
Ok(tx_state) => {
entry.insert(tx_state.clone());
Some(tx_state)
}
Err(err) => {
debug!(?i, ?err, ?pending_tx_id, "failed sending transaction");
None
}
}
}
};
if let Some(tx_state) = tx_state {
let _ = pending_tx_sender.send(tx_state);
info!(?pending_tx_id, "sent");
// since we sent a transaction, we should return
return Ok(());
}
// unable to update the entry. sleep and try again soon
// TODO: exponential backoff with jitter starting from a much smaller time
sleep(Duration::from_millis(3000)).await;
if self.pending_transactions.contains_key(&pending_tx_id) {
// this transaction has already been processed
return Ok(());
}
info!(?pending_tx_id, "not found");
if pending_tx_sender.receiver_count() == 0 {
// no receivers, so no point in querying to get the full transaction
return Ok(());
}
// query the rpc for this transaction
// it is possible that another rpc is also being queried. thats fine. we want the fastest response
match self._funnel_transaction(rpc.clone(), pending_tx_id).await {
Ok(tx_state) => {
let _ = pending_tx_sender.send(tx_state);
trace!(?pending_tx_id, "sent");
// we sent the transaction. return now. don't break looping because that gives a warning
return Ok(());
}
Err(err) => {
trace!(?err, ?pending_tx_id, "failed fetching transaction");
// unable to update the entry. sleep and try again soon
// TODO: retry with exponential backoff with jitter starting from a much smaller time
// sleep(Duration::from_millis(100)).await;
}
}
// warn is too loud. this is somewhat common
// "There is a Pending txn with a lower account nonce. This txn can only be executed after confirmation of the earlier Txn Hash#"
// sometimes it's been pending for many hours
// sometimes it's maybe something else?
debug!(?pending_tx_id, "not found on {}", rpc);
Ok(())
}
@ -544,8 +521,11 @@ impl Web3Connections {
// mark all transactions in the block as confirmed
if pending_tx_sender.is_some() {
for tx_hash in &new_block.transactions {
// TODO: should we mark as confirmed via pending_tx_sender so that orphans are easier?
// TODO: should we mark as confirmed via pending_tx_sender?
// TODO: possible deadlock here!
// trace!("removing {}...", tx_hash);
let _ = self.pending_transactions.remove(tx_hash);
// trace!("removed {}", tx_hash);
}
};

@ -13,17 +13,17 @@ pub async fn index() -> impl IntoResponse {
/// Very basic status page
pub async fn status(app: Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
let app = app.0.as_ref();
// TODO: what else should we include? uptime? prometheus?
let balanced_rpcs = app.get_balanced_rpcs();
let private_rpcs = app.get_private_rpcs();
let num_active_requests = app.get_active_requests().len();
let num_pending_transactions = app.get_pending_transactions().len();
// TODO: what else should we include? uptime? prometheus?
let body = json!({
"balanced_rpcs": balanced_rpcs,
"private_rpcs": private_rpcs,
"num_active_requests": num_active_requests,
"num_pending_transactions": num_pending_transactions,
});
(StatusCode::INTERNAL_SERVER_ERROR, Json(body))

@ -8,7 +8,7 @@ pub async fn proxy_web3_rpc(
payload: Json<JsonRpcRequestEnum>,
app: Extension<Arc<Web3ProxyApp>>,
) -> impl IntoResponse {
match app.0.proxy_web3_rpc(payload.0).await {
match app.proxy_web3_rpc(payload.0).await {
Ok(response) => (StatusCode::OK, Json(&response)).into_response(),
Err(err) => handle_anyhow_error(err, None).await.into_response(),
}