From 2833737ae755307526418e08b9db8aa9708e646d Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 22 Jul 2022 19:30:39 +0000 Subject: [PATCH] improve caching --- TODO.md | 12 +- config/example.toml | 2 + web3-proxy/src/app.rs | 192 +++++++++++++--------------- web3-proxy/src/config.rs | 17 ++- web3-proxy/src/connection.rs | 18 +-- web3-proxy/src/connections.rs | 75 ++++++++--- web3-proxy/src/frontend/ws_proxy.rs | 8 +- web3-proxy/src/jsonrpc.rs | 21 +-- web3-proxy/src/main.rs | 4 +- 9 files changed, 187 insertions(+), 162 deletions(-) diff --git a/TODO.md b/TODO.md index d25463db..7615a9d4 100644 --- a/TODO.md +++ b/TODO.md @@ -46,8 +46,11 @@ - we were skipping our delay interval when block hash wasn't changed. so if a block was ever slow, the http provider would get the same hash twice and then would try eth_getBlockByNumber a ton of times - [x] inspect any jsonrpc errors. if its something like "header not found" or "block with id $x not found" retry on another node (and add a negative score to that server) - this error seems to happen when we use load balanced backend rpcs like pokt and ankr -- [ ] if we don't cache errors, then in-flight request caching is going to bottleneck +- [x] RESPONSE_CACHE_CAP in bytes instead of number of entries +- [x] if we don't cache errors, then in-flight request caching is going to bottleneck - i think now that we retry header not found and similar, caching errors should be fine +- [x] RESPONSE_CACHE_CAP from config +- [x] web3_sha3 rpc command - [ ] if the fastest server has hit rate limits, we won't be able to serve any traffic until another server is synced. - thundering herd problem if we only allow a lag of 0 blocks - we can improve this by only `publish`ing the sorted list once a threshold of total available soft and hard limits is passed. how can we do this without hammering redis? at least its only once per block per server @@ -66,7 +69,9 @@ - [x] send getTransaction rpc requests to the private rpc tier - [x] I'm hitting infura rate limits very quickly. I feel like that means something is very inefficient - whenever blocks were slow, we started checking as fast as possible -- [ ] eth_getBlockByNumber and similar calls served from the block map +- [cancelled] eth_getBlockByNumber and similar calls served from the block map + - will need all Block **and** Block in caches or fetched efficiently + - so maybe we don't want this. we can just use the general request cache for these. they will only require 1 request and it means requests won't get in the way as much on writes as new blocks arrive. - [ ] incoming rate limiting by api key - [ ] refactor so configs can change while running - create the app without applying any config to it @@ -75,6 +80,7 @@ - [ ] if a rpc fails to connect at start, retry later instead of skipping it forever - [ ] have a "backup" tier that is only used when the primary tier has no servers or is many blocks behind - we don't want the backup tier taking over with the head block if they happen to be fast at that (but overall low/expensive rps). only if the primary tier has fallen behind or gone entirely offline should we go to third parties + - [ ] until this is done, an alternative is for infra to have a "failover" script that changes the configs to include a bunch of third party servers manually. - [ ] stats when forks are resolved (and what chain they were on?) - [ ] failsafe. if no blocks or transactions in some time, warn and reset the connection - [ ] right now the block_map is unbounded. move this to redis and do some calculations to be sure about RAM usage @@ -170,3 +176,5 @@ in another repo: event subscriber - [ ] Maybe storing pending txs on receipt in a dashmap is wrong. We want to store in a timer_heap (or similar) when we actually send. This way there's no lock contention until the race is over. - [ ] Support "safe" block height. It's planned for eth2 but we can kind of do it now but just doing head block num-3 - [ ] Archive check on BSC gave “archive” when it isn’t. and FTM gave 90k for all servers even though they should be archive +- [ ] cache eth_getLogs in a database? +- [ ] stats for "read amplification". how many backend requests do we send compared to frontend requests we received? diff --git a/config/example.toml b/config/example.toml index 05577944..be68a8a5 100644 --- a/config/example.toml +++ b/config/example.toml @@ -3,6 +3,8 @@ chain_id = 1 # in prod, do `rate_limit_redis = "redis://redis:6379/"` #rate_limit_redis = "redis://dev-redis:6379/" public_rate_limit_per_minute = 60_000 +# 1GB of cache +response_cache_max_bytes = 10 ^ 9 [balanced_rpcs] diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 6f2479d8..980414db 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -1,6 +1,7 @@ use axum::extract::ws::Message; use dashmap::mapref::entry::Entry as DashMapEntry; use dashmap::DashMap; +use ethers::core::utils::keccak256; use ethers::prelude::{Address, Block, BlockNumber, Bytes, Transaction, TxHash, H256, U64}; use futures::future::Abortable; use futures::future::{join_all, AbortHandle}; @@ -13,6 +14,7 @@ use redis_cell_client::bb8::ErrorSink; use redis_cell_client::{bb8, RedisCellClient, RedisConnectionManager}; use serde_json::json; use std::fmt; +use std::mem::size_of_val; use std::pin::Pin; use std::str::FromStr; use std::sync::atomic::{self, AtomicUsize}; @@ -22,7 +24,7 @@ use tokio::sync::{broadcast, watch}; use tokio::task::JoinHandle; use tokio::time::timeout; use tokio_stream::wrappers::{BroadcastStream, WatchStream}; -use tracing::{info, info_span, instrument, trace, warn, Instrument}; +use tracing::{debug, info, info_span, instrument, trace, warn, Instrument}; use crate::bb8_helpers; use crate::config::AppConfig; @@ -41,9 +43,6 @@ static APP_USER_AGENT: &str = concat!( env!("CARGO_PKG_VERSION"), ); -// TODO: put this in config? what size should we do? probably should structure this to be a size in MB -const RESPONSE_CACHE_CAP: usize = 1024; - // block hash, method, params type CacheKey = (H256, String, Option); @@ -130,7 +129,7 @@ fn get_or_set_block_number( fn get_min_block_needed( method: &str, params: Option<&mut serde_json::Value>, - latest_block: U64, + head_block: U64, ) -> Option { let params = params?; @@ -159,7 +158,7 @@ fn get_min_block_needed( if let Some(x) = obj.get_mut("fromBlock") { let block_num: BlockNumber = serde_json::from_value(x.clone()).ok()?; - let (modified, block_num) = block_num_to_u64(block_num, latest_block); + let (modified, block_num) = block_num_to_u64(block_num, head_block); if modified { *x = serde_json::to_value(block_num).unwrap(); @@ -171,7 +170,7 @@ fn get_min_block_needed( if let Some(x) = obj.get_mut("toBlock") { let block_num: BlockNumber = serde_json::from_value(x.clone()).ok()?; - let (modified, block_num) = block_num_to_u64(block_num, latest_block); + let (modified, block_num) = block_num_to_u64(block_num, head_block); if modified { *x = serde_json::to_value(block_num).unwrap(); @@ -183,7 +182,7 @@ fn get_min_block_needed( if let Some(x) = obj.get("blockHash") { // TODO: check a linkedhashmap of recent hashes // TODO: error if fromBlock or toBlock were set - unimplemented!("handle blockHash {}", x); + todo!("handle blockHash {}", x); } return None; @@ -224,7 +223,7 @@ fn get_min_block_needed( } }; - match get_or_set_block_number(params, block_param_id, latest_block) { + match get_or_set_block_number(params, block_param_id, head_block) { Ok(block) => Some(block), Err(err) => { // TODO: seems unlikely that we will get here @@ -253,6 +252,8 @@ pub struct Web3ProxyApp { /// Send private requests (like eth_sendRawTransaction) to all these servers private_rpcs: Arc, incoming_requests: ActiveRequestsMap, + /// bytes available to response_cache (it will be slightly larger than this) + response_cache_max_bytes: AtomicUsize, response_cache: ResponseLrcCache, // don't drop this or the sender will stop working // TODO: broadcast channel instead? @@ -308,7 +309,7 @@ impl Web3ProxyApp { .build()?, ); - let redis_client_pool = match app_config.shared.rate_limit_redis { + let redis_client_pool = match app_config.shared.redis_url { Some(redis_address) => { info!("Connecting to redis on {}", redis_address); @@ -406,6 +407,7 @@ impl Web3ProxyApp { balanced_rpcs, private_rpcs, incoming_requests: Default::default(), + response_cache_max_bytes: AtomicUsize::new(app_config.shared.response_cache_max_bytes), response_cache: Default::default(), head_block_receiver, pending_tx_sender, @@ -608,7 +610,7 @@ impl Web3ProxyApp { // TODO: do something with subscription_join_handle? - let response = JsonRpcForwardedResponse::from_string(subscription_id, id); + let response = JsonRpcForwardedResponse::from_value(json!(subscription_id), id); // TODO: make a `SubscriptonHandle(AbortHandle, JoinHandle)` struct? @@ -627,19 +629,20 @@ impl Web3ProxyApp { &self.incoming_requests } - /// send the request to the approriate RPCs - /// TODO: dry this up + /// send the request or batch of requests to the approriate RPCs #[instrument(skip_all)] pub async fn proxy_web3_rpc( &self, request: JsonRpcRequestEnum, ) -> anyhow::Result { - // TODO: i don't always see this in the logs. why? - trace!("Received request: {:?}", request); + debug!(?request, "proxy_web3_rpc"); // even though we have timeouts on the requests to our backend providers, - // we need a timeout for the incoming request so that delays from - let max_time = Duration::from_secs(60); + // we need a timeout for the incoming request so that retries don't run forever + // TODO: take this as an optional argument. per user max? expiration time instead of duration? + let max_time = Duration::from_secs(120); + + // TODO: instrument this with a unique id let response = match request { JsonRpcRequestEnum::Single(request) => JsonRpcForwardedResponseEnum::Single( @@ -650,8 +653,7 @@ impl Web3ProxyApp { ), }; - // TODO: i don't always see this in the logs. why? - trace!("Forwarding response: {:?}", response); + debug!(?response, "Forwarding response"); Ok(response) } @@ -686,7 +688,7 @@ impl Web3ProxyApp { async fn get_cached_response( &self, // TODO: accept a block hash here also? - min_block_needed: Option, + min_block_needed: Option<&U64>, request: &JsonRpcRequest, ) -> anyhow::Result<( CacheKey, @@ -712,13 +714,13 @@ impl Web3ProxyApp { if let Some(response) = self.response_cache.read().get(&key) { // TODO: emit a stat - trace!(?request.method, "cache hit!"); + debug!(?request.method, "cache hit!"); // TODO: can we make references work? maybe put them in an Arc? return Ok((key, Ok(response.to_owned()))); } else { // TODO: emit a stat - trace!(?request.method, "cache miss!"); + debug!(?request.method, "cache miss!"); } // TODO: multiple caches. if head_block_hash is None, have a persistent cache (disk backed?) @@ -741,7 +743,8 @@ impl Web3ProxyApp { // // TODO: add more to this span such as let span = info_span!("rpc_request"); // let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon) - match request.method.as_ref() { + + let partial_response: serde_json::Value = match request.method.as_ref() { // lots of commands are blocked "admin_addPeer" | "admin_datadir" @@ -807,7 +810,7 @@ impl Web3ProxyApp { | "shh_uninstallFilter" | "shh_version" => { // TODO: proper error code - Err(anyhow::anyhow!("unsupported")) + return Err(anyhow::anyhow!("unsupported")); } // TODO: implement these commands "eth_getFilterChanges" @@ -815,66 +818,39 @@ impl Web3ProxyApp { | "eth_newBlockFilter" | "eth_newFilter" | "eth_newPendingTransactionFilter" - | "eth_uninstallFilter" => Err(anyhow::anyhow!("not yet implemented")), + | "eth_uninstallFilter" => return Err(anyhow::anyhow!("not yet implemented")), // some commands can use local data or caches - "eth_accounts" => { - let partial_response = serde_json::Value::Array(vec![]); - - let response = JsonRpcForwardedResponse::from_value(partial_response, request.id); - - Ok(response) - } + "eth_accounts" => serde_json::Value::Array(vec![]), "eth_blockNumber" => { let head_block_number = self.balanced_rpcs.get_head_block_num(); - if head_block_number == 0 { + if head_block_number.as_u64() == 0 { return Err(anyhow::anyhow!("no servers synced")); } - let response = JsonRpcForwardedResponse::from_number(head_block_number, request.id); - - Ok(response) + json!(head_block_number) } // TODO: eth_callBundle (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_callbundle) // TODO: eth_cancelPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_cancelprivatetransaction, but maybe just reject) // TODO: eth_sendPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_sendprivatetransaction) "eth_coinbase" => { - // no need for serving coinbase. we could return a per-user payment address here, but then we might leak that to dapps - let partial_response = json!(Address::zero()); - - let response = JsonRpcForwardedResponse::from_value(partial_response, request.id); - - Ok(response) + // no need for serving coinbase + // we could return a per-user payment address here, but then we might leak that to dapps + json!(Address::zero()) } // TODO: eth_estimateGas using anvil? // TODO: eth_gasPrice that does awesome magic to predict the future - // TODO: eth_getBlockByHash from caches - "eth_getBlockByHash" => { - unimplemented!("wip") - } - // TODO: eth_getBlockByNumber from caches - // TODO: eth_getBlockTransactionCountByHash from caches - // TODO: eth_getBlockTransactionCountByNumber from caches - // TODO: eth_getUncleCountByBlockHash from caches - // TODO: eth_getUncleCountByBlockNumber from caches "eth_hashrate" => { - let partial_response = json!("0x0"); - - let response = JsonRpcForwardedResponse::from_value(partial_response, request.id); - - Ok(response) + json!(U64::zero()) } "eth_mining" => { - let partial_response = json!(false); - - let response = JsonRpcForwardedResponse::from_value(partial_response, request.id); - - Ok(response) + json!(false) } // TODO: eth_sendBundle (flashbots command) // broadcast transactions to all private rpcs at once "eth_sendRawTransaction" => match &request.params { Some(serde_json::Value::Array(params)) => { + // parsing params like this is gross. make struct and use serde to do all these checks and error handling if params.len() != 1 || !params[0].is_string() { return Err(anyhow::anyhow!("invalid request")); } @@ -882,59 +858,56 @@ impl Web3ProxyApp { let raw_tx = Bytes::from_str(params[0].as_str().unwrap())?; if check_firewall_raw(&raw_tx).await? { - self.private_rpcs + return self + .private_rpcs .try_send_all_upstream_servers(request, None) .instrument(span) - .await + .await; } else { - Err(anyhow::anyhow!("transaction blocked by firewall")) + return Err(anyhow::anyhow!("transaction blocked by firewall")); } } - _ => Err(anyhow::anyhow!("invalid request")), + _ => return Err(anyhow::anyhow!("invalid request")), }, "eth_syncing" => { // TODO: return a real response if all backends are syncing or if no servers in sync - let partial_response = json!(false); - - let response = JsonRpcForwardedResponse::from_value(partial_response, request.id); - - Ok(response) + json!(false) } "net_listening" => { // TODO: only if there are some backends on balanced_rpcs? - let partial_response = json!(true); - - let response = JsonRpcForwardedResponse::from_value(partial_response, request.id); - - Ok(response) + json!(true) } - "net_peerCount" => { - let response = JsonRpcForwardedResponse::from_number( - self.balanced_rpcs.num_synced_rpcs(), - request.id, - ); + "net_peerCount" => self.balanced_rpcs.num_synced_rpcs().into(), + "web3_clientVersion" => serde_json::Value::String(APP_USER_AGENT.to_string()), + "web3_sha3" => { + // returns Keccak-256 (not the standardized SHA3-256) of the given data. + match &request.params { + Some(serde_json::Value::Array(params)) => { + if params.len() != 1 || !params[0].is_string() { + return Err(anyhow::anyhow!("invalid request")); + } - Ok(response) + let param = Bytes::from_str(params[0].as_str().unwrap())?; + + let hash = H256::from(keccak256(param)); + + json!(hash) + } + _ => return Err(anyhow::anyhow!("invalid request")), + } } - "web3_clientVersion" => { - // TODO: return a real response if all backends are syncing or if no servers in sync - let partial_response = serde_json::Value::String(APP_USER_AGENT.to_string()); - let response = JsonRpcForwardedResponse::from_value(partial_response, request.id); - - Ok(response) - } // TODO: web3_sha3? + // anything else gets sent to backend rpcs and cached method => { - // everything else is relayed to a backend - // this is not a private transaction - let head_block_number = self.balanced_rpcs.get_head_block_num(); // we do this check before checking caches because it might modify the request params // TODO: add a stat for archive vs full since they should probably cost different let min_block_needed = - get_min_block_needed(method, request.params.as_mut(), head_block_number.into()); + get_min_block_needed(method, request.params.as_mut(), head_block_number); + + let min_block_needed = min_block_needed.as_ref(); trace!(?min_block_needed, ?method); @@ -1007,24 +980,43 @@ impl Web3ProxyApp { } }; + // TODO: move this caching outside this match and cache some of the other responses? + // TODO: cache the warp::reply to save us serializing every time? { let mut response_cache = response_cache.write(); - // TODO: cache the warp::reply to save us serializing every time? - response_cache.insert(cache_key.clone(), response.clone()); + let response_cache_max_bytes = self + .response_cache_max_bytes + .load(atomic::Ordering::Acquire); - // TODO: instead of checking length, check size in bytes - if response_cache.len() >= RESPONSE_CACHE_CAP { - // TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block - response_cache.pop_front(); + // TODO: this might be too naive. not sure how much overhead the object has + let new_size = size_of_val(&cache_key) + size_of_val(&response); + + // no item is allowed to take more than 1% of the cache + // TODO: get this from config? + if new_size < response_cache_max_bytes / 100 { + // TODO: this probably has wildly variable timings + while size_of_val(&response_cache) + new_size >= response_cache_max_bytes { + // TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block + response_cache.pop_front(); + } + + response_cache.insert(cache_key.clone(), response.clone()); + } else { + // TODO: emit a stat instead? + warn!(?new_size, "value too large for caching"); } } let _ = self.incoming_requests.remove(&cache_key); let _ = incoming_tx.send(false); - Ok(response) + return Ok(response); } - } + }; + + let response = JsonRpcForwardedResponse::from_value(partial_response, request.id); + + Ok(response) } } diff --git a/web3-proxy/src/config.rs b/web3-proxy/src/config.rs index 20dd947d..8021bf12 100644 --- a/web3-proxy/src/config.rs +++ b/web3-proxy/src/config.rs @@ -38,10 +38,21 @@ pub struct AppConfig { pub struct RpcSharedConfig { // TODO: better type for chain_id? max of `u64::MAX / 2 - 36` https://github.com/ethereum/EIPs/issues/2294 pub chain_id: u64, - pub rate_limit_redis: Option, - // TODO: serde default for development? - // TODO: allow no limit? + pub redis_url: Option, + #[serde(default = "default_public_rate_limit_per_minute")] pub public_rate_limit_per_minute: u32, + #[serde(default = "default_response_cache_max_bytes")] + pub response_cache_max_bytes: usize, +} + +fn default_public_rate_limit_per_minute() -> u32 { + 0 +} + +fn default_response_cache_max_bytes() -> usize { + // TODO: default to some percentage of the system? + // 100 megabytes + 10_usize.pow(8) } #[derive(Debug, Deserialize)] diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index d26b8d5d..cccbc696 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -14,7 +14,7 @@ use std::{cmp::Ordering, sync::Arc}; use tokio::sync::broadcast; use tokio::sync::RwLock; use tokio::time::{interval, sleep, Duration, MissedTickBehavior}; -use tracing::{error, info, instrument, trace, warn}; +use tracing::{error, info, info_span, instrument, trace, warn, Instrument}; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::BlockAndRpc; @@ -42,14 +42,13 @@ impl Web3Provider { .interval(Duration::from_secs(13)) .into() } else if url_str.starts_with("ws") { - // TODO: wrapper automatically reconnect - let provider = ethers::providers::Ws::connect(url_str).await?; + let provider = ethers::providers::Ws::connect(url_str) + .instrument(info_span!("Web3Provider", url_str = url_str)) + .await?; // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) // TODO: i don't think this interval matters - ethers::providers::Provider::new(provider) - .interval(Duration::from_secs(1)) - .into() + ethers::providers::Provider::new(provider).into() } else { return Err(anyhow::anyhow!("only http and ws servers are supported")); }; @@ -272,7 +271,7 @@ impl Web3Connection { self.block_data_limit.load(atomic::Ordering::Acquire).into() } - pub fn has_block_data(&self, needed_block_num: U64) -> bool { + pub fn has_block_data(&self, needed_block_num: &U64) -> bool { let block_data_limit: U64 = self.get_block_data_limit(); let newest_block_num = self.head_block.read().1; @@ -281,7 +280,7 @@ impl Web3Connection { .saturating_sub(block_data_limit) .max(U64::one()); - needed_block_num >= oldest_block_num && needed_block_num <= newest_block_num + needed_block_num >= &oldest_block_num && needed_block_num <= &newest_block_num } #[instrument(skip_all)] @@ -539,7 +538,7 @@ impl Web3Connection { match self.try_request_handle().await { Ok(active_request_handle) => { // TODO: check the filter - unimplemented!("actually send a request"); + todo!("actually send a request"); } Err(e) => { warn!("Failed getting latest block from {}: {:?}", self, e); @@ -622,6 +621,7 @@ impl Web3Connection { impl Hash for Web3Connection { fn hash(&self, state: &mut H) { + // TODO: this is wrong. we might have two connections to the same provider self.url.hash(state); } } diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index a1d200c6..cdb644cf 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -56,8 +56,8 @@ impl SyncedConnections { &self.head_block_hash } - pub fn get_head_block_num(&self) -> u64 { - self.head_block_num + pub fn get_head_block_num(&self) -> U64 { + self.head_block_num.into() } } @@ -94,11 +94,11 @@ impl BlockChain { self.block_map.entry(hash).or_insert(block); } - pub fn get_block(&self, num: &U64) -> Option>> { + pub fn get_cannonical_block(&self, num: &U64) -> Option>> { self.chain_map.get(num).map(|x| x.clone()) } - pub fn get_block_from_hash(&self, hash: &H256) -> Option>> { + pub fn get_block(&self, hash: &H256) -> Option>> { self.block_map.get(hash).map(|x| x.clone()) } } @@ -304,6 +304,7 @@ impl Web3Connections { } } + /// dedupe transactions and send them to any listening clients async fn funnel_transaction( self: Arc, rpc: Arc, @@ -354,7 +355,9 @@ impl Web3Connections { Ok(()) } - /// subscribe to all the backend rpcs + /// subscribe to blocks and transactions from all the backend rpcs. + /// blocks are processed by all the `Web3Connection`s and then sent to the `block_receiver` + /// transaction ids from all the `Web3Connection`s are deduplicated and forwarded to `pending_tx_sender` async fn subscribe( self: Arc, pending_tx_id_receiver: flume::Receiver<(TxHash, Arc)>, @@ -408,7 +411,7 @@ impl Web3Connections { if futures.is_empty() { // no transaction or block subscriptions. - unimplemented!("every second, check that the provider is still connected"); + todo!("every second, check that the provider is still connected"); } if let Err(e) = try_join_all(futures).await { @@ -421,14 +424,44 @@ impl Web3Connections { Ok(()) } - pub async fn get_block(&self, num: U64) -> anyhow::Result>> { - if let Some(block) = self.chain.get_block(&num) { + pub async fn get_block(&self, hash: &H256) -> anyhow::Result>> { + // first, try to get the hash from our cache + if let Some(block) = self.chain.get_block(hash) { return Ok(block); } - let head_block_num = self.get_head_block_num(); + // block not in cache. we need to ask an rpc for it - if num.as_u64() > head_block_num { + // TODO: helper for method+params => JsonRpcRequest + // TODO: get block with the transactions? + let request = json!({ "id": "1", "method": "eth_getBlockByHash", "params": (hash, false) }); + let request: JsonRpcRequest = serde_json::from_value(request)?; + + // TODO: if error, retry? + let response = self.try_send_best_upstream_server(request, None).await?; + + let block = response.result.unwrap(); + + let block: Block = serde_json::from_str(block.get())?; + + let block = Arc::new(block); + + self.chain.add_block(block.clone(), false); + + Ok(block) + } + + /// Get the heaviest chain's block from cache or backend rpc + pub async fn get_cannonical_block(&self, num: &U64) -> anyhow::Result>> { + // first, try to get the hash from our cache + if let Some(block) = self.chain.get_cannonical_block(num) { + return Ok(block); + } + + // block not in cache. we need to ask an rpc for it + // but before we do any queries, be sure the requested block num exists + let head_block_num = self.get_head_block_num(); + if num > &head_block_num { return Err(anyhow::anyhow!( "Head block is #{}, but #{} was requested", head_block_num, @@ -437,6 +470,7 @@ impl Web3Connections { } // TODO: helper for method+params => JsonRpcRequest + // TODO: get block with the transactions? let request = json!({ "id": "1", "method": "eth_getBlockByNumber", "params": (num, false) }); let request: JsonRpcRequest = serde_json::from_value(request)?; @@ -457,17 +491,16 @@ impl Web3Connections { Ok(block) } - pub async fn get_block_hash(&self, num: U64) -> anyhow::Result { - // first, try to get the hash from our cache - // TODO: move this cache to redis? - let block = self.get_block(num).await?; + /// Convenience method to get the cannonical block at a given block height. + pub async fn get_block_hash(&self, num: &U64) -> anyhow::Result { + let block = self.get_cannonical_block(num).await?; let hash = block.hash.unwrap(); Ok(hash) } - pub fn get_head_block(&self) -> (u64, H256) { + pub fn get_head_block(&self) -> (U64, H256) { let synced_connections = self.synced_connections.load(); let num = synced_connections.get_head_block_num(); @@ -480,7 +513,7 @@ impl Web3Connections { *self.synced_connections.load().get_head_block_hash() } - pub fn get_head_block_num(&self) -> u64 { + pub fn get_head_block_num(&self) -> U64 { self.synced_connections.load().get_head_block_num() } @@ -488,7 +521,7 @@ impl Web3Connections { if self.synced_connections.load().inner.is_empty() { return false; } - self.get_head_block_num() > 0 + self.get_head_block_num() > U64::zero() } pub fn num_synced_rpcs(&self) -> usize { @@ -756,7 +789,7 @@ impl Web3Connections { pub async fn next_upstream_server( &self, skip: &[Arc], - min_block_needed: Option, + min_block_needed: Option<&U64>, ) -> Result> { let mut earliest_retry_after = None; @@ -833,7 +866,7 @@ impl Web3Connections { /// returns servers even if they aren't in sync. This is useful for broadcasting signed transactions pub async fn get_upstream_servers( &self, - min_block_needed: Option, + min_block_needed: Option<&U64>, ) -> Result, Option> { let mut earliest_retry_after = None; // TODO: with capacity? @@ -868,7 +901,7 @@ impl Web3Connections { pub async fn try_send_best_upstream_server( &self, request: JsonRpcRequest, - min_block_needed: Option, + min_block_needed: Option<&U64>, ) -> anyhow::Result { let mut skip_rpcs = vec![]; @@ -964,7 +997,7 @@ impl Web3Connections { pub async fn try_send_all_upstream_servers( &self, request: JsonRpcRequest, - min_block_needed: Option, + min_block_needed: Option<&U64>, ) -> anyhow::Result { loop { match self.get_upstream_servers(min_block_needed).await { diff --git a/web3-proxy/src/frontend/ws_proxy.rs b/web3-proxy/src/frontend/ws_proxy.rs index 8b042ef8..11e420b7 100644 --- a/web3-proxy/src/frontend/ws_proxy.rs +++ b/web3-proxy/src/frontend/ws_proxy.rs @@ -9,7 +9,7 @@ use futures::{ stream::{SplitSink, SplitStream, StreamExt}, }; use hashbrown::HashMap; -use serde_json::value::RawValue; +use serde_json::{json, value::RawValue}; use std::sync::Arc; use std::{str::from_utf8_mut, sync::atomic::AtomicUsize}; use tracing::{error, info, trace}; @@ -78,10 +78,8 @@ async fn handle_socket_payload( } }; - let response = JsonRpcForwardedResponse::from_string( - partial_response.to_string(), - id.clone(), - ); + let response = + JsonRpcForwardedResponse::from_value(json!(partial_response), id.clone()); Ok(response.into()) } diff --git a/web3-proxy/src/jsonrpc.rs b/web3-proxy/src/jsonrpc.rs index 801e6ff1..4ae5d9dc 100644 --- a/web3-proxy/src/jsonrpc.rs +++ b/web3-proxy/src/jsonrpc.rs @@ -4,12 +4,12 @@ use serde::de::{self, Deserialize, Deserializer, MapAccess, SeqAccess, Visitor}; use serde::Serialize; use serde_json::value::RawValue; use std::fmt; -use tracing::trace; #[derive(Clone, serde::Deserialize)] pub struct JsonRpcRequest { // TODO: skip jsonrpc entireley? // pub jsonrpc: Box, + /// id could be a stricter type, but many rpcs do things against the spec pub id: Box, pub method: String, pub params: Option, @@ -180,16 +180,6 @@ impl JsonRpcForwardedResponse { } } - pub fn from_number( - partial_response: T, - id: Box, - ) -> Self { - // TODO: proper escaping. this feels wrong. probably refactor to not need this at all - let partial_response = format!("0x{:x}", partial_response); - - Self::from_string(partial_response, id) - } - pub fn from_response(partial_response: Box, id: Box) -> Self { JsonRpcForwardedResponse { jsonrpc: "2.0".to_string(), @@ -200,15 +190,6 @@ impl JsonRpcForwardedResponse { } } - pub fn from_string(partial_response: String, id: Box) -> Self { - trace!("partial_response: {}", partial_response); - // TODO: anyhow result on this - // TODO: proper escaping. this feels wrong. probably refactor to not need this at all - let partial_response = RawValue::from_string(format!(r#""{}""#, partial_response)).unwrap(); - - Self::from_response(partial_response, id) - } - pub fn from_value(partial_response: serde_json::Value, id: Box) -> Self { let partial_response = serde_json::to_string(&partial_response).expect("this should always work"); diff --git a/web3-proxy/src/main.rs b/web3-proxy/src/main.rs index 1dba653f..037845f0 100644 --- a/web3-proxy/src/main.rs +++ b/web3-proxy/src/main.rs @@ -15,7 +15,7 @@ use std::sync::atomic::{self, AtomicUsize}; use std::thread; use std::time::Duration; use tokio::runtime; -use tracing::{debug, info, trace}; +use tracing::{debug, info}; use tracing_subscriber::EnvFilter; use crate::app::{flatten_handle, Web3ProxyApp}; @@ -45,7 +45,7 @@ fn main() -> anyhow::Result<()> { let rpc_config: String = fs::read_to_string(cli_config.config)?; let rpc_config: AppConfig = toml::from_str(&rpc_config)?; - trace!("rpc_config: {:?}", rpc_config); + debug!(?rpc_config); // TODO: this doesn't seem to do anything proctitle::set_title(format!("web3-proxy-{}", rpc_config.shared.chain_id));