From a6a238fff2cd3c540c1bcb0bc59dc12028ea8b47 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 31 May 2022 01:55:04 +0000 Subject: [PATCH] handle binary and text messages --- web3-proxy/src/app.rs | 2 + web3-proxy/src/connections.rs | 9 +- web3-proxy/src/frontend.rs | 166 +++++++++++++++++++--------------- 3 files changed, 104 insertions(+), 73 deletions(-) diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 81a9018e..ab23b868 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -309,6 +309,8 @@ impl Web3ProxyApp { ) -> anyhow::Result { trace!("Received request: {:?}", request); + // TODO: if eth_chainId or net_version, serve those without querying the backend + // TODO: how much should we retry? probably with a timeout and not with a count like this // TODO: think more about this loop. // // TODO: add more to this span diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 5a43229a..3a3adebe 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -237,7 +237,14 @@ impl Web3Connections { let mut pending_synced_connections = SyncedConnections::default(); while let Ok((new_block, rpc_id)) = block_receiver.recv_async().await { - let new_block_num = new_block.number.unwrap().as_u64(); + // TODO: wth. how is this happening? need more logs + let new_block_num = match new_block.number { + Some(x) => x.as_u64(), + None => { + warn!(?new_block, "Block without number!"); + continue; + } + }; let new_block_hash = new_block.hash.unwrap(); // TODO: span with more in it? diff --git a/web3-proxy/src/frontend.rs b/web3-proxy/src/frontend.rs index a7c3643c..0c13b6a1 100644 --- a/web3-proxy/src/frontend.rs +++ b/web3-proxy/src/frontend.rs @@ -7,14 +7,18 @@ use axum::{ routing::{get, post}, Extension, Json, Router, }; -use futures::stream::{SplitSink, SplitStream, StreamExt}; use futures::SinkExt; +use futures::{ + future::AbortHandle, + stream::{SplitSink, SplitStream, StreamExt}, +}; use hashbrown::HashMap; use serde_json::json; use serde_json::value::RawValue; use std::net::SocketAddr; +use std::str::from_utf8_mut; use std::sync::Arc; -use tracing::{error, warn}; +use tracing::{debug, error, info, warn}; use crate::{ app::Web3ProxyApp, @@ -84,6 +88,77 @@ async fn proxy_web3_socket(app: Extension>, socket: WebSocket) tokio::spawn(read_web3_socket(app, ws_rx, response_tx)); } +async fn handle_socket_payload( + app: &Web3ProxyApp, + payload: &str, + response_tx: &flume::Sender, + subscriptions: &mut HashMap, +) -> Message { + let (id, response) = match serde_json::from_str::(payload) { + Ok(payload) => { + let id = payload.id.clone(); + + let response: anyhow::Result = if payload.method + == "eth_subscribe" + { + // TODO: if we pass eth_subscribe the response_tx, we + let response = app + .eth_subscribe(id.clone(), payload, response_tx.clone()) + .await; + + match response { + Ok((handle, response)) => { + // TODO: better key + subscriptions.insert(response.result.as_ref().unwrap().to_string(), handle); + + Ok(response.into()) + } + Err(err) => Err(err), + } + } else if payload.method == "eth_unsubscribe" { + let subscription_id = payload.params.unwrap().to_string(); + + let partial_response = match subscriptions.remove(&subscription_id) { + None => "false", + Some(handle) => { + handle.abort(); + "true" + } + }; + + let response = + JsonRpcForwardedResponse::from_string(partial_response.to_string(), id.clone()); + + Ok(response.into()) + } else { + // TODO: if this is a subscription request, we need to do some special handling. something with channels + // TODO: just handle subscribe_newBlock + + app.proxy_web3_rpc(payload.into()).await + }; + + (id, response) + } + Err(err) => { + // TODO: what should this id be? + let id = RawValue::from_string("0".to_string()).unwrap(); + (id, Err(err.into())) + } + }; + + let response_str = match response { + Ok(x) => serde_json::to_string(&x), + Err(err) => { + // we have an anyhow error. turn it into + let response = JsonRpcForwardedResponse::from_anyhow_error(err, id); + serde_json::to_string(&response) + } + } + .unwrap(); + + Message::Text(response_str) +} + async fn read_web3_socket( app: Extension>, mut ws_rx: SplitStream, @@ -95,78 +170,25 @@ async fn read_web3_socket( // new message from our client. forward to a backend and then send it through response_tx let response_msg = match msg { Message::Text(payload) => { - let (id, response) = match serde_json::from_str::(&payload) { - Ok(payload) => { - let id = payload.id.clone(); - - let response: anyhow::Result = - if payload.method == "eth_subscribe" { - // TODO: if we pass eth_subscribe the response_tx, we - let response = app - .0 - .eth_subscribe(id.clone(), payload, response_tx.clone()) - .await; - - match response { - Ok((handle, response)) => { - // TODO: better key - subscriptions.insert( - response.result.as_ref().unwrap().to_string(), - handle, - ); - - Ok(response.into()) - } - Err(err) => Err(err), - } - } else if payload.method == "eth_unsubscribe" { - let subscription_id = payload.params.unwrap().to_string(); - - let partial_response = match subscriptions.remove(&subscription_id) - { - None => "false", - Some(handle) => { - handle.abort(); - "true" - } - }; - - let response = JsonRpcForwardedResponse::from_string( - partial_response.to_string(), - id.clone(), - ); - - Ok(response.into()) - } else { - // TODO: if this is a subscription request, we need to do some special handling. something with channels - // TODO: just handle subscribe_newBlock - - app.0.proxy_web3_rpc(payload.into()).await - }; - - (id, response) - } - Err(err) => { - // TODO: what should this id be? - let id = RawValue::from_string("0".to_string()).unwrap(); - (id, Err(err.into())) - } - }; - - let response_str = match response { - Ok(x) => serde_json::to_string(&x), - Err(err) => { - // we have an anyhow error. turn it into - let response = JsonRpcForwardedResponse::from_anyhow_error(err, id); - serde_json::to_string(&response) - } - } - .unwrap(); - - Message::Text(response_str) + handle_socket_payload(app.0.as_ref(), &payload, &response_tx, &mut subscriptions) + .await } Message::Ping(x) => Message::Pong(x), - _ => unimplemented!(), + Message::Pong(x) => { + debug!("pong: {:?}", x); + continue; + } + Message::Close(_) => { + info!("closing websocket connection"); + break; + } + Message::Binary(mut payload) => { + // TODO: what should we do here? + let payload = from_utf8_mut(&mut payload).unwrap(); + + handle_socket_payload(app.0.as_ref(), payload, &response_tx, &mut subscriptions) + .await + } }; match response_tx.send_async(response_msg).await {