simple cache

This commit is contained in:
Bryan Stitt 2022-06-14 06:42:52 +00:00
parent d75a60a09f
commit 1ecd852401
2 changed files with 40 additions and 14 deletions

@ -5,8 +5,8 @@ use crate::jsonrpc::JsonRpcForwardedResponseEnum;
use crate::jsonrpc::JsonRpcRequest;
use crate::jsonrpc::JsonRpcRequestEnum;
use axum::extract::ws::Message;
use dashmap::mapref::entry::Entry as DashMapEntry;
use dashmap::DashMap;
use ethers::prelude::Transaction;
use ethers::prelude::{Block, TxHash, H256};
use futures::future::Abortable;
use futures::future::{join_all, AbortHandle};
@ -52,6 +52,12 @@ pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T>
}
}
pub enum TxState {
Confirmed(TxHash, H256),
Pending(TxHash),
Orphaned(TxHash, H256),
}
/// The application
// TODO: this debug impl is way too verbose. make something smaller
// TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs
@ -65,7 +71,7 @@ pub struct Web3ProxyApp {
// don't drop this or the sender will stop working
head_block_receiver: watch::Receiver<Block<TxHash>>,
// TODO: i think we want a TxState enum for Confirmed(TxHash, BlockHash) or Pending(TxHash) or Orphan(TxHash, BlockHash)
pending_tx_receiver: flume::Receiver<Transaction>,
pending_tx_receiver: flume::Receiver<TxState>,
next_subscription_id: AtomicUsize,
}
@ -393,10 +399,10 @@ impl Web3ProxyApp {
let (incoming_tx, incoming_rx) = watch::channel(true);
let mut other_incoming_rx = None;
match self.incoming_requests.entry(cache_key.clone()) {
dashmap::mapref::entry::Entry::Occupied(entry) => {
DashMapEntry::Occupied(entry) => {
other_incoming_rx = Some(entry.get().clone());
}
dashmap::mapref::entry::Entry::Vacant(entry) => {
DashMapEntry::Vacant(entry) => {
entry.insert(incoming_rx);
}
}

@ -1,6 +1,8 @@
///! Load balanced communication with a group of web3 providers
use arc_swap::ArcSwap;
use counter::Counter;
use dashmap::mapref::entry::Entry as DashMapEntry;
use dashmap::DashMap;
use derive_more::From;
use ethers::prelude::{Block, ProviderError, Transaction, TxHash, H256};
use futures::stream::FuturesUnordered;
@ -19,7 +21,7 @@ use tokio::task;
use tokio::time::sleep;
use tracing::{debug, info, info_span, instrument, trace, warn};
use crate::app::AnyhowJoinHandle;
use crate::app::{AnyhowJoinHandle, TxState};
use crate::config::Web3ConnectionConfig;
use crate::connection::{ActiveRequestHandle, Web3Connection};
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
@ -52,6 +54,7 @@ impl SyncedConnections {
pub struct Web3Connections {
inner: Vec<Arc<Web3Connection>>,
synced_connections: ArcSwap<SyncedConnections>,
pending_transactions: DashMap<TxHash, Transaction>,
}
impl Serialize for Web3Connections {
@ -86,7 +89,7 @@ impl Web3Connections {
http_client: Option<&reqwest::Client>,
rate_limiter: Option<&redis_cell_client::MultiplexedConnection>,
head_block_sender: Option<watch::Sender<Block<TxHash>>>,
pending_tx_sender: Option<flume::Sender<Transaction>>,
pending_tx_sender: Option<flume::Sender<TxState>>,
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
let num_connections = server_configs.len();
@ -129,6 +132,7 @@ impl Web3Connections {
let connections = Arc::new(Self {
inner: connections,
synced_connections: ArcSwap::new(Arc::new(synced_connections)),
pending_transactions: Default::default(),
});
let handle = {
@ -157,7 +161,7 @@ impl Web3Connections {
pending_tx_id_receiver: flume::Receiver<(TxHash, Arc<Web3Connection>)>,
block_receiver: flume::Receiver<(Block<TxHash>, Arc<Web3Connection>)>,
head_block_sender: Option<watch::Sender<Block<TxHash>>>,
pending_tx_sender: Option<flume::Sender<Transaction>>,
pending_tx_sender: Option<flume::Sender<TxState>>,
) -> anyhow::Result<()> {
let mut futures = FuturesUnordered::new();
@ -167,21 +171,37 @@ impl Web3Connections {
// forwards new transacitons to pending_tx_receipt_sender
if let Some(pending_tx_sender) = pending_tx_sender {
// TODO: do something with the handle so we can catch any errors
let clone = self.clone();
let handle = task::spawn(async move {
while let Ok((pending_transaction_id, rpc)) =
pending_tx_id_receiver.recv_async().await
{
unimplemented!("de-dedup the pending txid");
match clone.pending_transactions.entry(pending_transaction_id) {
DashMapEntry::Occupied(_entry) => continue,
DashMapEntry::Vacant(entry) => {
let request_handle = rpc.wait_for_request_handle().await;
let request_handle = rpc.wait_for_request_handle().await;
let pending_transaction: Transaction = request_handle
.request("eth_getTransactionByHash", (pending_transaction_id,))
.await?;
let pending_transaction: Transaction = request_handle
.request("eth_getTransactionByHash", (pending_transaction_id,))
.await?;
trace!(?pending_transaction, "pending");
trace!(?pending_transaction, "pending");
// TODO: do not unwrap. orphans might make this unsafe
let tx_state = match &pending_transaction.block_hash {
Some(block_hash) => {
TxState::Confirmed(pending_transaction_id, *block_hash)
}
None => {
entry.insert(pending_transaction);
TxState::Pending(pending_transaction_id)
}
};
pending_tx_sender.send_async(pending_transaction).await?;
// TODO: maybe we should just send the txid and they can get it from the dashmap?
pending_tx_sender.send_async(tx_state).await?;
}
}
}
Ok(())