newPendingTransactions
This commit is contained in:
parent
fb6a1bb9d7
commit
a5e324a692
@ -187,12 +187,12 @@ impl Web3ProxyApp {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn eth_subscribe(
|
pub async fn eth_subscribe(
|
||||||
&self,
|
self: Arc<Self>,
|
||||||
payload: JsonRpcRequest,
|
payload: JsonRpcRequest,
|
||||||
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now
|
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now
|
||||||
subscription_tx: flume::Sender<Message>,
|
subscription_tx: flume::Sender<Message>,
|
||||||
) -> anyhow::Result<(AbortHandle, JsonRpcForwardedResponse)> {
|
) -> anyhow::Result<(AbortHandle, JsonRpcForwardedResponse)> {
|
||||||
let (subscription_handle, subscription_registration) = AbortHandle::new_pair();
|
let (subscription_abort_handle, subscription_registration) = AbortHandle::new_pair();
|
||||||
|
|
||||||
// TODO: this only needs to be unique per connection. we don't need it globably unique
|
// TODO: this only needs to be unique per connection. we don't need it globably unique
|
||||||
let subscription_id = self
|
let subscription_id = self
|
||||||
@ -203,7 +203,7 @@ impl Web3ProxyApp {
|
|||||||
// save the id so we can use it in the response
|
// save the id so we can use it in the response
|
||||||
let id = payload.id.clone();
|
let id = payload.id.clone();
|
||||||
|
|
||||||
let subscription_future = {
|
let subscription_join_handle = {
|
||||||
let subscription_id = subscription_id.clone();
|
let subscription_id = subscription_id.clone();
|
||||||
|
|
||||||
match payload.params.as_deref().unwrap().get() {
|
match payload.params.as_deref().unwrap().get() {
|
||||||
@ -211,7 +211,7 @@ impl Web3ProxyApp {
|
|||||||
let head_block_receiver = self.head_block_receiver.clone();
|
let head_block_receiver = self.head_block_receiver.clone();
|
||||||
|
|
||||||
trace!(?subscription_id, "new heads subscription");
|
trace!(?subscription_id, "new heads subscription");
|
||||||
async move {
|
tokio::spawn(async move {
|
||||||
let mut head_block_receiver = Abortable::new(
|
let mut head_block_receiver = Abortable::new(
|
||||||
WatchStream::new(head_block_receiver),
|
WatchStream::new(head_block_receiver),
|
||||||
subscription_registration,
|
subscription_registration,
|
||||||
@ -237,21 +237,61 @@ impl Web3ProxyApp {
|
|||||||
}
|
}
|
||||||
|
|
||||||
trace!(?subscription_id, "closed new heads subscription");
|
trace!(?subscription_id, "closed new heads subscription");
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
r#"["newPendingTransactions"]"# => {
|
||||||
|
let pending_tx_receiver = self.pending_tx_receiver.clone();
|
||||||
|
|
||||||
|
trace!(?subscription_id, "pending transactions subscription");
|
||||||
|
tokio::spawn(async move {
|
||||||
|
while let Ok(new_tx_state) = pending_tx_receiver.recv_async().await {
|
||||||
|
let new_tx = match new_tx_state {
|
||||||
|
TxState::Confirmed(..) => continue,
|
||||||
|
TxState::Orphaned(tx_hash, _block_hash) => {
|
||||||
|
self.balanced_rpcs.get_pending_tx(&tx_hash)
|
||||||
}
|
}
|
||||||
r#"["pendingTransactions"]"# => {
|
TxState::Pending(tx_hash) => {
|
||||||
unimplemented!("pendingTransactions")
|
self.balanced_rpcs.get_pending_tx(&tx_hash)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if new_tx.is_none() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let new_tx = &*new_tx.unwrap();
|
||||||
|
|
||||||
|
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
|
||||||
|
let msg = json!({
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"method": "eth_subscription",
|
||||||
|
"params": {
|
||||||
|
"subscription": subscription_id,
|
||||||
|
// upstream just sends the txid, but we want to send the whole transaction
|
||||||
|
"result": new_tx,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
let msg = Message::Text(serde_json::to_string(&msg).unwrap());
|
||||||
|
|
||||||
|
if subscription_tx.send_async(msg).await.is_err() {
|
||||||
|
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
trace!(?subscription_id, "closed new heads subscription");
|
||||||
|
})
|
||||||
}
|
}
|
||||||
_ => return Err(anyhow::anyhow!("unimplemented")),
|
_ => return Err(anyhow::anyhow!("unimplemented")),
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO: what should we do with this handle? i think its fine to just drop it
|
// TODO: do something with subscription_join_handle?
|
||||||
tokio::spawn(subscription_future);
|
|
||||||
|
|
||||||
let response = JsonRpcForwardedResponse::from_string(subscription_id, id);
|
let response = JsonRpcForwardedResponse::from_string(subscription_id, id);
|
||||||
|
|
||||||
Ok((subscription_handle, response))
|
Ok((subscription_abort_handle, response))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_balanced_rpcs(&self) -> &Web3Connections {
|
pub fn get_balanced_rpcs(&self) -> &Web3Connections {
|
||||||
|
@ -238,6 +238,13 @@ impl Web3Connections {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_pending_tx(
|
||||||
|
&self,
|
||||||
|
tx_hash: &TxHash,
|
||||||
|
) -> Option<dashmap::mapref::one::Ref<TxHash, Transaction>> {
|
||||||
|
self.pending_transactions.get(tx_hash)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn get_head_block_hash(&self) -> H256 {
|
pub fn get_head_block_hash(&self) -> H256 {
|
||||||
*self.synced_connections.load().get_head_block_hash()
|
*self.synced_connections.load().get_head_block_hash()
|
||||||
}
|
}
|
||||||
|
@ -37,7 +37,7 @@ async fn proxy_web3_socket(app: Extension<Arc<Web3ProxyApp>>, socket: WebSocket)
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_socket_payload(
|
async fn handle_socket_payload(
|
||||||
app: &Web3ProxyApp,
|
app: Arc<Web3ProxyApp>,
|
||||||
payload: &str,
|
payload: &str,
|
||||||
response_tx: &flume::Sender<Message>,
|
response_tx: &flume::Sender<Message>,
|
||||||
subscriptions: &mut HashMap<String, AbortHandle>,
|
subscriptions: &mut HashMap<String, AbortHandle>,
|
||||||
@ -48,7 +48,10 @@ async fn handle_socket_payload(
|
|||||||
|
|
||||||
let response: anyhow::Result<JsonRpcForwardedResponseEnum> = match &payload.method[..] {
|
let response: anyhow::Result<JsonRpcForwardedResponseEnum> = match &payload.method[..] {
|
||||||
"eth_subscribe" => {
|
"eth_subscribe" => {
|
||||||
let response = app.eth_subscribe(payload, response_tx.clone()).await;
|
let response = app
|
||||||
|
.clone()
|
||||||
|
.eth_subscribe(payload, response_tx.clone())
|
||||||
|
.await;
|
||||||
|
|
||||||
match response {
|
match response {
|
||||||
Ok((handle, response)) => {
|
Ok((handle, response)) => {
|
||||||
@ -115,7 +118,7 @@ async fn read_web3_socket(
|
|||||||
// new message from our client. forward to a backend and then send it through response_tx
|
// new message from our client. forward to a backend and then send it through response_tx
|
||||||
let response_msg = match msg {
|
let response_msg = match msg {
|
||||||
Message::Text(payload) => {
|
Message::Text(payload) => {
|
||||||
handle_socket_payload(app.0.as_ref(), &payload, &response_tx, &mut subscriptions)
|
handle_socket_payload(app.0.clone(), &payload, &response_tx, &mut subscriptions)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
Message::Ping(x) => Message::Pong(x),
|
Message::Ping(x) => Message::Pong(x),
|
||||||
@ -130,7 +133,7 @@ async fn read_web3_socket(
|
|||||||
Message::Binary(mut payload) => {
|
Message::Binary(mut payload) => {
|
||||||
let payload = from_utf8_mut(&mut payload).unwrap();
|
let payload = from_utf8_mut(&mut payload).unwrap();
|
||||||
|
|
||||||
handle_socket_payload(app.0.as_ref(), payload, &response_tx, &mut subscriptions)
|
handle_socket_payload(app.0.clone(), payload, &response_tx, &mut subscriptions)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
Loading…
Reference in New Issue
Block a user