newPendingRawTransactions
This commit is contained in:
parent
27e07b3fc9
commit
afb3dde4b3
4
Cargo.lock
generated
4
Cargo.lock
generated
@ -3544,9 +3544,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tower"
|
name = "tower"
|
||||||
version = "0.4.12"
|
version = "0.4.13"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "9a89fd63ad6adf737582df5db40d286574513c69a11dac5214dc3b5603d6713e"
|
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"futures-core",
|
"futures-core",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
|
1
TODO.md
1
TODO.md
@ -1,5 +1,6 @@
|
|||||||
# Todo
|
# Todo
|
||||||
|
|
||||||
|
- [ ] quick requests/second timer until we have real stats
|
||||||
- [x] it works for a few seconds and then gets stuck on something.
|
- [x] it works for a few seconds and then gets stuck on something.
|
||||||
- [x] its working with one backend node, but multiple breaks. something to do with pending transactions
|
- [x] its working with one backend node, but multiple breaks. something to do with pending transactions
|
||||||
- [x] dashmap entry api is easy to deadlock! be careful with it!
|
- [x] dashmap entry api is easy to deadlock! be careful with it!
|
||||||
|
@ -35,5 +35,5 @@ tracing = "0.1.35"
|
|||||||
# TODO: tracing-subscriber has serde and serde_json features that we might want to use
|
# TODO: tracing-subscriber has serde and serde_json features that we might want to use
|
||||||
tracing-subscriber = { version = "0.3.11", features = ["env-filter", "parking_lot"] }
|
tracing-subscriber = { version = "0.3.11", features = ["env-filter", "parking_lot"] }
|
||||||
url = "2.2.2"
|
url = "2.2.2"
|
||||||
tower = "0.4.12"
|
tower = "0.4.13"
|
||||||
tokio-stream = { version = "0.1.9", features = ["sync"] }
|
tokio-stream = { version = "0.1.9", features = ["sync"] }
|
||||||
|
@ -355,6 +355,50 @@ impl Web3ProxyApp {
|
|||||||
trace!(?subscription_id, "closed new heads subscription");
|
trace!(?subscription_id, "closed new heads subscription");
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
r#"["newPendingRawTransactions"]"# => {
|
||||||
|
// TODO: too much copy/pasta with newPendingTransactions
|
||||||
|
let pending_tx_receiver = self.pending_tx_sender.subscribe();
|
||||||
|
|
||||||
|
let mut pending_tx_receiver = Abortable::new(
|
||||||
|
BroadcastStream::new(pending_tx_receiver),
|
||||||
|
subscription_registration,
|
||||||
|
);
|
||||||
|
|
||||||
|
let subscription_id = subscription_id.clone();
|
||||||
|
|
||||||
|
trace!(?subscription_id, "pending transactions subscription");
|
||||||
|
|
||||||
|
// TODO: do something with this handle?
|
||||||
|
tokio::spawn(async move {
|
||||||
|
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
|
||||||
|
let new_tx = match new_tx_state {
|
||||||
|
TxState::Pending(tx) => tx,
|
||||||
|
TxState::Confirmed(..) => continue,
|
||||||
|
TxState::Orphaned(tx) => tx,
|
||||||
|
};
|
||||||
|
|
||||||
|
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
|
||||||
|
let msg = json!({
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"method": "eth_subscription",
|
||||||
|
"params": {
|
||||||
|
"subscription": subscription_id,
|
||||||
|
// upstream just sends the txid, but we want to send the whole transaction
|
||||||
|
"result": new_tx.rlp(),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
let msg = Message::Text(serde_json::to_string(&msg).unwrap());
|
||||||
|
|
||||||
|
if subscription_tx.send_async(msg).await.is_err() {
|
||||||
|
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
trace!(?subscription_id, "closed new heads subscription");
|
||||||
|
});
|
||||||
|
}
|
||||||
_ => return Err(anyhow::anyhow!("unimplemented")),
|
_ => return Err(anyhow::anyhow!("unimplemented")),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -427,7 +427,7 @@ impl Web3Connection {
|
|||||||
// TODO: is a RwLock of an Option<Arc> the right thing here?
|
// TODO: is a RwLock of an Option<Arc> the right thing here?
|
||||||
if let Some(provider) = self.provider.read().await.clone() {
|
if let Some(provider) = self.provider.read().await.clone() {
|
||||||
match &*provider {
|
match &*provider {
|
||||||
Web3Provider::Http(_provider) => {
|
Web3Provider::Http(provider) => {
|
||||||
// there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints
|
// there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints
|
||||||
// TODO: what should this interval be? probably automatically set to some fraction of block time
|
// TODO: what should this interval be? probably automatically set to some fraction of block time
|
||||||
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
|
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
|
||||||
|
@ -205,13 +205,13 @@ impl Web3Connections {
|
|||||||
// TODO: also check the "confirmed transactions" mapping? maybe one shared mapping with TxState in it?
|
// TODO: also check the "confirmed transactions" mapping? maybe one shared mapping with TxState in it?
|
||||||
trace!(?pending_tx_id, "checking pending_transactions on {}", rpc);
|
trace!(?pending_tx_id, "checking pending_transactions on {}", rpc);
|
||||||
|
|
||||||
if self.pending_transactions.contains_key(&pending_tx_id) {
|
if pending_tx_sender.receiver_count() == 0 {
|
||||||
// this transaction has already been processed
|
// no receivers, so no point in querying to get the full transaction
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
if pending_tx_sender.receiver_count() == 0 {
|
if self.pending_transactions.contains_key(&pending_tx_id) {
|
||||||
// no receivers, so no point in querying to get the full transaction
|
// this transaction has already been processed
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -310,13 +310,6 @@ impl Web3Connections {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
// pub fn get_pending_tx(
|
|
||||||
// &self,
|
|
||||||
// tx_hash: &TxHash,
|
|
||||||
// ) -> Option<dashmap::mapref::one::Ref<TxHash, Transaction>> {
|
|
||||||
// self.pending_transactions.get(tx_hash)
|
|
||||||
// }
|
|
||||||
|
|
||||||
pub fn get_head_block_hash(&self) -> H256 {
|
pub fn get_head_block_hash(&self) -> H256 {
|
||||||
*self.synced_connections.load().get_head_block_hash()
|
*self.synced_connections.load().get_head_block_hash()
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user