more websockets (still under construction)

also actually change data when fork is detected
This commit is contained in:
Bryan Stitt 2022-05-29 19:33:10 +00:00
parent 0387492df8
commit 1945a353dd
4 changed files with 90 additions and 30 deletions

@ -4,9 +4,11 @@ use crate::jsonrpc::JsonRpcForwardedResponse;
use crate::jsonrpc::JsonRpcForwardedResponseEnum;
use crate::jsonrpc::JsonRpcRequest;
use crate::jsonrpc::JsonRpcRequestEnum;
use axum::extract::ws::Message;
use dashmap::DashMap;
use ethers::prelude::H256;
use futures::future::join_all;
use futures::future::Abortable;
use linkedhashmap::LinkedHashMap;
use parking_lot::RwLock;
use std::fmt;
@ -125,6 +127,16 @@ impl Web3ProxyApp {
})
}
pub async fn eth_subscribe(
&self,
payload: JsonRpcRequest,
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its way easier for now
// TODO: i think we will need to change this to support unsubscribe
subscription_tx: Abortable<flume::Sender<Message>>,
) -> anyhow::Result<JsonRpcForwardedResponse> {
unimplemented!();
}
pub fn get_balanced_rpcs(&self) -> &Web3Connections {
&self.balanced_rpcs
}

