fix eth_unsubscribe

This commit is contained in:
Bryan Stitt 2023-06-15 11:43:24 -07:00
parent 49c60ac1b5
commit 6040ca297f

View File

@ -10,6 +10,7 @@ use crate::{
errors::Web3ProxyResult, errors::Web3ProxyResult,
jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest}, jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest},
}; };
use anyhow::Context;
use axum::headers::{Origin, Referer, UserAgent}; use axum::headers::{Origin, Referer, UserAgent};
use axum::{ use axum::{
extract::ws::{Message, WebSocket, WebSocketUpgrade}, extract::ws::{Message, WebSocket, WebSocketUpgrade},
@ -19,6 +20,7 @@ use axum::{
}; };
use axum_client_ip::InsecureClientIp; use axum_client_ip::InsecureClientIp;
use axum_macros::debug_handler; use axum_macros::debug_handler;
use ethers::types::U64;
use futures::SinkExt; use futures::SinkExt;
use futures::{ use futures::{
future::AbortHandle, future::AbortHandle,
@ -307,7 +309,7 @@ async fn handle_socket_payload(
payload: &str, payload: &str,
response_sender: &flume::Sender<Message>, response_sender: &flume::Sender<Message>,
subscription_count: &AtomicU64, subscription_count: &AtomicU64,
subscriptions: Arc<RwLock<HashMap<String, AbortHandle>>>, subscriptions: Arc<RwLock<HashMap<U64, AbortHandle>>>,
) -> Web3ProxyResult<(Message, Option<OwnedSemaphorePermit>)> { ) -> Web3ProxyResult<(Message, Option<OwnedSemaphorePermit>)> {
let (authorization, semaphore) = authorization.check_again(&app).await?; let (authorization, semaphore) = authorization.check_again(&app).await?;
@ -334,7 +336,12 @@ async fn handle_socket_payload(
Ok((handle, response)) => { Ok((handle, response)) => {
if let Some(subscription_id) = response.result.clone() { if let Some(subscription_id) = response.result.clone() {
let mut x = subscriptions.write().await; let mut x = subscriptions.write().await;
x.insert(subscription_id.get().to_string(), handle);
let key: U64 = serde_json::from_str(subscription_id.get()).unwrap();
info!("key: {}", key);
x.insert(key, handle);
} }
Ok(response.into()) Ok(response.into())
@ -347,25 +354,31 @@ async fn handle_socket_payload(
RequestMetadata::new(&app, authorization.clone(), &json_request, None) RequestMetadata::new(&app, authorization.clone(), &json_request, None)
.await; .await;
let subscription_id = let subscription_id: U64 =
if let Some(params) = json_request.params.get(0).and_then(|x| x.as_str()) { if let Some(param) = json_request.params.get(0).cloned() {
params serde_json::from_value(param)
} else if let Some(param) = json_request.params.as_str() { .context("failed parsing [subscription_id] as a U64")?
param
} else { } else {
return Err(Web3ProxyError::BadRequest( match serde_json::from_value::<U64>(json_request.params) {
format!( Ok(x) => x,
"unexpected params given for eth_unsubscribe ({:?})", Err(err) => {
json_request.params return Err(Web3ProxyError::BadRequest(
) format!(
.into(), "unexpected params given for eth_unsubscribe: {:?}",
)); err
)
.into(),
))
}
}
}; };
info!("key: {}", subscription_id);
// TODO: is this the right response? // TODO: is this the right response?
let partial_response = { let partial_response = {
let mut x = subscriptions.write().await; let mut x = subscriptions.write().await;
match x.remove(subscription_id) { match x.remove(&subscription_id) {
None => false, None => false,
Some(handle) => { Some(handle) => {
handle.abort(); handle.abort();
@ -438,11 +451,11 @@ async fn read_web3_socket(
let f = async move { let f = async move {
// new message from our client. forward to a backend and then send it through response_sender // new message from our client. forward to a backend and then send it through response_sender
let (response_msg, _semaphore) = match msg { let (response_msg, _semaphore) = match msg {
Message::Text(ref payload) => { Message::Text(payload) => {
match handle_socket_payload( match handle_socket_payload(
app.clone(), app,
&authorization, &authorization,
payload, &payload,
&response_sender, &response_sender,
&subscription_count, &subscription_count,
subscriptions, subscriptions,
@ -473,8 +486,8 @@ async fn read_web3_socket(
Message::Binary(mut payload) => { Message::Binary(mut payload) => {
let payload = from_utf8_mut(&mut payload).unwrap(); let payload = from_utf8_mut(&mut payload).unwrap();
match handle_socket_payload( let (m, s) = match handle_socket_payload(
app.clone(), app,
&authorization, &authorization,
payload, payload,
&response_sender, &response_sender,
@ -484,10 +497,20 @@ async fn read_web3_socket(
.await { .await {
Ok((m, s)) => (m, Some(s)), Ok((m, s)) => (m, Some(s)),
Err(err) => { Err(err) => {
// TODO: how can we get the id out of the payload?
let m = err.into_message(None); let m = err.into_message(None);
(m, None) (m, None)
} }
} };
// TODO: is this an okay way to convert from text to binary?
let m = if let Message::Text(m) = m {
Message::Binary(m.as_bytes().to_vec())
} else {
unimplemented!();
};
(m, s)
} }
}; };