it works, but we need it to be optional

This commit is contained in:
Bryan Stitt 2022-06-14 22:02:18 +00:00
parent a5e324a692
commit 3d3e0c8c8b
3 changed files with 46 additions and 40 deletions

@ -1,8 +1,8 @@
# Todo
- [ ] if web3 proxy gets an http error back, retry another node
- [ ] refactor Connection::spawn. have it return a handle to the spawned future of it running with block and transaction subscriptions
- [ ] refactor Connections::spawn. have it return a handle that is selecting on those handles?
- [x] refactor Connection::spawn. have it return a handle to the spawned future of it running with block and transaction subscriptions
- [x] refactor Connections::spawn. have it return a handle that is selecting on those handles?
- [x] support websocket clients
- we support websockets for the backends already, but we need them for the frontend too
- [ ] when block subscribers receive blocks, store them in a cache. use this cache instead of querying eth_getBlock
@ -44,6 +44,7 @@
- [ ] zero downtime deploys
- [ ] are we using Acquire/Release/AcqRel properly? or do we need other modes?
- [ ] subscription id should be per connection, not global
- [ ] emit stats
- [x] simple proxy
- [x] better locking. when lots of requests come in, we seem to be in the way of block updates
- [x] load balance between multiple RPC servers

@ -7,6 +7,7 @@ use crate::jsonrpc::JsonRpcRequestEnum;
use axum::extract::ws::Message;
use dashmap::mapref::entry::Entry as DashMapEntry;
use dashmap::DashMap;
use ethers::prelude::Transaction;
use ethers::prelude::{Block, TxHash, H256};
use futures::future::Abortable;
use futures::future::{join_all, AbortHandle};
@ -53,9 +54,9 @@ pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T>
}
pub enum TxState {
Confirmed(TxHash, H256),
Pending(TxHash),
Orphaned(TxHash, H256),
Confirmed(Transaction),
Pending(Transaction),
Orphaned(Transaction),
}
/// The application
@ -247,20 +248,10 @@ impl Web3ProxyApp {
while let Ok(new_tx_state) = pending_tx_receiver.recv_async().await {
let new_tx = match new_tx_state {
TxState::Confirmed(..) => continue,
TxState::Orphaned(tx_hash, _block_hash) => {
self.balanced_rpcs.get_pending_tx(&tx_hash)
}
TxState::Pending(tx_hash) => {
self.balanced_rpcs.get_pending_tx(&tx_hash)
}
TxState::Orphaned(tx) => tx,
TxState::Pending(tx) => tx,
};
if new_tx.is_none() {
continue;
}
let new_tx = &*new_tx.unwrap();
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let msg = json!({
"jsonrpc": "2.0",

@ -141,7 +141,6 @@ impl Web3Connections {
tokio::spawn(async move {
connections
.subscribe(
pending_tx_id_sender,
pending_tx_id_receiver,
block_receiver,
head_block_sender,
@ -157,7 +156,6 @@ impl Web3Connections {
/// subscribe to all the backend rpcs
async fn subscribe(
self: Arc<Self>,
pending_tx_id_sender: flume::Sender<(TxHash, Arc<Web3Connection>)>,
pending_tx_id_receiver: flume::Receiver<(TxHash, Arc<Web3Connection>)>,
block_receiver: flume::Receiver<(Block<TxHash>, Arc<Web3Connection>)>,
head_block_sender: Option<watch::Sender<Block<TxHash>>>,
@ -169,7 +167,7 @@ impl Web3Connections {
// it skips any duplicates (unless they are being orphaned)
// fetches new transactions from the notifying rpc
// forwards new transacitons to pending_tx_receipt_sender
if let Some(pending_tx_sender) = pending_tx_sender {
if let Some(pending_tx_sender) = pending_tx_sender.clone() {
// TODO: do something with the handle so we can catch any errors
let clone = self.clone();
let handle = task::spawn(async move {
@ -189,12 +187,10 @@ impl Web3Connections {
// TODO: do not unwrap. orphans might make this unsafe
let tx_state = match &pending_transaction.block_hash {
Some(block_hash) => {
TxState::Confirmed(pending_transaction_id, *block_hash)
}
Some(_block_hash) => TxState::Confirmed(pending_transaction),
None => {
entry.insert(pending_transaction);
TxState::Pending(pending_transaction_id)
entry.insert(pending_transaction.clone());
TxState::Pending(pending_transaction)
}
};
@ -213,11 +209,12 @@ impl Web3Connections {
// setup the block funnel
if let Some(head_block_sender) = head_block_sender {
let connections = Arc::clone(&self);
let pending_tx_sender = pending_tx_sender.clone();
let handle = task::Builder::default()
.name("update_synced_rpcs")
.spawn(async move {
connections
.update_synced_rpcs(block_receiver, head_block_sender, pending_tx_id_sender)
.update_synced_rpcs(block_receiver, head_block_sender, pending_tx_sender)
.await
});
@ -238,12 +235,12 @@ impl Web3Connections {
Ok(())
}
pub fn get_pending_tx(
&self,
tx_hash: &TxHash,
) -> Option<dashmap::mapref::one::Ref<TxHash, Transaction>> {
self.pending_transactions.get(tx_hash)
}
// pub fn get_pending_tx(
// &self,
// tx_hash: &TxHash,
// ) -> Option<dashmap::mapref::one::Ref<TxHash, Transaction>> {
// self.pending_transactions.get(tx_hash)
// }
pub fn get_head_block_hash(&self) -> H256 {
*self.synced_connections.load().get_head_block_hash()
@ -312,7 +309,8 @@ impl Web3Connections {
&self,
block_receiver: flume::Receiver<(Block<TxHash>, Arc<Web3Connection>)>,
head_block_sender: watch::Sender<Block<TxHash>>,
pending_tx_id_sender: flume::Sender<(TxHash, Arc<Web3Connection>)>,
// TODO: use pending_tx_sender
pending_tx_sender: Option<flume::Sender<TxState>>,
) -> anyhow::Result<()> {
let total_rpcs = self.inner.len();
@ -322,10 +320,10 @@ impl Web3Connections {
let mut pending_synced_connections = SyncedConnections::default();
while let Ok((new_block, rpc)) = block_receiver.recv_async().await {
// TODO: wth. how is this happening? need more logs
let new_block_num = match new_block.number {
Some(x) => x.as_u64(),
None => {
// TODO: wth. how is this happening? need more logs
warn!(?new_block, "Block without number!");
continue;
}
@ -341,7 +339,7 @@ impl Web3Connections {
let _enter = span.enter();
if new_block_num == 0 {
warn!("rpc is still syncing");
warn!("still syncing");
}
connection_states.insert(rpc.clone(), (new_block_num, new_block_hash));
@ -362,7 +360,7 @@ impl Web3Connections {
// TODO: if the parent hash isn't our previous best block, ignore it
pending_synced_connections.head_block_hash = new_block_hash;
head_block_sender.send(new_block)?;
head_block_sender.send(new_block.clone())?;
// TODO: mark all transactions as confirmed
// TODO: mark any orphaned transactions as unconfirmed
@ -417,11 +415,8 @@ impl Web3Connections {
// TODO: do this more efficiently?
if pending_synced_connections.head_block_hash != most_common_head_hash {
head_block_sender.send(new_block)?;
head_block_sender.send(new_block.clone())?;
pending_synced_connections.head_block_hash = most_common_head_hash;
// TODO: mark all transactions as confirmed
// TODO: mark any orphaned transactions as unconfirmed
}
pending_synced_connections.inner = synced_rpcs.into_iter().collect();
@ -451,8 +446,27 @@ impl Web3Connections {
synced_connections.inner
);
// TODO: what if the hashes don't match?
if pending_synced_connections.head_block_hash == new_block_hash {
// mark all transactions in the block as confirmed
if let Some(pending_tx_sender) = &pending_tx_sender {
// TODO: we need new_block to be the new_head_block
for tx_hash in &new_block.transactions {
match self.pending_transactions.remove(tx_hash) {
Some((_tx_id, tx)) => {
pending_tx_sender.send_async(TxState::Confirmed(tx)).await?;
}
None => continue,
}
}
};
// TODO: mark any orphaned transactions as unconfirmed
}
// TODO: only publish if there are x (default 2) nodes synced to this block?
// do the arcswap
// TODO: do this before or after processing all the transactions in this block?
self.synced_connections.swap(synced_connections);
}