better archive split

This commit is contained in:
Bryan Stitt 2022-07-16 04:13:02 +00:00
parent 60e1b05965
commit b87540d99d
5 changed files with 208 additions and 49 deletions

9
Cargo.lock generated

@ -180,9 +180,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.5.12"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d16705af05732b7d3258ec0f7b73c03a658a28925e050d8852d5b568ee8bcf4e"
checksum = "6b9496f0c1d1afb7a2af4338bbe1d969cddfead41d87a9fb3aaa6d0bbc7af648"
dependencies = [
"async-trait",
"axum-core",
@ -3835,10 +3835,11 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.19.2"
version = "1.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c51a52ed6686dd62c320f9b89299e9dfb46f730c7a48e635c19f21d116cb1439"
checksum = "57aec3cfa4c296db7255446efb4928a6be304b431a806216105542a67b6ca82e"
dependencies = [
"autocfg",
"bytes",
"libc",
"memchr",

18
TODO.md

@ -36,11 +36,10 @@
- [x] rpc errors propagate too far. one subscription failing ends the app. isolate the providers more (might already be fixed)
- [x] incoming rate limiting (by ip)
- [x] connection pool for redis
- [ ] automatically route to archive server when necessary
- [x] automatically route to archive server when necessary
- originally, no processing was done to params; they were just serde_json::RawValue. this is probably fastest, but we need to look for "latest" and count elements, so we have to use serde_json::Value
- when getting the next server, filtering on "archive" isn't going to work well.
- [ ] we need a list of "Arc<Web3Connection>" just of archive servers. we can route to any of them even if they are behind by many blocks
- [ ] if the requested block is ahead of the best block, return without querying any backend servers
- when getting the next server, filtering on "archive" isn't going to work well. need to check inner instead
- [ ] if the requested block is ahead of the best block, return without querying any backend servers
- [ ] handle log subscriptions
- [ ] basic request method stats
- [x] http servers should check block at the very start
@ -61,10 +60,12 @@
- [ ] improved logging with useful instrumentation
- [ ] add a subscription that returns the head block number and hash but nothing else
- [ ] if we don't cache errors, then in-flight request caching is going to bottleneck
- i think now that we retry header not found and similar, caching errors should be fine
- [ ] improve caching
- [ ] if the eth_call (or similar) params include a block, we can cache for longer
- [ ] if the call is something simple like "symbol" or "decimals", cache that too
- [ ] when we receive a block, we should store it for later eth_getBlockByNumber and similar calls
- [ ] if the eth_call (or similar) params include a block, we can cache for that. if it's archive-age, it should be fine to cache by number instead of hash
- [ ] add a "recent hashes" to synced connections with 64 parent blocks (maybe 128)
- [ ] if the call is something simple like "symbol" or "decimals", cache that too. though i think this could bite us
- [ ] when we receive a block, we should store it for later eth_getBlockByNumber and similar calls
- [x] eth_blockNumber without a backend request
- [ ] if a rpc fails to connect at start, retry later instead of skipping it forever
- [ ] inspect any jsonrpc errors. if its something like "header not found" or "block with id $x not found" retry on another node (and add a negative score to that server)
@ -135,3 +136,6 @@ in another repo: event subscriber
- [ ] flashbots protect fast mode or not? probably fast matches most user's needs, but no reverts is nice.
- [ ] https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#authentication maybe have per-user keys. or pass their header on if it's set
- [ ] if no redis set, but public rate limits are set, exit with an error
- [ ] i saw "WebSocket connection closed unexpectedly" but no auto reconnect. need better logs on these
- [ ] if archive servers are added to the rotation while they are still syncing, they might get requests too soon. keep archive servers out of the configs until they are done syncing. full nodes should be fine to add to the configs even while syncing, though it's a wasted connection
- [ ] when under load, i'm seeing "http interval lagging!". sometimes it happens when not loaded.

@ -13,7 +13,7 @@ deadlock_detection = ["parking_lot/deadlock_detection"]
anyhow = { version = "1.0.58", features = ["backtrace"] }
arc-swap = "1.5.0"
argh = "0.1.8"
axum = { version = "0.5.12", features = ["serde_json", "tokio-tungstenite", "ws"] }
axum = { version = "0.5.13", features = ["serde_json", "tokio-tungstenite", "ws"] }
axum-client-ip = "0.2.0"
counter = "0.5.5"
dashmap = "5.3.4"
@ -34,10 +34,10 @@ proctitle = "0.1.1"
regex = "1.6.0"
reqwest = { version = "0.11.11", default-features = false, features = ["json", "tokio-rustls"] }
rustc-hash = "1.1.0"
siwe = "0.3"
siwe = "0.3.0"
serde = { version = "1.0.139", features = [] }
serde_json = { version = "1.0.82", default-features = false, features = ["alloc", "raw_value"] }
tokio = { version = "1.19.2", features = ["full", "tracing"] }
tokio = { version = "1.20.0", features = ["full", "tracing"] }
toml = "0.5.9"
tracing = "0.1.35"
# TODO: tracing-subscriber has serde and serde_json features that we might want to use

@ -1,7 +1,7 @@
use axum::extract::ws::Message;
use dashmap::mapref::entry::Entry as DashMapEntry;
use dashmap::DashMap;
use ethers::prelude::{Address, Bytes, Transaction};
use ethers::prelude::{Address, BlockNumber, Bytes, Transaction};
use ethers::prelude::{Block, TxHash, H256};
use futures::future::Abortable;
use futures::future::{join_all, AbortHandle};
@ -79,34 +79,179 @@ pub async fn flatten_handles<T>(
Ok(())
}
fn is_archive_needed(method: &str, params: Option<&mut serde_json::Value>) -> bool {
match method {
"eth_call" => unimplemented!(),
"eth_getBalance" => unimplemented!(),
"eth_getCode" => unimplemented!(),
"eth_getLogs" => unimplemented!(),
"eth_getStorageAt" => unimplemented!(),
"eth_getTransactionByBlockHashAndIndex" => unimplemented!(),
"eth_getTransactionByBlockNumberAndIndex" => unimplemented!(),
"eth_getTransactionCount" => unimplemented!(),
"eth_getTransactionReceipt" => unimplemented!(),
"eth_getUncleByBlockHashAndIndex" => unimplemented!(),
"eth_getUncleByBlockNumberAndIndex" => unimplemented!(),
_ => {
return false;
fn value_to_block_num(x: &serde_json::Value) -> anyhow::Result<BlockNumber> {
let block_num = if x.is_string() {
BlockNumber::from_str(x.as_str().unwrap()).unwrap()
} else if x.is_number() {
BlockNumber::from(x.as_u64().unwrap())
} else {
return Err(anyhow::anyhow!("unexpected BlockNumber"));
};
Ok(block_num)
}
fn block_num_to_u64(block_num: BlockNumber, latest_block: u64) -> (bool, u64) {
match block_num {
BlockNumber::Earliest => (false, 0),
BlockNumber::Latest => {
// change "latest" to a number
(true, latest_block)
}
BlockNumber::Number(x) => (false, x.as_u64()),
// TODO: think more about how to handle Pending
BlockNumber::Pending => (false, latest_block),
}
}
fn get_or_set_block_number(
params: &mut serde_json::Value,
block_param_id: usize,
latest_block: u64,
) -> anyhow::Result<u64> {
match params.as_array_mut() {
None => Err(anyhow::anyhow!("params not an array")),
Some(params) => match params.get_mut(block_param_id) {
None => {
if params.len() != block_param_id - 1 {
return Err(anyhow::anyhow!("unexpected params length"));
}
// add the latest block number to the end of the params
params.push(latest_block.into());
Ok(latest_block)
}
Some(x) => {
// convert the json value to a BlockNumber
let block_num = value_to_block_num(x)?;
let (modified, block_num) = block_num_to_u64(block_num, latest_block);
// if we changed "latest" to a number, update the params to match
if modified {
*x = block_num.into();
}
Ok(block_num)
}
},
}
}
fn is_archive_needed(
method: &str,
params: Option<&mut serde_json::Value>,
latest_block: u64,
) -> bool {
if params.is_none() {
return false;
}
// TODO: find the given block number in params
// if the query's block is recent, return false
// geth: 128 blocks (or only 64 if youre using fast/snap sync)
// TODO: apparently erigon keeps data for 90k blocks. so true/false isn't going to be enough. need per-server archive limits
// TODO: maybe give 1 block of headroom?
let last_full_block = latest_block.saturating_sub(64);
// TODO: if its "latest" (or not given), modify params to have the latest block. return false
let params = params.unwrap();
// TODO: if its "pending", do something special? return false
// TODO: double check these. i think some of the getBlock stuff will never need archive
let block_param_id = match method {
"eth_call" => 1,
"eth_estimateGas" => 1,
"eth_getBalance" => 1,
"eth_getBlockByHash" => {
return false;
}
"eth_getBlockByNumber" => 0,
"eth_getBlockTransactionCountByHash" => {
// TODO: turn block hash into number and check. will need a linkedhashmap of recent hashes
return false;
}
"eth_getBlockTransactionCountByNumber" => 0,
"eth_getCode" => 1,
"eth_getLogs" => {
let obj = params[0].as_object_mut().unwrap();
// TODO: we probably need a list of recent hashes/heights. if specified block is recent, return false
if let Some(x) = obj.get_mut("fromBlock") {
if let Ok(block_num) = value_to_block_num(x) {
let (modified, block_num) = block_num_to_u64(block_num, latest_block);
// this command needs an archive server
true
if modified {
*x = block_num.into();
}
if block_num < last_full_block {
return true;
}
}
}
if let Some(x) = obj.get_mut("toBlock") {
if let Ok(block_num) = value_to_block_num(x) {
let (modified, block_num) = block_num_to_u64(block_num, latest_block);
if modified {
*x = block_num.into();
}
if block_num < last_full_block {
return true;
}
}
}
if let Some(x) = obj.get("blockHash") {
// TODO: check a linkedhashmap of recent hashes
// TODO: error if fromBlock or toBlock were set
}
return false;
}
"eth_getStorageAt" => 2,
"eth_getTransactionByHash" => {
// TODO: not sure how best to look these up
// try full nodes first. retry will use archive
return false;
}
"eth_getTransactionByBlockHashAndIndex" => {
// TODO: check a linkedhashmap of recent hashes
// try full nodes first. retry will use archive
return false;
}
"eth_getTransactionByBlockNumberAndIndex" => 0,
"eth_getTransactionCount" => 1,
"eth_getTransactionReceipt" => {
// TODO: not sure how best to look these up
// try full nodes first. retry will use archive
return false;
}
"eth_getUncleByBlockHashAndIndex" => {
// TODO: check a linkedhashmap of recent hashes
// try full nodes first. retry will use archive
return false;
}
"eth_getUncleByBlockNumberAndIndex" => 0,
"eth_getUncleCountByBlockHash" => {
// TODO: check a linkedhashmap of recent hashes
// try full nodes first. retry will use archive
return false;
}
"eth_getUncleCountByBlockNumber" => 0,
_ => {
// some other command that doesn't take block numbers as an argument
return false;
}
};
if let Ok(block) = get_or_set_block_number(params, block_param_id, latest_block) {
block < last_full_block
} else {
// TODO: seems unlikely that we will get here. probably should log this error
// if this is incorrect, it should retry on an archive server
false
}
}
// TODO: think more about TxState. d
@ -566,7 +711,7 @@ impl Web3ProxyApp {
// TODO: inspect the request to pick the right cache
// TODO: https://github.com/ethereum/web3.py/blob/master/web3/middleware/cache.py
// TODO: Some requests should skip caching on the head_block_hash
// TODO: Some requests should skip caching on an older block hash. take number as an argument. if its older than 64, use it as the key
let head_block_hash = self.balanced_rpcs.get_head_block_hash();
// TODO: better key? benchmark this
@ -792,8 +937,11 @@ impl Web3ProxyApp {
// everything else is relayed to a backend
// this is not a private transaction
let head_block_number = self.balanced_rpcs.get_head_block_num();
// we do this check before checking caches because it might modify the request params
let archive_needed = is_archive_needed(method, request.params.as_mut());
let archive_needed =
is_archive_needed(method, request.params.as_mut(), head_block_number);
let (cache_key, response_cache) = match self.get_cached_response(&request) {
(cache_key, Ok(response)) => {

@ -633,30 +633,25 @@ impl Web3Connections {
return Err(None);
}
let sort_cache: HashMap<Arc<Web3Connection>, (f32, u32)> = synced_rpcs
let sort_cache: HashMap<_, _> = synced_rpcs
.iter()
.map(|rpc| {
let active_requests = rpc.active_requests();
let soft_limit = rpc.soft_limit();
let is_archive = rpc.is_archive();
let utilization = active_requests as f32 / soft_limit as f32;
(rpc.clone(), (utilization, soft_limit))
(rpc.clone(), (is_archive, utilization, soft_limit))
})
.collect();
synced_rpcs.sort_unstable_by(|a, b| {
let (a_utilization, a_soft_limit) = sort_cache.get(a).unwrap();
let (b_utilization, b_soft_limit) = sort_cache.get(b).unwrap();
let a_sorts = sort_cache.get(a).unwrap();
let b_sorts = sort_cache.get(b).unwrap();
// TODO: i'm comparing floats. crap
match a_utilization
.partial_cmp(b_utilization)
.unwrap_or(cmp::Ordering::Equal)
{
cmp::Ordering::Equal => a_soft_limit.cmp(b_soft_limit),
x => x,
}
a_sorts.partial_cmp(b_sorts).unwrap_or(cmp::Ordering::Equal)
});
// now that the rpcs are sorted, try to get an active request handle for one of them
@ -859,3 +854,14 @@ impl Web3Connections {
}
}
}
mod tests {
#[test]
fn test_false_before_true() {
let mut x = [true, false, true];
x.sort();
assert_eq!(x, [false, true, true])
}
}