handle binary and text messages

This commit is contained in:
Bryan Stitt 2022-05-31 01:55:04 +00:00
parent 45fd1b2ff1
commit a6a238fff2
3 changed files with 104 additions and 73 deletions

@ -309,6 +309,8 @@ impl Web3ProxyApp {
) -> anyhow::Result<JsonRpcForwardedResponse> {
trace!("Received request: {:?}", request);
// TODO: if eth_chainId or net_version, serve those without querying the backend
// TODO: how much should we retry? probably with a timeout and not with a count like this
// TODO: think more about this loop.
// // TODO: add more to this span

@ -237,7 +237,14 @@ impl Web3Connections {
let mut pending_synced_connections = SyncedConnections::default();
while let Ok((new_block, rpc_id)) = block_receiver.recv_async().await {
let new_block_num = new_block.number.unwrap().as_u64();
// TODO: wth. how is this happening? need more logs
let new_block_num = match new_block.number {
Some(x) => x.as_u64(),
None => {
warn!(?new_block, "Block without number!");
continue;
}
};
let new_block_hash = new_block.hash.unwrap();
// TODO: span with more in it?

@ -7,14 +7,18 @@ use axum::{
routing::{get, post},
Extension, Json, Router,
};
use futures::stream::{SplitSink, SplitStream, StreamExt};
use futures::SinkExt;
use futures::{
future::AbortHandle,
stream::{SplitSink, SplitStream, StreamExt},
};
use hashbrown::HashMap;
use serde_json::json;
use serde_json::value::RawValue;
use std::net::SocketAddr;
use std::str::from_utf8_mut;
use std::sync::Arc;
use tracing::{error, warn};
use tracing::{debug, error, info, warn};
use crate::{
app::Web3ProxyApp,
@ -84,6 +88,77 @@ async fn proxy_web3_socket(app: Extension<Arc<Web3ProxyApp>>, socket: WebSocket)
tokio::spawn(read_web3_socket(app, ws_rx, response_tx));
}
async fn handle_socket_payload(
app: &Web3ProxyApp,
payload: &str,
response_tx: &flume::Sender<Message>,
subscriptions: &mut HashMap<String, AbortHandle>,
) -> Message {
let (id, response) = match serde_json::from_str::<JsonRpcRequest>(payload) {
Ok(payload) => {
let id = payload.id.clone();
let response: anyhow::Result<JsonRpcForwardedResponseEnum> = if payload.method
== "eth_subscribe"
{
// TODO: if we pass eth_subscribe the response_tx, we
let response = app
.eth_subscribe(id.clone(), payload, response_tx.clone())
.await;
match response {
Ok((handle, response)) => {
// TODO: better key
subscriptions.insert(response.result.as_ref().unwrap().to_string(), handle);
Ok(response.into())
}
Err(err) => Err(err),
}
} else if payload.method == "eth_unsubscribe" {
let subscription_id = payload.params.unwrap().to_string();
let partial_response = match subscriptions.remove(&subscription_id) {
None => "false",
Some(handle) => {
handle.abort();
"true"
}
};
let response =
JsonRpcForwardedResponse::from_string(partial_response.to_string(), id.clone());
Ok(response.into())
} else {
// TODO: if this is a subscription request, we need to do some special handling. something with channels
// TODO: just handle subscribe_newBlock
app.proxy_web3_rpc(payload.into()).await
};
(id, response)
}
Err(err) => {
// TODO: what should this id be?
let id = RawValue::from_string("0".to_string()).unwrap();
(id, Err(err.into()))
}
};
let response_str = match response {
Ok(x) => serde_json::to_string(&x),
Err(err) => {
// we have an anyhow error. turn it into
let response = JsonRpcForwardedResponse::from_anyhow_error(err, id);
serde_json::to_string(&response)
}
}
.unwrap();
Message::Text(response_str)
}
async fn read_web3_socket(
app: Extension<Arc<Web3ProxyApp>>,
mut ws_rx: SplitStream<WebSocket>,
@ -95,78 +170,25 @@ async fn read_web3_socket(
// new message from our client. forward to a backend and then send it through response_tx
let response_msg = match msg {
Message::Text(payload) => {
let (id, response) = match serde_json::from_str::<JsonRpcRequest>(&payload) {
Ok(payload) => {
let id = payload.id.clone();
let response: anyhow::Result<JsonRpcForwardedResponseEnum> =
if payload.method == "eth_subscribe" {
// TODO: if we pass eth_subscribe the response_tx, we
let response = app
.0
.eth_subscribe(id.clone(), payload, response_tx.clone())
.await;
match response {
Ok((handle, response)) => {
// TODO: better key
subscriptions.insert(
response.result.as_ref().unwrap().to_string(),
handle,
);
Ok(response.into())
}
Err(err) => Err(err),
}
} else if payload.method == "eth_unsubscribe" {
let subscription_id = payload.params.unwrap().to_string();
let partial_response = match subscriptions.remove(&subscription_id)
{
None => "false",
Some(handle) => {
handle.abort();
"true"
}
};
let response = JsonRpcForwardedResponse::from_string(
partial_response.to_string(),
id.clone(),
);
Ok(response.into())
} else {
// TODO: if this is a subscription request, we need to do some special handling. something with channels
// TODO: just handle subscribe_newBlock
app.0.proxy_web3_rpc(payload.into()).await
};
(id, response)
}
Err(err) => {
// TODO: what should this id be?
let id = RawValue::from_string("0".to_string()).unwrap();
(id, Err(err.into()))
}
};
let response_str = match response {
Ok(x) => serde_json::to_string(&x),
Err(err) => {
// we have an anyhow error. turn it into
let response = JsonRpcForwardedResponse::from_anyhow_error(err, id);
serde_json::to_string(&response)
}
}
.unwrap();
Message::Text(response_str)
handle_socket_payload(app.0.as_ref(), &payload, &response_tx, &mut subscriptions)
.await
}
Message::Ping(x) => Message::Pong(x),
_ => unimplemented!(),
Message::Pong(x) => {
debug!("pong: {:?}", x);
continue;
}
Message::Close(_) => {
info!("closing websocket connection");
break;
}
Message::Binary(mut payload) => {
// TODO: what should we do here?
let payload = from_utf8_mut(&mut payload).unwrap();
handle_socket_payload(app.0.as_ref(), payload, &response_tx, &mut subscriptions)
.await
}
};
match response_tx.send_async(response_msg).await {