improve caching

Bryan Stitt 2022-07-22 19:30:39 +00:00
parent 80a29ceac0
commit 2833737ae7
9 changed files with 187 additions and 162 deletions

TODO.md
View File

@@ -46,8 +46,11 @@
 - we were skipping our delay interval when block hash wasn't changed. so if a block was ever slow, the http provider would get the same hash twice and then would try eth_getBlockByNumber a ton of times
 - [x] inspect any jsonrpc errors. if it's something like "header not found" or "block with id $x not found", retry on another node (and add a negative score to that server)
 - this error seems to happen when we use load balanced backend rpcs like pokt and ankr
+- [x] RESPONSE_CACHE_CAP in bytes instead of number of entries
-- [ ] if we don't cache errors, then in-flight request caching is going to bottleneck
+- [x] if we don't cache errors, then in-flight request caching is going to bottleneck
 - i think now that we retry header not found and similar, caching errors should be fine
+- [x] RESPONSE_CACHE_CAP from config
+- [x] web3_sha3 rpc command
 - [ ] if the fastest server has hit rate limits, we won't be able to serve any traffic until another server is synced.
 - thundering herd problem if we only allow a lag of 0 blocks
 - we can improve this by only `publish`ing the sorted list once a threshold of total available soft and hard limits is passed. how can we do this without hammering redis? at least it's only once per block per server
@@ -66,7 +69,9 @@
 - [x] send getTransaction rpc requests to the private rpc tier
 - [x] I'm hitting infura rate limits very quickly. I feel like that means something is very inefficient
 - whenever blocks were slow, we started checking as fast as possible
-- [ ] eth_getBlockByNumber and similar calls served from the block map
+- [cancelled] eth_getBlockByNumber and similar calls served from the block map
+- will need all Block<TxHash> **and** Block<TransactionReceipt> in caches or fetched efficiently
+- so maybe we don't want this. we can just use the general request cache for these. they will only require 1 request and it means requests won't get in the way as much on writes as new blocks arrive.
 - [ ] incoming rate limiting by api key
 - [ ] refactor so configs can change while running
 - create the app without applying any config to it
@@ -75,6 +80,7 @@
 - [ ] if a rpc fails to connect at start, retry later instead of skipping it forever
 - [ ] have a "backup" tier that is only used when the primary tier has no servers or is many blocks behind
 - we don't want the backup tier taking over with the head block if they happen to be fast at that (but overall low/expensive rps). only if the primary tier has fallen behind or gone entirely offline should we go to third parties
+- [ ] until this is done, an alternative is for infra to have a "failover" script that changes the configs to include a bunch of third party servers manually.
 - [ ] stats when forks are resolved (and what chain they were on?)
 - [ ] failsafe. if no blocks or transactions in some time, warn and reset the connection
 - [ ] right now the block_map is unbounded. move this to redis and do some calculations to be sure about RAM usage
@@ -170,3 +176,5 @@ in another repo: event subscriber
 - [ ] Maybe storing pending txs on receipt in a dashmap is wrong. We want to store in a timer_heap (or similar) when we actually send. This way there's no lock contention until the race is over.
 - [ ] Support "safe" block height. It's planned for eth2 but we can kind of do it now by just doing head block num-3
 - [ ] Archive check on BSC gave "archive" when it isn't. and FTM gave 90k for all servers even though they should be archive
+- [ ] cache eth_getLogs in a database?
+- [ ] stats for "read amplification". how many backend requests do we send compared to frontend requests we received?
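The "read amplification" item above boils down to a pair of counters compared over time. A minimal sketch of what such a stat could look like; the names are illustrative and not part of this codebase:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// counters behind a "read amplification" stat:
/// how many backend requests we send per frontend request we serve
#[derive(Default)]
pub struct RequestCounters {
    frontend_requests: AtomicU64,
    backend_requests: AtomicU64,
}

impl RequestCounters {
    pub fn note_frontend_request(&self) {
        self.frontend_requests.fetch_add(1, Ordering::Relaxed);
    }

    pub fn note_backend_request(&self) {
        self.backend_requests.fetch_add(1, Ordering::Relaxed);
    }

    /// 1.0 means every frontend request needed exactly one backend request;
    /// lower means the cache is absorbing traffic
    pub fn read_amplification(&self) -> f64 {
        let frontend = self.frontend_requests.load(Ordering::Relaxed).max(1);
        let backend = self.backend_requests.load(Ordering::Relaxed);
        backend as f64 / frontend as f64
    }
}

fn main() {
    let counters = RequestCounters::default();
    counters.note_frontend_request();
    counters.note_frontend_request();
    counters.note_backend_request(); // one of the two was a cache hit
    assert_eq!(counters.read_amplification(), 0.5);
}
```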

View File

@@ -3,6 +3,8 @@ chain_id = 1
 # in prod, do `rate_limit_redis = "redis://redis:6379/"`
 #rate_limit_redis = "redis://dev-redis:6379/"
 public_rate_limit_per_minute = 60_000
+# 1GB of cache
+response_cache_max_bytes = 10 ^ 9
 
 [balanced_rpcs]

View File

@@ -1,6 +1,7 @@
 use axum::extract::ws::Message;
 use dashmap::mapref::entry::Entry as DashMapEntry;
 use dashmap::DashMap;
+use ethers::core::utils::keccak256;
 use ethers::prelude::{Address, Block, BlockNumber, Bytes, Transaction, TxHash, H256, U64};
 use futures::future::Abortable;
 use futures::future::{join_all, AbortHandle};
@@ -13,6 +14,7 @@ use redis_cell_client::bb8::ErrorSink;
 use redis_cell_client::{bb8, RedisCellClient, RedisConnectionManager};
 use serde_json::json;
 use std::fmt;
+use std::mem::size_of_val;
 use std::pin::Pin;
 use std::str::FromStr;
 use std::sync::atomic::{self, AtomicUsize};
@@ -22,7 +24,7 @@ use tokio::sync::{broadcast, watch};
 use tokio::task::JoinHandle;
 use tokio::time::timeout;
 use tokio_stream::wrappers::{BroadcastStream, WatchStream};
-use tracing::{info, info_span, instrument, trace, warn, Instrument};
+use tracing::{debug, info, info_span, instrument, trace, warn, Instrument};
 
 use crate::bb8_helpers;
 use crate::config::AppConfig;
@@ -41,9 +43,6 @@ static APP_USER_AGENT: &str = concat!(
 env!("CARGO_PKG_VERSION"),
 );
 
-// TODO: put this in config? what size should we do? probably should structure this to be a size in MB
-const RESPONSE_CACHE_CAP: usize = 1024;
-
 // block hash, method, params
 type CacheKey = (H256, String, Option<String>);
@@ -130,7 +129,7 @@ fn get_or_set_block_number(
 fn get_min_block_needed(
 method: &str,
 params: Option<&mut serde_json::Value>,
-latest_block: U64,
+head_block: U64,
 ) -> Option<U64> {
 let params = params?;
@@ -159,7 +158,7 @@
 if let Some(x) = obj.get_mut("fromBlock") {
 let block_num: BlockNumber = serde_json::from_value(x.clone()).ok()?;
 
-let (modified, block_num) = block_num_to_u64(block_num, latest_block);
+let (modified, block_num) = block_num_to_u64(block_num, head_block);
 
 if modified {
 *x = serde_json::to_value(block_num).unwrap();
@@ -171,7 +170,7 @@
 if let Some(x) = obj.get_mut("toBlock") {
 let block_num: BlockNumber = serde_json::from_value(x.clone()).ok()?;
 
-let (modified, block_num) = block_num_to_u64(block_num, latest_block);
+let (modified, block_num) = block_num_to_u64(block_num, head_block);
 
 if modified {
 *x = serde_json::to_value(block_num).unwrap();
@@ -183,7 +182,7 @@
 if let Some(x) = obj.get("blockHash") {
 // TODO: check a linkedhashmap of recent hashes
 // TODO: error if fromBlock or toBlock were set
-unimplemented!("handle blockHash {}", x);
+todo!("handle blockHash {}", x);
 }
 
 return None;
@@ -224,7 +223,7 @@
 }
 };
 
-match get_or_set_block_number(params, block_param_id, latest_block) {
+match get_or_set_block_number(params, block_param_id, head_block) {
 Ok(block) => Some(block),
 Err(err) => {
 // TODO: seems unlikely that we will get here
@@ -253,6 +252,8 @@ pub struct Web3ProxyApp {
 /// Send private requests (like eth_sendRawTransaction) to all these servers
 private_rpcs: Arc<Web3Connections>,
 incoming_requests: ActiveRequestsMap,
+/// bytes available to response_cache (it will be slightly larger than this)
+response_cache_max_bytes: AtomicUsize,
 response_cache: ResponseLrcCache,
 // don't drop this or the sender will stop working
 // TODO: broadcast channel instead?
@@ -308,7 +309,7 @@ impl Web3ProxyApp {
 .build()?,
 );
 
-let redis_client_pool = match app_config.shared.rate_limit_redis {
+let redis_client_pool = match app_config.shared.redis_url {
 Some(redis_address) => {
 info!("Connecting to redis on {}", redis_address);
@@ -406,6 +407,7 @@ impl Web3ProxyApp {
 balanced_rpcs,
 private_rpcs,
 incoming_requests: Default::default(),
+response_cache_max_bytes: AtomicUsize::new(app_config.shared.response_cache_max_bytes),
 response_cache: Default::default(),
 head_block_receiver,
 pending_tx_sender,
@@ -608,7 +610,7 @@
 // TODO: do something with subscription_join_handle?
 
-let response = JsonRpcForwardedResponse::from_string(subscription_id, id);
+let response = JsonRpcForwardedResponse::from_value(json!(subscription_id), id);
 
 // TODO: make a `SubscriptonHandle(AbortHandle, JoinHandle)` struct?
@@ -627,19 +629,20 @@
 &self.incoming_requests
 }
 
-/// send the request to the appropriate RPCs
+/// send the request or batch of requests to the appropriate RPCs
+/// TODO: dry this up
 #[instrument(skip_all)]
 pub async fn proxy_web3_rpc(
 &self,
 request: JsonRpcRequestEnum,
 ) -> anyhow::Result<JsonRpcForwardedResponseEnum> {
-// TODO: i don't always see this in the logs. why?
-trace!("Received request: {:?}", request);
+debug!(?request, "proxy_web3_rpc");
 
 // even though we have timeouts on the requests to our backend providers,
-// we need a timeout for the incoming request so that delays from
-let max_time = Duration::from_secs(60);
+// we need a timeout for the incoming request so that retries don't run forever
+// TODO: take this as an optional argument. per user max? expiration time instead of duration?
+let max_time = Duration::from_secs(120);
+
+// TODO: instrument this with a unique id
 
 let response = match request {
 JsonRpcRequestEnum::Single(request) => JsonRpcForwardedResponseEnum::Single(
@@ -650,8 +653,7 @@
 ),
 };
 
-// TODO: i don't always see this in the logs. why?
-trace!("Forwarding response: {:?}", response);
+debug!(?response, "Forwarding response");
 
 Ok(response)
 }
@@ -686,7 +688,7 @@
 async fn get_cached_response(
 &self,
 // TODO: accept a block hash here also?
-min_block_needed: Option<U64>,
+min_block_needed: Option<&U64>,
 request: &JsonRpcRequest,
 ) -> anyhow::Result<(
 CacheKey,
@@ -712,13 +714,13 @@
 if let Some(response) = self.response_cache.read().get(&key) {
 // TODO: emit a stat
-trace!(?request.method, "cache hit!");
+debug!(?request.method, "cache hit!");
 
 // TODO: can we make references work? maybe put them in an Arc?
 return Ok((key, Ok(response.to_owned())));
 } else {
 // TODO: emit a stat
-trace!(?request.method, "cache miss!");
+debug!(?request.method, "cache miss!");
 }
 
 // TODO: multiple caches. if head_block_hash is None, have a persistent cache (disk backed?)
@@ -741,7 +743,8 @@
 // // TODO: add more to this span such as
 let span = info_span!("rpc_request");
 // let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon)
 
-match request.method.as_ref() {
+let partial_response: serde_json::Value = match request.method.as_ref() {
 // lots of commands are blocked
 "admin_addPeer"
 | "admin_datadir"
@@ -807,7 +810,7 @@
 | "shh_uninstallFilter"
 | "shh_version" => {
 // TODO: proper error code
-Err(anyhow::anyhow!("unsupported"))
+return Err(anyhow::anyhow!("unsupported"));
 }
 // TODO: implement these commands
 "eth_getFilterChanges"
@@ -815,66 +818,39 @@
 | "eth_newBlockFilter"
 | "eth_newFilter"
 | "eth_newPendingTransactionFilter"
-| "eth_uninstallFilter" => Err(anyhow::anyhow!("not yet implemented")),
+| "eth_uninstallFilter" => return Err(anyhow::anyhow!("not yet implemented")),
 // some commands can use local data or caches
-"eth_accounts" => {
-let partial_response = serde_json::Value::Array(vec![]);
-let response = JsonRpcForwardedResponse::from_value(partial_response, request.id);
-Ok(response)
-}
+"eth_accounts" => serde_json::Value::Array(vec![]),
 "eth_blockNumber" => {
 let head_block_number = self.balanced_rpcs.get_head_block_num();
-if head_block_number == 0 {
+if head_block_number.as_u64() == 0 {
 return Err(anyhow::anyhow!("no servers synced"));
 }
-let response = JsonRpcForwardedResponse::from_number(head_block_number, request.id);
-Ok(response)
+json!(head_block_number)
 }
 // TODO: eth_callBundle (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_callbundle)
 // TODO: eth_cancelPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_cancelprivatetransaction, but maybe just reject)
 // TODO: eth_sendPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_sendprivatetransaction)
 "eth_coinbase" => {
-// no need for serving coinbase. we could return a per-user payment address here, but then we might leak that to dapps
-let partial_response = json!(Address::zero());
-let response = JsonRpcForwardedResponse::from_value(partial_response, request.id);
-Ok(response)
+// no need for serving coinbase
+// we could return a per-user payment address here, but then we might leak that to dapps
+json!(Address::zero())
 }
 // TODO: eth_estimateGas using anvil?
 // TODO: eth_gasPrice that does awesome magic to predict the future
-// TODO: eth_getBlockByHash from caches
-"eth_getBlockByHash" => {
-unimplemented!("wip")
-}
-// TODO: eth_getBlockByNumber from caches
-// TODO: eth_getBlockTransactionCountByHash from caches
-// TODO: eth_getBlockTransactionCountByNumber from caches
-// TODO: eth_getUncleCountByBlockHash from caches
-// TODO: eth_getUncleCountByBlockNumber from caches
 "eth_hashrate" => {
-let partial_response = json!("0x0");
-let response = JsonRpcForwardedResponse::from_value(partial_response, request.id);
-Ok(response)
+json!(U64::zero())
 }
 "eth_mining" => {
-let partial_response = json!(false);
-let response = JsonRpcForwardedResponse::from_value(partial_response, request.id);
-Ok(response)
+json!(false)
 }
 // TODO: eth_sendBundle (flashbots command)
 // broadcast transactions to all private rpcs at once
 "eth_sendRawTransaction" => match &request.params {
 Some(serde_json::Value::Array(params)) => {
+// parsing params like this is gross. make struct and use serde to do all these checks and error handling
 if params.len() != 1 || !params[0].is_string() {
 return Err(anyhow::anyhow!("invalid request"));
 }
@@ -882,59 +858,56 @@
 let raw_tx = Bytes::from_str(params[0].as_str().unwrap())?;
 
 if check_firewall_raw(&raw_tx).await? {
-self.private_rpcs
+return self
+.private_rpcs
 .try_send_all_upstream_servers(request, None)
 .instrument(span)
-.await
+.await;
 } else {
-Err(anyhow::anyhow!("transaction blocked by firewall"))
+return Err(anyhow::anyhow!("transaction blocked by firewall"));
 }
 }
-_ => Err(anyhow::anyhow!("invalid request")),
+_ => return Err(anyhow::anyhow!("invalid request")),
 },
 "eth_syncing" => {
 // TODO: return a real response if all backends are syncing or if no servers in sync
-let partial_response = json!(false);
-let response = JsonRpcForwardedResponse::from_value(partial_response, request.id);
-Ok(response)
+json!(false)
 }
 "net_listening" => {
 // TODO: only if there are some backends on balanced_rpcs?
-let partial_response = json!(true);
-let response = JsonRpcForwardedResponse::from_value(partial_response, request.id);
-Ok(response)
+json!(true)
 }
-"net_peerCount" => {
-let response = JsonRpcForwardedResponse::from_number(
-self.balanced_rpcs.num_synced_rpcs(),
-request.id,
-);
-Ok(response)
+"net_peerCount" => self.balanced_rpcs.num_synced_rpcs().into(),
+"web3_clientVersion" => serde_json::Value::String(APP_USER_AGENT.to_string()),
+"web3_sha3" => {
+// returns Keccak-256 (not the standardized SHA3-256) of the given data.
+match &request.params {
+Some(serde_json::Value::Array(params)) => {
+if params.len() != 1 || !params[0].is_string() {
+return Err(anyhow::anyhow!("invalid request"));
+}
+
+let param = Bytes::from_str(params[0].as_str().unwrap())?;
+
+let hash = H256::from(keccak256(param));
+
+json!(hash)
+}
+_ => return Err(anyhow::anyhow!("invalid request")),
+}
 }
-"web3_clientVersion" => {
-// TODO: return a real response if all backends are syncing or if no servers in sync
-let partial_response = serde_json::Value::String(APP_USER_AGENT.to_string());
-let response = JsonRpcForwardedResponse::from_value(partial_response, request.id);
-Ok(response)
-}
-// TODO: web3_sha3?
+// anything else gets sent to backend rpcs and cached
 method => {
-// everything else is relayed to a backend
-// this is not a private transaction
 let head_block_number = self.balanced_rpcs.get_head_block_num();
 
 // we do this check before checking caches because it might modify the request params
 // TODO: add a stat for archive vs full since they should probably cost different
 let min_block_needed =
-get_min_block_needed(method, request.params.as_mut(), head_block_number.into());
+get_min_block_needed(method, request.params.as_mut(), head_block_number);
+let min_block_needed = min_block_needed.as_ref();
 
 trace!(?min_block_needed, ?method);
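Since this hunk adds the web3_sha3 handler, here is a tiny standalone check of the same keccak256 call it relies on, using the familiar "hello world" input from the JSON-RPC docs. This is an illustration, not code from the repo:

```rust
use ethers::core::utils::keccak256;
use ethers::prelude::{Bytes, H256};
use std::str::FromStr;

fn main() {
    // "hello world" as hex, the usual web3_sha3 example input
    let param = Bytes::from_str("0x68656c6c6f20776f726c64").expect("valid hex");

    // same transformation as the new "web3_sha3" arm: bytes in, keccak-256 hash out
    let hash = H256::from(keccak256(param));

    // 0x47173285a8d7341e5e972fc677286384f802f8ef42a5ec5f03bbfa254cb01fad
    println!("{:?}", hash);
}
```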
@@ -1007,24 +980,43 @@
 }
 };
 
+// TODO: move this caching outside this match and cache some of the other responses?
+// TODO: cache the warp::reply to save us serializing every time?
 {
 let mut response_cache = response_cache.write();
 
-// TODO: cache the warp::reply to save us serializing every time?
-response_cache.insert(cache_key.clone(), response.clone());
-
-// TODO: instead of checking length, check size in bytes
-if response_cache.len() >= RESPONSE_CACHE_CAP {
-// TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block
-response_cache.pop_front();
+let response_cache_max_bytes = self
+.response_cache_max_bytes
+.load(atomic::Ordering::Acquire);
+
+// TODO: this might be too naive. not sure how much overhead the object has
+let new_size = size_of_val(&cache_key) + size_of_val(&response);
+
+// no item is allowed to take more than 1% of the cache
+// TODO: get this from config?
+if new_size < response_cache_max_bytes / 100 {
+// TODO: this probably has wildly variable timings
+while size_of_val(&response_cache) + new_size >= response_cache_max_bytes {
+// TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block
+response_cache.pop_front();
+}
+
+response_cache.insert(cache_key.clone(), response.clone());
+} else {
+// TODO: emit a stat instead?
+warn!(?new_size, "value too large for caching");
 }
 }
 
 let _ = self.incoming_requests.remove(&cache_key);
 let _ = incoming_tx.send(false);
 
-Ok(response)
+return Ok(response);
 }
-}
+};
+
+let response = JsonRpcForwardedResponse::from_value(partial_response, request.id);
+
+Ok(response)
 }
 }
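The new cache-insert logic above bounds the cache by bytes and evicts in insertion order ("least recently created"). For readers who want the byte accounting spelled out, here is a self-contained sketch of the same policy built on std types only; note that `std::mem::size_of_val` on a map measures just the handle, not its heap contents, so this sketch tracks sizes explicitly. The struct and field names are illustrative, not from the repo:

```rust
use std::collections::{HashMap, VecDeque};

/// bytes-bounded, "least recently created" cache of serialized responses
pub struct BoundedResponseCache {
    max_bytes: usize,
    used_bytes: usize,
    order: VecDeque<String>,                   // insertion order, oldest first
    entries: HashMap<String, (String, usize)>, // key -> (response json, bytes charged)
}

impl BoundedResponseCache {
    pub fn new(max_bytes: usize) -> Self {
        Self { max_bytes, used_bytes: 0, order: VecDeque::new(), entries: HashMap::new() }
    }

    pub fn insert(&mut self, key: String, response_json: String) {
        if self.entries.contains_key(&key) {
            return;
        }

        let new_size = key.len() + response_json.len();

        // same rule as the diff: nothing may take more than 1% of the cache on its own
        if new_size >= self.max_bytes / 100 {
            return;
        }

        // evict the oldest entries until the new one fits
        while self.used_bytes + new_size > self.max_bytes {
            match self.order.pop_front() {
                Some(old_key) => {
                    if let Some((_, old_size)) = self.entries.remove(&old_key) {
                        self.used_bytes -= old_size;
                    }
                }
                None => break,
            }
        }

        self.used_bytes += new_size;
        self.order.push_back(key.clone());
        self.entries.insert(key, (response_json, new_size));
    }

    pub fn get(&self, key: &str) -> Option<&String> {
        self.entries.get(key).map(|(response, _)| response)
    }
}
```

Because the whole cache is dropped when a new head block arrives, simple front-of-queue eviction is usually good enough; a true LRU would only matter for entries that are both hot and large within a single block.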

View File

@@ -38,10 +38,21 @@ pub struct AppConfig {
 pub struct RpcSharedConfig {
 // TODO: better type for chain_id? max of `u64::MAX / 2 - 36` https://github.com/ethereum/EIPs/issues/2294
 pub chain_id: u64,
-pub rate_limit_redis: Option<String>,
-// TODO: serde default for development?
-// TODO: allow no limit?
+pub redis_url: Option<String>,
+#[serde(default = "default_public_rate_limit_per_minute")]
 pub public_rate_limit_per_minute: u32,
+#[serde(default = "default_response_cache_max_bytes")]
+pub response_cache_max_bytes: usize,
+}
+
+fn default_public_rate_limit_per_minute() -> u32 {
+0
+}
+
+fn default_response_cache_max_bytes() -> usize {
+// TODO: default to some percentage of the system?
+// 100 megabytes
+10_usize.pow(8)
 }
 
 #[derive(Debug, Deserialize)]
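A quick illustration of how the `#[serde(default = ...)]` attributes above behave: when the key is missing from the TOML, the default function supplies the value. The code default is 10⁸ bytes (100 MB), while the example config aims for 10⁹ bytes (1 GB). This is a standalone sketch using the toml crate (as main.rs does), not code from the repo:

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct CacheConfig {
    // same pattern as RpcSharedConfig: missing key -> the default fn supplies the value
    #[serde(default = "default_response_cache_max_bytes")]
    response_cache_max_bytes: usize,
}

fn default_response_cache_max_bytes() -> usize {
    // 100 megabytes
    10_usize.pow(8)
}

fn main() {
    let from_file: CacheConfig = toml::from_str("response_cache_max_bytes = 1_000_000_000").unwrap();
    let from_default: CacheConfig = toml::from_str("").unwrap();

    assert_eq!(from_file.response_cache_max_bytes, 1_000_000_000);
    assert_eq!(from_default.response_cache_max_bytes, 100_000_000);
}
```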

View File

@@ -14,7 +14,7 @@ use std::{cmp::Ordering, sync::Arc};
 use tokio::sync::broadcast;
 use tokio::sync::RwLock;
 use tokio::time::{interval, sleep, Duration, MissedTickBehavior};
-use tracing::{error, info, instrument, trace, warn};
+use tracing::{error, info, info_span, instrument, trace, warn, Instrument};
 
 use crate::app::{flatten_handle, AnyhowJoinHandle};
 use crate::config::BlockAndRpc;
@@ -42,14 +42,13 @@ impl Web3Provider {
 .interval(Duration::from_secs(13))
 .into()
 } else if url_str.starts_with("ws") {
-// TODO: wrapper automatically reconnect
-let provider = ethers::providers::Ws::connect(url_str).await?;
+let provider = ethers::providers::Ws::connect(url_str)
+.instrument(info_span!("Web3Provider", url_str = url_str))
+.await?;
 
 // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
 // TODO: i don't think this interval matters
-ethers::providers::Provider::new(provider)
-.interval(Duration::from_secs(1))
-.into()
+ethers::providers::Provider::new(provider).into()
 } else {
 return Err(anyhow::anyhow!("only http and ws servers are supported"));
 };
@@ -272,7 +271,7 @@ impl Web3Connection {
 self.block_data_limit.load(atomic::Ordering::Acquire).into()
 }
 
-pub fn has_block_data(&self, needed_block_num: U64) -> bool {
+pub fn has_block_data(&self, needed_block_num: &U64) -> bool {
 let block_data_limit: U64 = self.get_block_data_limit();
 
 let newest_block_num = self.head_block.read().1;
@@ -281,7 +280,7 @@
 .saturating_sub(block_data_limit)
 .max(U64::one());
 
-needed_block_num >= oldest_block_num && needed_block_num <= newest_block_num
+needed_block_num >= &oldest_block_num && needed_block_num <= &newest_block_num
 }
 
 #[instrument(skip_all)]
@@ -539,7 +538,7 @@
 match self.try_request_handle().await {
 Ok(active_request_handle) => {
 // TODO: check the filter
-unimplemented!("actually send a request");
+todo!("actually send a request");
 }
 Err(e) => {
 warn!("Failed getting latest block from {}: {:?}", self, e);
@@ -622,6 +621,7 @@
 impl Hash for Web3Connection {
 fn hash<H: Hasher>(&self, state: &mut H) {
+// TODO: this is wrong. we might have two connections to the same provider
 self.url.hash(state);
 }
 }
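The reworked has_block_data now takes &U64, but the check itself is unchanged: a server can only answer for blocks inside its pruning window. The same check with plain integers and illustrative values (the 64-block window is an assumption, not a value from the repo):

```rust
/// mirrors the has_block_data check: a node that keeps state for `block_data_limit`
/// blocks behind `head` can only serve blocks in [head - limit, head]
fn can_serve_block(needed: u64, head: u64, block_data_limit: u64) -> bool {
    let oldest = head.saturating_sub(block_data_limit).max(1);
    needed >= oldest && needed <= head
}

fn main() {
    let head = 15_000_000;
    let limit = 64; // assumed pruning window for a non-archive node

    assert!(can_serve_block(14_999_990, head, limit));
    assert!(!can_serve_block(14_000_000, head, limit)); // needs an archive node
    assert!(!can_serve_block(15_000_010, head, limit)); // block doesn't exist yet
}
```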

View File

@@ -56,8 +56,8 @@ impl SyncedConnections {
 &self.head_block_hash
 }
 
-pub fn get_head_block_num(&self) -> u64 {
-self.head_block_num
+pub fn get_head_block_num(&self) -> U64 {
+self.head_block_num.into()
 }
 }
@@ -94,11 +94,11 @@ impl BlockChain {
 self.block_map.entry(hash).or_insert(block);
 }
 
-pub fn get_block(&self, num: &U64) -> Option<Arc<Block<TxHash>>> {
+pub fn get_cannonical_block(&self, num: &U64) -> Option<Arc<Block<TxHash>>> {
 self.chain_map.get(num).map(|x| x.clone())
 }
 
-pub fn get_block_from_hash(&self, hash: &H256) -> Option<Arc<Block<TxHash>>> {
+pub fn get_block(&self, hash: &H256) -> Option<Arc<Block<TxHash>>> {
 self.block_map.get(hash).map(|x| x.clone())
 }
 }
@@ -304,6 +304,7 @@ impl Web3Connections {
 }
 }
 
+/// dedupe transactions and send them to any listening clients
 async fn funnel_transaction(
 self: Arc<Self>,
 rpc: Arc<Web3Connection>,
@@ -354,7 +355,9 @@
 Ok(())
 }
 
-/// subscribe to all the backend rpcs
+/// subscribe to blocks and transactions from all the backend rpcs.
+/// blocks are processed by all the `Web3Connection`s and then sent to the `block_receiver`
+/// transaction ids from all the `Web3Connection`s are deduplicated and forwarded to `pending_tx_sender`
 async fn subscribe(
 self: Arc<Self>,
 pending_tx_id_receiver: flume::Receiver<(TxHash, Arc<Web3Connection>)>,
@@ -408,7 +411,7 @@
 if futures.is_empty() {
 // no transaction or block subscriptions.
-unimplemented!("every second, check that the provider is still connected");
+todo!("every second, check that the provider is still connected");
 }
 
 if let Err(e) = try_join_all(futures).await {
@@ -421,14 +424,44 @@
 Ok(())
 }
 
-pub async fn get_block(&self, num: U64) -> anyhow::Result<Arc<Block<TxHash>>> {
-if let Some(block) = self.chain.get_block(&num) {
+pub async fn get_block(&self, hash: &H256) -> anyhow::Result<Arc<Block<TxHash>>> {
+// first, try to get the hash from our cache
+if let Some(block) = self.chain.get_block(hash) {
 return Ok(block);
 }
 
-let head_block_num = self.get_head_block_num();
-if num.as_u64() > head_block_num {
+// block not in cache. we need to ask an rpc for it
+// TODO: helper for method+params => JsonRpcRequest
+// TODO: get block with the transactions?
+let request = json!({ "id": "1", "method": "eth_getBlockByHash", "params": (hash, false) });
+let request: JsonRpcRequest = serde_json::from_value(request)?;
+
+// TODO: if error, retry?
+let response = self.try_send_best_upstream_server(request, None).await?;
+
+let block = response.result.unwrap();
+let block: Block<TxHash> = serde_json::from_str(block.get())?;
+let block = Arc::new(block);
+
+self.chain.add_block(block.clone(), false);
+
+Ok(block)
+}
+
+/// Get the heaviest chain's block from cache or backend rpc
+pub async fn get_cannonical_block(&self, num: &U64) -> anyhow::Result<Arc<Block<TxHash>>> {
+// first, try to get the hash from our cache
+if let Some(block) = self.chain.get_cannonical_block(num) {
+return Ok(block);
+}
+
+// block not in cache. we need to ask an rpc for it
+// but before we do any queries, be sure the requested block num exists
+let head_block_num = self.get_head_block_num();
+if num > &head_block_num {
 return Err(anyhow::anyhow!(
 "Head block is #{}, but #{} was requested",
 head_block_num,
@@ -437,6 +470,7 @@
 }
 
 // TODO: helper for method+params => JsonRpcRequest
+// TODO: get block with the transactions?
 let request =
 json!({ "id": "1", "method": "eth_getBlockByNumber", "params": (num, false) });
 let request: JsonRpcRequest = serde_json::from_value(request)?;
@@ -457,17 +491,16 @@
 Ok(block)
 }
 
-pub async fn get_block_hash(&self, num: U64) -> anyhow::Result<H256> {
-// first, try to get the hash from our cache
-// TODO: move this cache to redis?
-let block = self.get_block(num).await?;
+/// Convenience method to get the cannonical block at a given block height.
+pub async fn get_block_hash(&self, num: &U64) -> anyhow::Result<H256> {
+let block = self.get_cannonical_block(num).await?;
 
 let hash = block.hash.unwrap();
 
 Ok(hash)
 }
 
-pub fn get_head_block(&self) -> (u64, H256) {
+pub fn get_head_block(&self) -> (U64, H256) {
 let synced_connections = self.synced_connections.load();
 
 let num = synced_connections.get_head_block_num();
@@ -480,7 +513,7 @@
 *self.synced_connections.load().get_head_block_hash()
 }
 
-pub fn get_head_block_num(&self) -> u64 {
+pub fn get_head_block_num(&self) -> U64 {
 self.synced_connections.load().get_head_block_num()
 }
@@ -488,7 +521,7 @@
 if self.synced_connections.load().inner.is_empty() {
 return false;
 }
 
-self.get_head_block_num() > 0
+self.get_head_block_num() > U64::zero()
 }
 
 pub fn num_synced_rpcs(&self) -> usize {
@@ -756,7 +789,7 @@
 pub async fn next_upstream_server(
 &self,
 skip: &[Arc<Web3Connection>],
-min_block_needed: Option<U64>,
+min_block_needed: Option<&U64>,
 ) -> Result<ActiveRequestHandle, Option<Duration>> {
 let mut earliest_retry_after = None;
@@ -833,7 +866,7 @@
 /// returns servers even if they aren't in sync. This is useful for broadcasting signed transactions
 pub async fn get_upstream_servers(
 &self,
-min_block_needed: Option<U64>,
+min_block_needed: Option<&U64>,
 ) -> Result<Vec<ActiveRequestHandle>, Option<Duration>> {
 let mut earliest_retry_after = None;
 // TODO: with capacity?
@@ -868,7 +901,7 @@
 pub async fn try_send_best_upstream_server(
 &self,
 request: JsonRpcRequest,
-min_block_needed: Option<U64>,
+min_block_needed: Option<&U64>,
 ) -> anyhow::Result<JsonRpcForwardedResponse> {
 let mut skip_rpcs = vec![];
@@ -964,7 +997,7 @@
 pub async fn try_send_all_upstream_servers(
 &self,
 request: JsonRpcRequest,
-min_block_needed: Option<U64>,
+min_block_needed: Option<&U64>,
 ) -> anyhow::Result<JsonRpcForwardedResponse> {
 loop {
 match self.get_upstream_servers(min_block_needed).await {

View File

@@ -9,7 +9,7 @@ use futures::{
 stream::{SplitSink, SplitStream, StreamExt},
 };
 use hashbrown::HashMap;
-use serde_json::value::RawValue;
+use serde_json::{json, value::RawValue};
 use std::sync::Arc;
 use std::{str::from_utf8_mut, sync::atomic::AtomicUsize};
 use tracing::{error, info, trace};
@@ -78,10 +78,8 @@ async fn handle_socket_payload(
 }
 };
 
-let response = JsonRpcForwardedResponse::from_string(
-partial_response.to_string(),
-id.clone(),
-);
+let response =
+JsonRpcForwardedResponse::from_value(json!(partial_response), id.clone());
 
 Ok(response.into())
 }

View File

@@ -4,12 +4,12 @@ use serde::de::{self, Deserialize, Deserializer, MapAccess, SeqAccess, Visitor};
 use serde::Serialize;
 use serde_json::value::RawValue;
 use std::fmt;
-use tracing::trace;
 
 #[derive(Clone, serde::Deserialize)]
 pub struct JsonRpcRequest {
 // TODO: skip jsonrpc entirely?
 // pub jsonrpc: Box<RawValue>,
+/// id could be a stricter type, but many rpcs do things against the spec
 pub id: Box<RawValue>,
 pub method: String,
 pub params: Option<serde_json::Value>,
@@ -180,16 +180,6 @@ impl JsonRpcForwardedResponse {
 }
 }
 
-pub fn from_number<T: num::Integer + std::fmt::LowerHex>(
-partial_response: T,
-id: Box<RawValue>,
-) -> Self {
-// TODO: proper escaping. this feels wrong. probably refactor to not need this at all
-let partial_response = format!("0x{:x}", partial_response);
-
-Self::from_string(partial_response, id)
-}
-
 pub fn from_response(partial_response: Box<RawValue>, id: Box<RawValue>) -> Self {
 JsonRpcForwardedResponse {
 jsonrpc: "2.0".to_string(),
@@ -200,15 +190,6 @@
 }
 }
 
-pub fn from_string(partial_response: String, id: Box<RawValue>) -> Self {
-trace!("partial_response: {}", partial_response);
-
-// TODO: anyhow result on this
-// TODO: proper escaping. this feels wrong. probably refactor to not need this at all
-let partial_response = RawValue::from_string(format!(r#""{}""#, partial_response)).unwrap();
-
-Self::from_response(partial_response, id)
-}
-
 pub fn from_value(partial_response: serde_json::Value, id: Box<RawValue>) -> Self {
 let partial_response =
 serde_json::to_string(&partial_response).expect("this should always work");
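Dropping from_string and from_number (and their "proper escaping. this feels wrong" TODOs) in favor of from_value means serde_json now handles all quoting. A small standalone illustration of the difference, not code from the repo:

```rust
use serde_json::{json, Value};

fn main() -> Result<(), serde_json::Error> {
    let subscription_id = r#"weird "quoted" id"#;

    // the old from_string approach: wrap the value in quotes by hand.
    // this stops being valid JSON as soon as the value contains a quote or backslash.
    let hand_rolled = format!(r#""{}""#, subscription_id);
    assert!(serde_json::from_str::<Value>(&hand_rolled).is_err());

    // letting serde_json do the escaping (what from_value does) always yields valid JSON
    let escaped = serde_json::to_string(&json!(subscription_id))?;
    assert!(serde_json::from_str::<Value>(&escaped).is_ok());

    println!("{escaped}"); // "weird \"quoted\" id"
    Ok(())
}
```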

View File

@@ -15,7 +15,7 @@ use std::sync::atomic::{self, AtomicUsize};
 use std::thread;
 use std::time::Duration;
 use tokio::runtime;
-use tracing::{debug, info, trace};
+use tracing::{debug, info};
 use tracing_subscriber::EnvFilter;
 
 use crate::app::{flatten_handle, Web3ProxyApp};
@@ -45,7 +45,7 @@ fn main() -> anyhow::Result<()> {
 let rpc_config: String = fs::read_to_string(cli_config.config)?;
 let rpc_config: AppConfig = toml::from_str(&rpc_config)?;
 
-trace!("rpc_config: {:?}", rpc_config);
+debug!(?rpc_config);
 
 // TODO: this doesn't seem to do anything
 proctitle::set_title(format!("web3-proxy-{}", rpc_config.shared.chain_id));