@ -114,8 +114,8 @@ impl Web3Connections {
}
pub async fn subscribe_heads(self: &Arc<Self>) {
// TODO: i don't think this needs to be very big
let (block_sender, block_receiver) = flume::bounded(16);
// TODO: benchmark bounded vs unbounded?
let (block_sender, block_receiver) = flume::unbounded();
let mut handles = vec![];
@ -133,7 +133,7 @@ impl Web3Connections {
// loop to automatically reconnect
// TODO: make this cancellable?
// TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date
// TODO: proper spann
// TODO: proper span
connection
.subscribe_new_heads(rpc_id, block_sender.clone(), true)
.instrument(tracing::info_span!("url"))
@ -221,10 +221,10 @@ impl Web3Connections {
&self,
block_receiver: flume::Receiver<(u64, H256, usize)>,
) -> anyhow::Result<()> {
let max_connections = self.inner.len();
let total_rpcs = self.inner.len();
let mut connection_states: HashMap<usize, (u64, H256)> =
HashMap::with_capacity(max_connections);
HashMap::with_capacity(total_rpcs);
let mut pending_synced_connections = SyncedConnections::default();
@ -270,7 +270,7 @@ impl Web3Connections {
// check connection_states to see which head block is more popular!
let mut rpc_ids_by_block: BTreeMap<H256, Vec<usize>> = BTreeMap::new();
let mut synced_rpcs = 0;
let mut counted_rpcs = 0;
for (rpc_id, (block_num, block_hash)) in connection_states.iter() {
if *block_num != new_block_num {
@ -278,36 +278,34 @@ impl Web3Connections {
continue;
}
synced_rpcs += 1;
counted_rpcs += 1;
let count = rpc_ids_by_block
.entry(*block_hash)
.or_insert_with(|| Vec::with_capacity(max_connections - 1));
.or_insert_with(|| Vec::with_capacity(total_rpcs - 1));
count.push(*rpc_id);
}
let most_common_head_hash = rpc_ids_by_block
let most_common_head_hash = *rpc_ids_by_block
.iter()
.max_by(|a, b| a.1.len().cmp(&b.1.len()))
.map(|(k, _v)| k)
.unwrap();
let synced_rpcs = rpc_ids_by_block.remove(&most_common_head_hash).unwrap();
warn!(
"chain is forked! {} possible heads. {}/{}/{} rpcs have {}",
rpc_ids_by_block.len(),
rpc_ids_by_block.get(most_common_head_hash).unwrap().len(),
synced_rpcs,
max_connections,
rpc_ids_by_block.len() + 1,
synced_rpcs.len(),
counted_rpcs,
total_rpcs,
most_common_head_hash
);
// this isn't the best block in the tier. don't do anything
if !pending_synced_connections.inner.remove(&rpc_id) {
// we didn't remove anything. nothing more to do
continue;
}
// we removed. don't continue so that we update self.synced_connections
pending_synced_connections.head_block_hash = most_common_head_hash;
pending_synced_connections.inner = synced_rpcs.into_iter().collect();
}
}
cmp::Ordering::Less => {
@ -323,7 +321,7 @@ impl Web3Connections {
// the synced connections have changed
let synced_connections = Arc::new(pending_synced_connections.clone());
if synced_connections.inner.len() == max_connections {
if synced_connections.inner.len() == total_rpcs {
// TODO: more metrics
debug!("all head: {}", new_block_hash);
}

@ -7,8 +7,10 @@ use axum::{
routing::{get, post},
Extension, Json, Router,
};
use futures::future::{AbortHandle, Abortable};
use futures::stream::{SplitSink, SplitStream, StreamExt};
use futures::SinkExt;
use hashbrown::HashMap;
use serde_json::json;
use serde_json::value::RawValue;
use std::net::SocketAddr;
@ -17,7 +19,9 @@ use tracing::warn;
use crate::{
app::Web3ProxyApp,
jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest, JsonRpcRequestEnum},
jsonrpc::{
JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum,
},
};
pub async fn run(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()> {
@ -86,23 +90,62 @@ async fn read_web3_socket(
mut ws_rx: SplitStream<WebSocket>,
response_tx: flume::Sender<Message>,
) {
let mut subscriptions = HashMap::new();
while let Some(Ok(msg)) = ws_rx.next().await {
// new message from our client. forward to a backend and then send it through response_tx
// TODO: spawn this processing?
let response_msg = match msg {
Message::Text(payload) => {
let (id, response) = match serde_json::from_str(&payload) {
let (id, response) = match serde_json::from_str::<JsonRpcRequest>(&payload) {
Ok(payload) => {
let payload: JsonRpcRequest = payload;
let id = payload.id.clone();
let payload = JsonRpcRequestEnum::Single(payload);
let response: anyhow::Result<JsonRpcForwardedResponseEnum> =
if payload.method == "eth_subscribe" {
let (subscription_handle, subscription_registration) =
AbortHandle::new_pair();
(id, app.0.proxy_web3_rpc(payload).await)
let subscription_response_tx =
Abortable::new(response_tx.clone(), subscription_registration);
let response: anyhow::Result<JsonRpcForwardedResponse> = app
.0
.eth_subscribe(payload, subscription_response_tx)
.await
.map(Into::into);
if let Ok(response) = &response {
// TODO: better key. maybe we should just use u64
subscriptions.insert(
response.result.as_ref().unwrap().to_string(),
subscription_handle,
);
}
response.map(JsonRpcForwardedResponseEnum::Single)
} else if payload.method == "eth_unsubscribe" {
let subscription_id = payload.params.unwrap().to_string();
subscriptions.remove(&subscription_id);
let response = JsonRpcForwardedResponse::from_string(
"true".to_string(),
id.clone(),
);
Ok(response.into())
} else {
// TODO: if this is a subscription request, we need to do some special handling. something with channels
// TODO: just handle subscribe_newBlock
app.0.proxy_web3_rpc(payload.into()).await
};
(id, response)
}
Err(err) => {
let id = RawValue::from_string("-1".to_string()).unwrap();
// TODO: what should this id be?
let id = RawValue::from_string("0".to_string()).unwrap();
(id, Err(err.into()))
}
};

@ -1,3 +1,4 @@
use derive_more::From;
use ethers::prelude::{HttpClientError, ProviderError, WsClientError};
use serde::de::{self, Deserialize, Deserializer, MapAccess, SeqAccess, Visitor};
use serde::Serialize;
@ -26,7 +27,7 @@ impl fmt::Debug for JsonRpcRequest {
}
/// Requests can come in multiple formats
#[derive(Debug)]
#[derive(Debug, From)]
pub enum JsonRpcRequestEnum {
Batch(Vec<JsonRpcRequest>),
Single(JsonRpcRequest),
@ -189,6 +190,12 @@ impl JsonRpcForwardedResponse {
}
}
pub fn from_string(partial_response: String, id: Box<RawValue>) -> Self {
let partial_response = RawValue::from_string(partial_response).unwrap();
Self::from_response(partial_response, id)
}
pub fn from_response(partial_response: Box<RawValue>, id: Box<RawValue>) -> Self {
JsonRpcForwardedResponse {
jsonrpc: "2.0".to_string(),
@ -261,7 +268,7 @@ impl JsonRpcForwardedResponse {
}
/// JSONRPC Responses can include one or many response objects.
#[derive(Clone, Debug, Serialize)]
#[derive(Clone, Debug, From, Serialize)]
#[serde(untagged)]
pub enum JsonRpcForwardedResponseEnum {
Single(JsonRpcForwardedResponse),