This commit is contained in:
Bryan Stitt 2022-06-14 06:09:56 +00:00
parent 1bef6756eb
commit d75a60a09f
2 changed files with 11 additions and 1 deletions

View File

@ -83,6 +83,7 @@ impl Web3ProxyApp {
balanced_rpcs: Vec<Web3ConnectionConfig>, balanced_rpcs: Vec<Web3ConnectionConfig>,
private_rpcs: Vec<Web3ConnectionConfig>, private_rpcs: Vec<Web3ConnectionConfig>,
) -> anyhow::Result<(Arc<Web3ProxyApp>, AnyhowJoinHandle<()>)> { ) -> anyhow::Result<(Arc<Web3ProxyApp>, AnyhowJoinHandle<()>)> {
// TODO: try_join_all instead
let mut handles = FuturesUnordered::new(); let mut handles = FuturesUnordered::new();
// make a http shared client // make a http shared client

View File

@ -90,6 +90,7 @@ impl Web3Connections {
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> { ) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
let num_connections = server_configs.len(); let num_connections = server_configs.len();
// TODO: try_join_all
let handles = FuturesUnordered::new(); let handles = FuturesUnordered::new();
// TODO: only create these if head_block_sender and pending_tx_sender are set // TODO: only create these if head_block_sender and pending_tx_sender are set
@ -170,13 +171,15 @@ impl Web3Connections {
while let Ok((pending_transaction_id, rpc)) = while let Ok((pending_transaction_id, rpc)) =
pending_tx_id_receiver.recv_async().await pending_tx_id_receiver.recv_async().await
{ {
unimplemented!("de-dedup the pending txid");
let request_handle = rpc.wait_for_request_handle().await; let request_handle = rpc.wait_for_request_handle().await;
let pending_transaction: Transaction = request_handle let pending_transaction: Transaction = request_handle
.request("eth_getTransactionByHash", (pending_transaction_id,)) .request("eth_getTransactionByHash", (pending_transaction_id,))
.await?; .await?;
// unimplemented!("de-dedup the pending txid"); trace!(?pending_transaction, "pending");
pending_tx_sender.send_async(pending_transaction).await?; pending_tx_sender.send_async(pending_transaction).await?;
} }
@ -333,6 +336,9 @@ impl Web3Connections {
pending_synced_connections.head_block_hash = new_block_hash; pending_synced_connections.head_block_hash = new_block_hash;
head_block_sender.send(new_block)?; head_block_sender.send(new_block)?;
// TODO: mark all transactions as confirmed
// TODO: mark any orphaned transactions as unconfirmed
} }
cmp::Ordering::Equal => { cmp::Ordering::Equal => {
if new_block_hash == pending_synced_connections.head_block_hash { if new_block_hash == pending_synced_connections.head_block_hash {
@ -386,6 +392,9 @@ impl Web3Connections {
if pending_synced_connections.head_block_hash != most_common_head_hash { if pending_synced_connections.head_block_hash != most_common_head_hash {
head_block_sender.send(new_block)?; head_block_sender.send(new_block)?;
pending_synced_connections.head_block_hash = most_common_head_hash; pending_synced_connections.head_block_hash = most_common_head_hash;
// TODO: mark all transactions as confirmed
// TODO: mark any orphaned transactions as unconfirmed
} }
pending_synced_connections.inner = synced_rpcs.into_iter().collect(); pending_synced_connections.inner = synced_rpcs.into_iter().collect();