From 58fa7af105ddcee4c92223b6fa9988b16b6c8994 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sat, 9 Jul 2022 02:23:26 +0000 Subject: [PATCH] add is_archive_needed and a bunch of rpc commands --- TODO.md | 6 +- web3-proxy/src/app.rs | 203 +++++++++++++++++++++++++--- web3-proxy/src/connections.rs | 14 ++ web3-proxy/src/frontend/ws_proxy.rs | 2 +- web3-proxy/src/jsonrpc.rs | 15 ++ 5 files changed, 216 insertions(+), 24 deletions(-) diff --git a/TODO.md b/TODO.md index 3ea140ed..70d36bd5 100644 --- a/TODO.md +++ b/TODO.md @@ -60,7 +60,8 @@ - [ ] improve caching - [ ] if the eth_call (or similar) params include a block, we can cache for longer - [ ] if the call is something simple like "symbol" or "decimals", cache that too - - [ ] when we receive a block, we should store it for later eth_getBlockByNumber, eth_blockNumber, and similar calls + - [ ] when we receive a block, we should store it for later eth_getBlockByNumber and similar calls + - [x] eth_blockNumber without a backend request - [ ] if a rpc fails to connect at start, retry later instead of skipping it forever - [ ] inspect any jsonrpc errors. if its something like "header not found" or "block with id $x not found" retry on another node (and add a negative score to that server) - this error seems to happen when we use load balanced backend rpcs like pokt and ankr @@ -96,3 +97,6 @@ - [ ] use https://github.com/ledgerwatch/interfaces to talk to erigon directly instead of through erigon's rpcdaemon (possible example code which uses ledgerwatch/interfaces: https://github.com/akula-bft/akula/tree/master) - [ ] subscribe to pending transactions and build an intelligent gas estimator - [ ] include private rpcs with regular queries? i don't want to overwhelm them, but they could be good for excess load +- [ ] flashbots specific methods + - [ ] flashbots protect fast mode or not? probably fast matches most user's needs, but no reverts is nice. + - [ ] https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#authentication maybe have per-user keys. or pass their header on if its set diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index a5c84883..58c7a112 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -1,7 +1,7 @@ use axum::extract::ws::Message; use dashmap::mapref::entry::Entry as DashMapEntry; use dashmap::DashMap; -use ethers::prelude::Transaction; +use ethers::prelude::{Address, Transaction}; use ethers::prelude::{Block, TxHash, H256}; use futures::future::Abortable; use futures::future::{join_all, AbortHandle}; @@ -32,6 +32,7 @@ use crate::jsonrpc::JsonRpcForwardedResponseEnum; use crate::jsonrpc::JsonRpcRequest; use crate::jsonrpc::JsonRpcRequestEnum; +// TODO: make this customizable? static APP_USER_AGENT: &str = concat!( "satoshiandkin/", env!("CARGO_PKG_NAME"), @@ -42,8 +43,8 @@ static APP_USER_AGENT: &str = concat!( // TODO: put this in config? what size should we do? probably should structure this to be a size in MB const RESPONSE_CACHE_CAP: usize = 1024; -/// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work -type CacheKey = (Option, String, Option); +// block hash, method, params +type CacheKey = (H256, String, Option); type ResponseLrcCache = RwLock>; @@ -73,6 +74,38 @@ pub async fn flatten_handles( Ok(()) } +fn is_archive_needed(method: &str, params: Option<&mut serde_json::Value>) -> bool { + let methods_that_may_need_archive = [ + "eth_call", + "eth_getBalance", + "eth_getCode", + "eth_getLogs", + "eth_getStorageAt", + "eth_getTransactionCount", + "eth_getTransactionByBlockHashAndIndex", + "eth_getTransactionByBlockNumberAndIndex", + "eth_getTransactionReceipt", + "eth_getUncleByBlockHashAndIndex", + "eth_getUncleByBlockNumberAndIndex", + ]; + + if !methods_that_may_need_archive.contains(&method) { + // this method is not known to require an archive node + return false; + } + + // TODO: find the given block number in params + + // TODO: if its "latest" (or not given), modify params to have the latest block. return false + + // TODO: if its "pending", do something special? return false + + // TODO: we probably need a list of recent hashes/heights. if specified block is recent, return false + + // this command needs an archive server + true +} + // TODO: think more about TxState. d #[derive(Clone)] pub enum TxState { @@ -530,7 +563,7 @@ impl Web3ProxyApp { // TODO: https://github.com/ethereum/web3.py/blob/master/web3/middleware/cache.py // TODO: Some requests should skip caching on the head_block_hash - let head_block_hash = Some(self.balanced_rpcs.get_head_block_hash()); + let head_block_hash = self.balanced_rpcs.get_head_block_hash(); // TODO: better key? benchmark this let key = ( @@ -559,7 +592,7 @@ impl Web3ProxyApp { // #[instrument(skip_all)] async fn proxy_web3_rpc_request( &self, - request: JsonRpcRequest, + mut request: JsonRpcRequest, ) -> anyhow::Result { trace!("Received request: {:?}", request); @@ -571,12 +604,17 @@ impl Web3ProxyApp { let span = info_span!("rpc_request"); // let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon) match &request.method[..] { + // lots of commands are blocked "admin_addPeer" | "admin_datadir" | "admin_startRPC" | "admin_startWS" | "admin_stopRPC" | "admin_stopWS" + | "db_getHex" + | "db_getString" + | "db_putHex" + | "db_putString" | "debug_chaindbCompact" | "debug_freezeClient" | "debug_goTrace" @@ -594,6 +632,15 @@ impl Web3ProxyApp { | "debug_writeBlockProfile" | "debug_writeMemProfile" | "debug_writeMutexProfile" + | "eth_compileLLL" + | "eth_compileSerpent" + | "eth_compileSolidity" + | "eth_getCompilers" + | "eth_sendTransaction" + | "eth_sign" + | "eth_signTransaction" + | "eth_submitHashrate" + | "eth_submitWork" | "les_addBalance" | "les_setClientParams" | "les_setDefaultParams" @@ -610,20 +657,131 @@ impl Web3ProxyApp { | "personal_unlockAccount" | "personal_sendTransaction" | "personal_sign" - | "personal_ecRecover" => { + | "personal_ecRecover" + | "shh_addToGroup" + | "shh_getFilterChanges" + | "shh_getMessages" + | "shh_hasIdentity" + | "shh_newFilter" + | "shh_newGroup" + | "shh_newIdentity" + | "shh_post" + | "shh_uninstallFilter" + | "shh_version" => { // TODO: proper error code - Err(anyhow::anyhow!("unimplemented")) + Err(anyhow::anyhow!("unsupported")) } + // TODO: implement these commands + "eth_getFilterChanges" + | "eth_getFilterLogs" + | "eth_newBlockFilter" + | "eth_newFilter" + | "eth_newPendingTransactionFilter" + | "eth_uninstallFilter" => Err(anyhow::anyhow!("not yet implemented")), + // some commands can use local data or caches + "eth_accounts" => { + let partial_response = serde_json::Value::Array(vec![]); + + let response = JsonRpcForwardedResponse::from_value(partial_response, request.id); + + Ok(response) + } + "eth_blockNumber" => { + let head_block_number = self.balanced_rpcs.get_head_block_num(); + + if head_block_number == 0 { + return Err(anyhow::anyhow!("no servers synced")); + } + + // TODO: this seems pretty common. make a helper? + let partial_response = format!("{:x}", head_block_number); + + let response = JsonRpcForwardedResponse::from_string(partial_response, request.id); + + Ok(response) + } + // TODO: eth_callBundle (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_callbundle) + // TODO: eth_cancelPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_cancelprivatetransaction, but maybe just reject) + // TODO: eth_sendPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_sendprivatetransaction) + "eth_coinbase" => { + // no need for serving coinbase. we could return a per-user payment address here, but then we might leak that to dapps + let partial_response = json!(Address::zero()); + + let response = JsonRpcForwardedResponse::from_value(partial_response, request.id); + + Ok(response) + } + // TODO: eth_estimateGas using anvil? + // TODO: eth_gasPrice that does awesome magic to predict the future + // TODO: eth_getBlockByHash from caches + // TODO: eth_getBlockByNumber from caches + // TODO: eth_getBlockTransactionCountByHash from caches + // TODO: eth_getBlockTransactionCountByNumber from caches + // TODO: eth_getUncleCountByBlockHash from caches + // TODO: eth_getUncleCountByBlockNumber from caches + "eth_hashrate" => { + let partial_response = json!("0x0"); + + let response = JsonRpcForwardedResponse::from_value(partial_response, request.id); + + Ok(response) + } + "eth_mining" => { + let partial_response = json!(false); + + let response = JsonRpcForwardedResponse::from_value(partial_response, request.id); + + Ok(response) + } + // TODO: eth_sendBundle (flashbots command) + // broadcast transactions to all private rpcs at once "eth_sendRawTransaction" => { - // there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs - // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit self.private_rpcs - .try_send_all_upstream_servers(request) + .try_send_all_upstream_servers(request, false) .instrument(span) .await } + "eth_syncing" => { + // TODO: return a real response if all backends are syncing or if no servers in sync + let partial_response = serde_json::Value::Bool(false); + + let response = JsonRpcForwardedResponse::from_value(partial_response, request.id); + + Ok(response) + } + "net_listening" => { + // TODO: only if there are some backends on balanced_rpcs? + let partial_response = serde_json::Value::Bool(true); + + let response = JsonRpcForwardedResponse::from_value(partial_response, request.id); + + Ok(response) + } + "net_peerCount" => { + let partial_response = serde_json::Value::String(format!( + "{:x}", + self.balanced_rpcs.num_synced_rpcs() + )); + + let response = JsonRpcForwardedResponse::from_value(partial_response, request.id); + + Ok(response) + } + "web3_clientVersion" => { + // TODO: return a real response if all backends are syncing or if no servers in sync + let partial_response = serde_json::Value::String(APP_USER_AGENT.to_string()); + + let response = JsonRpcForwardedResponse::from_value(partial_response, request.id); + + Ok(response) + } + // TODO: web3_sha3? method => { - // this is not a private transaction (or no private relays are configured) + // everything else is relayed to a backend + // this is not a private transaction + + // we do this check before checking caches because it might modify the request params + let archive_needed = is_archive_needed(method, request.params.as_mut()); let (cache_key, response_cache) = match self.get_cached_response(&request) { (cache_key, Ok(response)) => { @@ -678,29 +836,30 @@ impl Web3ProxyApp { "eth_getTransactionByHash" | "eth_getTransactionReceipt" => { // TODO: try_send_all serially with retries instead of parallel self.private_rpcs - .try_send_all_upstream_servers(request) + .try_send_all_upstream_servers(request, archive_needed) .await? } _ => { // TODO: retries? self.balanced_rpcs - .try_send_best_upstream_server(request) + .try_send_best_upstream_server(request, archive_needed) .await? } }; - // TODO: small race condidition here. parallel requests with the same query will both be saved to the cache - let mut response_cache = response_cache.write(); + { + let mut response_cache = response_cache.write(); - // TODO: cache the warp::reply to save us serializing every time - response_cache.insert(cache_key.clone(), response.clone()); - if response_cache.len() >= RESPONSE_CACHE_CAP { - // TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block - response_cache.pop_front(); + // TODO: cache the warp::reply to save us serializing every time? + response_cache.insert(cache_key.clone(), response.clone()); + + // TODO: instead of checking length, check size in bytes + if response_cache.len() >= RESPONSE_CACHE_CAP { + // TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block + response_cache.pop_front(); + } } - drop(response_cache); - let _ = self.incoming_requests.remove(&cache_key); let _ = incoming_tx.send(false); diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 3413e8bd..ae44bd51 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -49,6 +49,10 @@ impl SyncedConnections { pub fn get_head_block_hash(&self) -> &H256 { &self.head_block_hash } + + pub fn get_head_block_num(&self) -> u64 { + self.head_block_num + } } /// A collection of web3 connections. Sends requests either the current best server or all servers. @@ -344,6 +348,10 @@ impl Web3Connections { Ok(()) } + pub fn get_head_block_num(&self) -> u64 { + self.synced_connections.load().get_head_block_num() + } + pub fn get_head_block_hash(&self) -> H256 { *self.synced_connections.load().get_head_block_hash() } @@ -352,6 +360,10 @@ impl Web3Connections { !self.synced_connections.load().inner.is_empty() } + pub fn num_synced_rpcs(&self) -> usize { + self.synced_connections.load().inner.len() + } + /// Send the same request to all the handles. Returning the most common success or most common error. #[instrument(skip_all)] pub async fn try_send_parallel_requests( @@ -683,6 +695,7 @@ impl Web3Connections { pub async fn try_send_best_upstream_server( &self, request: JsonRpcRequest, + archive_needed: bool, ) -> anyhow::Result { let mut skip_rpcs = vec![]; @@ -768,6 +781,7 @@ impl Web3Connections { pub async fn try_send_all_upstream_servers( &self, request: JsonRpcRequest, + archive_needed: bool, ) -> anyhow::Result { loop { match self.get_upstream_servers().await { diff --git a/web3-proxy/src/frontend/ws_proxy.rs b/web3-proxy/src/frontend/ws_proxy.rs index 148d6090..8b042ef8 100644 --- a/web3-proxy/src/frontend/ws_proxy.rs +++ b/web3-proxy/src/frontend/ws_proxy.rs @@ -44,6 +44,7 @@ async fn handle_socket_payload( subscription_count: &AtomicUsize, subscriptions: &mut HashMap, ) -> Message { + // TODO: do any clients send batches over websockets? let (id, response) = match serde_json::from_str::(payload) { Ok(payload) => { let id = payload.id.clone(); @@ -90,7 +91,6 @@ async fn handle_socket_payload( (id, response) } Err(err) => { - // TODO: what should this id be? let id = RawValue::from_string("null".to_string()).unwrap(); (id, Err(err.into())) } diff --git a/web3-proxy/src/jsonrpc.rs b/web3-proxy/src/jsonrpc.rs index afde017c..f500d17b 100644 --- a/web3-proxy/src/jsonrpc.rs +++ b/web3-proxy/src/jsonrpc.rs @@ -209,6 +209,21 @@ impl JsonRpcForwardedResponse { } } + pub fn from_value(partial_response: serde_json::Value, id: Box) -> Self { + let partial_response = + serde_json::to_string(&partial_response).expect("this should always work"); + + let partial_response = + RawValue::from_string(partial_response).expect("this should always work"); + + JsonRpcForwardedResponse { + jsonrpc: "2.0".to_string(), + id, + result: Some(partial_response), + error: None, + } + } + pub fn from_ethers_error(e: ProviderError, id: Box) -> anyhow::Result { // TODO: move turning ClientError into json to a helper function? let code;