diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index ce6543f1..9b0134f8 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -4,9 +4,11 @@ use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcForwardedResponseEnum; use crate::jsonrpc::JsonRpcRequest; use crate::jsonrpc::JsonRpcRequestEnum; +use axum::extract::ws::Message; use dashmap::DashMap; use ethers::prelude::H256; use futures::future::join_all; +use futures::future::Abortable; use linkedhashmap::LinkedHashMap; use parking_lot::RwLock; use std::fmt; @@ -125,6 +127,16 @@ impl Web3ProxyApp { }) } + pub async fn eth_subscribe( + &self, + payload: JsonRpcRequest, + // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its way easier for now + // TODO: i think we will need to change this to support unsubscribe + subscription_tx: Abortable>, + ) -> anyhow::Result { + unimplemented!(); + } + pub fn get_balanced_rpcs(&self) -> &Web3Connections { &self.balanced_rpcs } diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 104c6a25..473f68dc 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -114,8 +114,8 @@ impl Web3Connections { } pub async fn subscribe_heads(self: &Arc) { - // TODO: i don't think this needs to be very big - let (block_sender, block_receiver) = flume::bounded(16); + // TODO: benchmark bounded vs unbounded? + let (block_sender, block_receiver) = flume::unbounded(); let mut handles = vec![]; @@ -133,7 +133,7 @@ impl Web3Connections { // loop to automatically reconnect // TODO: make this cancellable? // TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date - // TODO: proper spann + // TODO: proper span connection .subscribe_new_heads(rpc_id, block_sender.clone(), true) .instrument(tracing::info_span!("url")) @@ -221,10 +221,10 @@ impl Web3Connections { &self, block_receiver: flume::Receiver<(u64, H256, usize)>, ) -> anyhow::Result<()> { - let max_connections = self.inner.len(); + let total_rpcs = self.inner.len(); let mut connection_states: HashMap = - HashMap::with_capacity(max_connections); + HashMap::with_capacity(total_rpcs); let mut pending_synced_connections = SyncedConnections::default(); @@ -270,7 +270,7 @@ impl Web3Connections { // check connection_states to see which head block is more popular! let mut rpc_ids_by_block: BTreeMap> = BTreeMap::new(); - let mut synced_rpcs = 0; + let mut counted_rpcs = 0; for (rpc_id, (block_num, block_hash)) in connection_states.iter() { if *block_num != new_block_num { @@ -278,36 +278,34 @@ impl Web3Connections { continue; } - synced_rpcs += 1; + counted_rpcs += 1; let count = rpc_ids_by_block .entry(*block_hash) - .or_insert_with(|| Vec::with_capacity(max_connections - 1)); + .or_insert_with(|| Vec::with_capacity(total_rpcs - 1)); count.push(*rpc_id); } - let most_common_head_hash = rpc_ids_by_block + let most_common_head_hash = *rpc_ids_by_block .iter() .max_by(|a, b| a.1.len().cmp(&b.1.len())) .map(|(k, _v)| k) .unwrap(); + let synced_rpcs = rpc_ids_by_block.remove(&most_common_head_hash).unwrap(); + warn!( "chain is forked! {} possible heads. {}/{}/{} rpcs have {}", - rpc_ids_by_block.len(), - rpc_ids_by_block.get(most_common_head_hash).unwrap().len(), - synced_rpcs, - max_connections, + rpc_ids_by_block.len() + 1, + synced_rpcs.len(), + counted_rpcs, + total_rpcs, most_common_head_hash ); - // this isn't the best block in the tier. don't do anything - if !pending_synced_connections.inner.remove(&rpc_id) { - // we didn't remove anything. nothing more to do - continue; - } - // we removed. don't continue so that we update self.synced_connections + pending_synced_connections.head_block_hash = most_common_head_hash; + pending_synced_connections.inner = synced_rpcs.into_iter().collect(); } } cmp::Ordering::Less => { @@ -323,7 +321,7 @@ impl Web3Connections { // the synced connections have changed let synced_connections = Arc::new(pending_synced_connections.clone()); - if synced_connections.inner.len() == max_connections { + if synced_connections.inner.len() == total_rpcs { // TODO: more metrics debug!("all head: {}", new_block_hash); } diff --git a/web3-proxy/src/frontend.rs b/web3-proxy/src/frontend.rs index 70afe576..1bcc9ce2 100644 --- a/web3-proxy/src/frontend.rs +++ b/web3-proxy/src/frontend.rs @@ -7,8 +7,10 @@ use axum::{ routing::{get, post}, Extension, Json, Router, }; +use futures::future::{AbortHandle, Abortable}; use futures::stream::{SplitSink, SplitStream, StreamExt}; use futures::SinkExt; +use hashbrown::HashMap; use serde_json::json; use serde_json::value::RawValue; use std::net::SocketAddr; @@ -17,7 +19,9 @@ use tracing::warn; use crate::{ app::Web3ProxyApp, - jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest, JsonRpcRequestEnum}, + jsonrpc::{ + JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum, + }, }; pub async fn run(port: u16, proxy_app: Arc) -> anyhow::Result<()> { @@ -86,23 +90,62 @@ async fn read_web3_socket( mut ws_rx: SplitStream, response_tx: flume::Sender, ) { + let mut subscriptions = HashMap::new(); + while let Some(Ok(msg)) = ws_rx.next().await { // new message from our client. forward to a backend and then send it through response_tx - // TODO: spawn this processing? let response_msg = match msg { Message::Text(payload) => { - let (id, response) = match serde_json::from_str(&payload) { + let (id, response) = match serde_json::from_str::(&payload) { Ok(payload) => { - let payload: JsonRpcRequest = payload; - let id = payload.id.clone(); - let payload = JsonRpcRequestEnum::Single(payload); + let response: anyhow::Result = + if payload.method == "eth_subscribe" { + let (subscription_handle, subscription_registration) = + AbortHandle::new_pair(); - (id, app.0.proxy_web3_rpc(payload).await) + let subscription_response_tx = + Abortable::new(response_tx.clone(), subscription_registration); + + let response: anyhow::Result = app + .0 + .eth_subscribe(payload, subscription_response_tx) + .await + .map(Into::into); + + if let Ok(response) = &response { + // TODO: better key. maybe we should just use u64 + subscriptions.insert( + response.result.as_ref().unwrap().to_string(), + subscription_handle, + ); + } + + response.map(JsonRpcForwardedResponseEnum::Single) + } else if payload.method == "eth_unsubscribe" { + let subscription_id = payload.params.unwrap().to_string(); + + subscriptions.remove(&subscription_id); + + let response = JsonRpcForwardedResponse::from_string( + "true".to_string(), + id.clone(), + ); + + Ok(response.into()) + } else { + // TODO: if this is a subscription request, we need to do some special handling. something with channels + // TODO: just handle subscribe_newBlock + + app.0.proxy_web3_rpc(payload.into()).await + }; + + (id, response) } Err(err) => { - let id = RawValue::from_string("-1".to_string()).unwrap(); + // TODO: what should this id be? + let id = RawValue::from_string("0".to_string()).unwrap(); (id, Err(err.into())) } }; diff --git a/web3-proxy/src/jsonrpc.rs b/web3-proxy/src/jsonrpc.rs index 3e1dd6e5..36e0c84c 100644 --- a/web3-proxy/src/jsonrpc.rs +++ b/web3-proxy/src/jsonrpc.rs @@ -1,3 +1,4 @@ +use derive_more::From; use ethers::prelude::{HttpClientError, ProviderError, WsClientError}; use serde::de::{self, Deserialize, Deserializer, MapAccess, SeqAccess, Visitor}; use serde::Serialize; @@ -26,7 +27,7 @@ impl fmt::Debug for JsonRpcRequest { } /// Requests can come in multiple formats -#[derive(Debug)] +#[derive(Debug, From)] pub enum JsonRpcRequestEnum { Batch(Vec), Single(JsonRpcRequest), @@ -189,6 +190,12 @@ impl JsonRpcForwardedResponse { } } + pub fn from_string(partial_response: String, id: Box) -> Self { + let partial_response = RawValue::from_string(partial_response).unwrap(); + + Self::from_response(partial_response, id) + } + pub fn from_response(partial_response: Box, id: Box) -> Self { JsonRpcForwardedResponse { jsonrpc: "2.0".to_string(), @@ -261,7 +268,7 @@ impl JsonRpcForwardedResponse { } /// JSONRPC Responses can include one or many response objects. -#[derive(Clone, Debug, Serialize)] +#[derive(Clone, Debug, From, Serialize)] #[serde(untagged)] pub enum JsonRpcForwardedResponseEnum { Single(JsonRpcForwardedResponse),