From 6d92f33dc47c563c0bf8ba6ba790f938a8ae74ed Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 10 Aug 2022 21:29:50 +0000 Subject: [PATCH] bug fixes --- TODO.md | 1 + web3_proxy/src/app.rs | 13 +++++--- web3_proxy/src/connection.rs | 14 ++++++--- web3_proxy/src/connections.rs | 59 ++++++++++++++++++----------------- web3_proxy/src/jsonrpc.rs | 13 ++++++-- 5 files changed, 60 insertions(+), 40 deletions(-) diff --git a/TODO.md b/TODO.md index f493cb89..5af72881 100644 --- a/TODO.md +++ b/TODO.md @@ -81,6 +81,7 @@ - [x] send getTransaction rpc requests to the private rpc tier - [x] I'm hitting infura rate limits very quickly. I feel like that means something is very inefficient - whenever blocks were slow, we started checking as fast as possible +- [ ] send logs to sentry - [ ] cli tool for resetting api keys - [ ] nice output when cargo doc is run - [ ] if we request an old block, more servers can handle it than we currently use. diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index dc731101..4f05db8d 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -221,6 +221,7 @@ impl Web3ProxyApp { } }; + // TODO: this should be a broadcast channel let (head_block_sender, head_block_receiver) = watch::channel(Arc::new(Block::default())); // TODO: will one receiver lagging be okay? how big should this be? let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(256); @@ -260,10 +261,10 @@ impl Web3ProxyApp { private_rpcs, http_client.clone(), redis_pool.clone(), - // subscribing to new heads here won't work well + // subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs + None, + // TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits None, - // TODO: subscribe to pending transactions on the private rpcs? - Some(pending_tx_sender.clone()), pending_transactions.clone(), ) .await @@ -353,11 +354,15 @@ impl Web3ProxyApp { "method":"eth_subscription", "params": { "subscription": subscription_id, + // TODO: option to include full transaction objects instead of just the hashes? "result": new_head.as_ref(), }, }); - let msg = Message::Text(serde_json::to_string(&msg).unwrap()); + // TODO: do clients support binary messages? + let msg = Message::Text( + serde_json::to_string(&msg).expect("this should always be valid json"), + ); if response_sender.send_async(msg).await.is_err() { // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? diff --git a/web3_proxy/src/connection.rs b/web3_proxy/src/connection.rs index 77a6bf6f..2a25b0fb 100644 --- a/web3_proxy/src/connection.rs +++ b/web3_proxy/src/connection.rs @@ -303,8 +303,11 @@ impl Web3Connection { match block { Ok(block) => { { - let hash = block.hash.unwrap(); - let num = block.number.unwrap(); + // TODO: is this true? Block::default probably doesn't + let hash = block.hash.expect("blocks here should always have hashes"); + let num = block + .number + .expect("blocks here should always have numbers"); let mut head_block = self.head_block.write(); @@ -443,11 +446,12 @@ impl Web3Connection { continue; } Ok(HandleResult::None) => { - // TODO: what should we do? warn!("No handle for latest block from {}", self); + // TODO: what should we do? } Err(err) => { - warn!(?err, "Rate limited on latest block from {}", self); + warn!(?err, "Internal error on latest block from {}", self); + // TODO: what should we do? sleep? extra time? } } @@ -548,6 +552,8 @@ impl Web3Connection { .send_async((pending_tx_id, self.clone())) .await .context("tx_id_sender")?; + + // TODO: periodically check for listeners. if no one is subscribed, unsubscribe and wait for a subscription } warn!("subscription ended"); diff --git a/web3_proxy/src/connections.rs b/web3_proxy/src/connections.rs index 9a3df63b..78c36903 100644 --- a/web3_proxy/src/connections.rs +++ b/web3_proxy/src/connections.rs @@ -31,7 +31,7 @@ use crate::config::Web3ConnectionConfig; use crate::connection::{ActiveRequestHandle, HandleResult, Web3Connection}; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; -/// A collection of Web3Connections that are on the same block +/// A collection of Web3Connections that are on the same block. /// Serialize is so we can print it on our debug endpoint #[derive(Clone, Default, Serialize)] struct SyncedConnections { @@ -385,8 +385,6 @@ impl Web3Connections { }); futures.push(flatten_handle(handle)); - } else { - unimplemented!(); } // setup the block funnel @@ -406,7 +404,15 @@ impl Web3Connections { if futures.is_empty() { // no transaction or block subscriptions. - todo!("every second, check that the provider is still connected"); + // todo!("every second, check that the provider is still connected");? + + let handle = task::Builder::default().name("noop").spawn(async move { + loop { + sleep(Duration::from_secs(600)).await; + } + }); + + futures.push(flatten_handle(handle)); } if let Err(e) = try_join_all(futures).await { @@ -457,6 +463,8 @@ impl Web3Connections { // but before we do any queries, be sure the requested block num exists let head_block_num = self.head_block_num(); if num > &head_block_num { + // TODO: i'm seeing this a lot when using ethspam. i dont know why though. i thought we delayed publishing + // TODO: instead of error, maybe just sleep and try again? return Err(anyhow::anyhow!( "Head block is #{}, but #{} was requested", head_block_num, @@ -466,8 +474,7 @@ impl Web3Connections { // TODO: helper for method+params => JsonRpcRequest // TODO: get block with the transactions? - let request = - json!({ "id": "1", "method": "eth_getBlockByNumber", "params": (num, false) }); + let request = json!({ "jsonrpc": "2.0", "id": "1", "method": "eth_getBlockByNumber", "params": (num, false) }); let request: JsonRpcRequest = serde_json::from_value(request)?; // TODO: if error, retry? @@ -581,8 +588,7 @@ impl Web3Connections { panic!("i don't think this is possible") } - /// TODO: possible dead lock here. investigate more. probably refactor - /// TODO: move parts of this onto SyncedConnections? + /// TODO: move parts of this onto SyncedConnections? it needs to be simpler // we don't instrument here because we put a span inside the while loop async fn update_synced_rpcs( &self, @@ -595,19 +601,14 @@ impl Web3Connections { // TODO: rpc name instead of url (will do this with config reload revamp) // TODO: indexmap or hashmap? what hasher? with_capacity? + // TODO: this will grow unbounded. prune old heads automatically let mut connection_heads = IndexMap::>>::new(); while let Ok((new_block, rpc)) = block_receiver.recv_async().await { - if let Some(current_block) = connection_heads.get(&rpc.url) { - if current_block.hash == new_block.hash { - // duplicate block - continue; - } - } - let new_block_hash = if let Some(hash) = new_block.hash { hash } else { + // TODO: i think this should just be debug, but maybe it is a warning warn!(%rpc, ?new_block, "Block without hash!"); connection_heads.remove(&rpc.url); @@ -640,7 +641,6 @@ impl Web3Connections { connection_heads.remove(&rpc.url); } else { - // TODO: no clone? we end up with different blocks for every rpc connection_heads.insert(rpc.url.clone(), new_block.clone()); self.chain.add_block(new_block.clone(), false); @@ -651,7 +651,9 @@ impl Web3Connections { .values() .min_by(|a, b| a.number.cmp(&b.number)) { - lowest_block.number.unwrap() + lowest_block + .number + .expect("all blocks here should have a number") } else { continue; }; @@ -723,7 +725,7 @@ impl Web3Connections { // TODO: there are sortable traits, but this seems simpler /// sort the blocks in descending height fn sortable_values(&self) -> (&U64, &u32, &U256, &H256) { - trace!(?self.block, ?self.conns); + // trace!(?self.block, ?self.conns); // first we care about the block number let block_num = self.block.number.as_ref().unwrap(); @@ -775,8 +777,8 @@ impl Web3Connections { }) } }) - // sort b to a for descending order. sort a to b for ascending order - .max_by(|a, b| b.sortable_values().cmp(&a.sortable_values())) + // sort b to a for descending order. sort a to b for ascending order? maybe not "max_by" is smart + .max_by(|a, b| a.sortable_values().cmp(&b.sortable_values())) { let best_head_num = x.block.number.unwrap(); let best_head_hash = x.block.hash.unwrap(); @@ -832,8 +834,8 @@ impl Web3Connections { "{}/{} rpcs at {} ({}). head at {:?}", pending_synced_connections.conns.len(), self.conns.len(), - pending_synced_connections.head_block_hash, pending_synced_connections.head_block_num, + pending_synced_connections.head_block_hash, pending_synced_connections .conns .iter() @@ -859,8 +861,8 @@ impl Web3Connections { trace!( "all {} rpcs at {} ({})", num_best_rpcs, - pending_synced_connections.head_block_hash, pending_synced_connections.head_block_num, + pending_synced_connections.head_block_hash, ); } else { trace!( @@ -868,27 +870,26 @@ impl Web3Connections { "{}/{} rpcs at {} ({})", num_best_rpcs, self.conns.len(), - pending_synced_connections.head_block_hash, pending_synced_connections.head_block_num, + pending_synced_connections.head_block_hash, ); } // TODO: do this before or after processing all the transactions in this block? - // TODO: only swap if there is a change - trace!(?pending_synced_connections, "swapping"); + // TODO: only swap if there is a change? + debug!(?pending_synced_connections, "swapping"); self.synced_connections .swap(Arc::new(pending_synced_connections)); if new_head_block { - // TODO: this will need a refactor to only send once a minmum threshold has this block - // TODO: move this onto self.chain - // TODO: pending_synced_connections isn't published yet. which means fast queries using this block will fail + // TODO: is new_head_block accurate? + // TODO: move this onto self.chain? head_block_sender .send(new_block.clone()) .context("head_block_sender")?; } } else { - // TODO: this expected when we first start + // TODO: is this expected when we first start? // TODO: make sure self.synced_connections is empty warn!("not enough rpcs in sync"); } diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index d1cdd32b..b5ffc06d 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -9,7 +9,7 @@ use tracing::warn; #[derive(Clone, serde::Deserialize)] pub struct JsonRpcRequest { // TODO: skip jsonrpc entireley? - // pub jsonrpc: Box, + pub jsonrpc: Box, /// id could be a stricter type, but many rpcs do things against the spec pub id: Box, pub method: String, @@ -78,6 +78,7 @@ impl<'de> Deserialize<'de> for JsonRpcRequestEnum { A: MapAccess<'de>, { // TODO: i feel like this should be easier + let mut jsonrpc = None; let mut id = None; let mut method = None; let mut params = None; @@ -88,7 +89,7 @@ impl<'de> Deserialize<'de> for JsonRpcRequestEnum { // throw away the value // TODO: should we check that it's 2.0? // TODO: how do we skip over this value entirely? - let _: String = map.next_value()?; + jsonrpc = Some(map.next_value()?); } Field::Id => { if id.is_some() { @@ -111,6 +112,7 @@ impl<'de> Deserialize<'de> for JsonRpcRequestEnum { } } + let jsonrpc = jsonrpc.ok_or_else(|| de::Error::missing_field("jsonrpc"))?; let id = id.ok_or_else(|| de::Error::missing_field("id"))?; let method = method.ok_or_else(|| de::Error::missing_field("method"))?; @@ -119,7 +121,12 @@ impl<'de> Deserialize<'de> for JsonRpcRequestEnum { Some(x) => Some(x), }; - let single = JsonRpcRequest { id, method, params }; + let single = JsonRpcRequest { + jsonrpc, + id, + method, + params, + }; Ok(JsonRpcRequestEnum::Single(single)) }