From 6040ca297fa6de7495bc1d926021d7896bccb619 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 15 Jun 2023 11:43:24 -0700 Subject: [PATCH] fix eth_unsubscribe --- web3_proxy/src/frontend/rpc_proxy_ws.rs | 65 +++++++++++++++++-------- 1 file changed, 44 insertions(+), 21 deletions(-) diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 3ba1a020..0285a440 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -10,6 +10,7 @@ use crate::{ errors::Web3ProxyResult, jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest}, }; +use anyhow::Context; use axum::headers::{Origin, Referer, UserAgent}; use axum::{ extract::ws::{Message, WebSocket, WebSocketUpgrade}, @@ -19,6 +20,7 @@ use axum::{ }; use axum_client_ip::InsecureClientIp; use axum_macros::debug_handler; +use ethers::types::U64; use futures::SinkExt; use futures::{ future::AbortHandle, @@ -307,7 +309,7 @@ async fn handle_socket_payload( payload: &str, response_sender: &flume::Sender, subscription_count: &AtomicU64, - subscriptions: Arc>>, + subscriptions: Arc>>, ) -> Web3ProxyResult<(Message, Option)> { let (authorization, semaphore) = authorization.check_again(&app).await?; @@ -334,7 +336,12 @@ async fn handle_socket_payload( Ok((handle, response)) => { if let Some(subscription_id) = response.result.clone() { let mut x = subscriptions.write().await; - x.insert(subscription_id.get().to_string(), handle); + + let key: U64 = serde_json::from_str(subscription_id.get()).unwrap(); + + info!("key: {}", key); + + x.insert(key, handle); } Ok(response.into()) @@ -347,25 +354,31 @@ async fn handle_socket_payload( RequestMetadata::new(&app, authorization.clone(), &json_request, None) .await; - let subscription_id = - if let Some(params) = json_request.params.get(0).and_then(|x| x.as_str()) { - params - } else if let Some(param) = json_request.params.as_str() { - param + let subscription_id: U64 = + if let Some(param) = json_request.params.get(0).cloned() { + serde_json::from_value(param) + .context("failed parsing [subscription_id] as a U64")? } else { - return Err(Web3ProxyError::BadRequest( - format!( - "unexpected params given for eth_unsubscribe ({:?})", - json_request.params - ) - .into(), - )); + match serde_json::from_value::(json_request.params) { + Ok(x) => x, + Err(err) => { + return Err(Web3ProxyError::BadRequest( + format!( + "unexpected params given for eth_unsubscribe: {:?}", + err + ) + .into(), + )) + } + } }; + info!("key: {}", subscription_id); + // TODO: is this the right response? let partial_response = { let mut x = subscriptions.write().await; - match x.remove(subscription_id) { + match x.remove(&subscription_id) { None => false, Some(handle) => { handle.abort(); @@ -438,11 +451,11 @@ async fn read_web3_socket( let f = async move { // new message from our client. forward to a backend and then send it through response_sender let (response_msg, _semaphore) = match msg { - Message::Text(ref payload) => { + Message::Text(payload) => { match handle_socket_payload( - app.clone(), + app, &authorization, - payload, + &payload, &response_sender, &subscription_count, subscriptions, @@ -473,8 +486,8 @@ async fn read_web3_socket( Message::Binary(mut payload) => { let payload = from_utf8_mut(&mut payload).unwrap(); - match handle_socket_payload( - app.clone(), + let (m, s) = match handle_socket_payload( + app, &authorization, payload, &response_sender, @@ -484,10 +497,20 @@ async fn read_web3_socket( .await { Ok((m, s)) => (m, Some(s)), Err(err) => { + // TODO: how can we get the id out of the payload? let m = err.into_message(None); (m, None) } - } + }; + + // TODO: is this an okay way to convert from text to binary? + let m = if let Message::Text(m) = m { + Message::Binary(m.as_bytes().to_vec()) + } else { + unimplemented!(); + }; + + (m, s) } };