smarter archive routing

This commit is contained in:
Bryan Stitt 2022-07-19 01:31:12 +00:00
parent dd0f7dbd5e
commit 75098d83b6
6 changed files with 263 additions and 175 deletions

16
Cargo.lock generated
View File

@ -747,9 +747,9 @@ checksum = "fb4a24b1aaf0fd0ce8b45161144d6f42cd91677fd5940fd431183eb023b3a2b8"
[[package]] [[package]]
name = "counter" name = "counter"
version = "0.5.5" version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63b05d7acd00b53d97b7369c4062027ff55711db0c509f5b28d6d964a2f1ff87" checksum = "48388d8711a360319610960332b6a6f9fc2b5a63bba9fd10f1b7aa50677d956f"
dependencies = [ dependencies = [
"num-traits", "num-traits",
] ]
@ -942,7 +942,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3495912c9c1ccf2e18976439f4443f3fee0fd61f424ff99fde6a66b15ecb448f" checksum = "3495912c9c1ccf2e18976439f4443f3fee0fd61f424ff99fde6a66b15ecb448f"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"hashbrown 0.12.2", "hashbrown 0.12.3",
"lock_api", "lock_api",
"parking_lot_core 0.9.3", "parking_lot_core 0.9.3",
] ]
@ -1829,9 +1829,9 @@ checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.12.2" version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "607c8a29735385251a339424dd462993c0fed8fa09d378f259377df08c126022" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
dependencies = [ dependencies = [
"ahash", "ahash",
] ]
@ -1851,7 +1851,7 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d452c155cb93fecdfb02a73dd57b5d8e442c2063bd7aac72f1bc5e4263a43086" checksum = "d452c155cb93fecdfb02a73dd57b5d8e442c2063bd7aac72f1bc5e4263a43086"
dependencies = [ dependencies = [
"hashbrown 0.12.2", "hashbrown 0.12.3",
] ]
[[package]] [[package]]
@ -2255,7 +2255,7 @@ name = "linkedhashmap"
version = "0.2.0" version = "0.2.0"
dependencies = [ dependencies = [
"criterion", "criterion",
"hashbrown 0.12.2", "hashbrown 0.12.3",
"hashlink", "hashlink",
"linked-hash-map", "linked-hash-map",
"slab", "slab",
@ -4333,7 +4333,7 @@ dependencies = [
"fdlimit", "fdlimit",
"flume", "flume",
"futures", "futures",
"hashbrown 0.12.2", "hashbrown 0.12.3",
"linkedhashmap", "linkedhashmap",
"notify", "notify",
"num", "num",

View File

@ -9,7 +9,7 @@ inline-more = [ "hashbrown" ]
[dependencies] [dependencies]
slab = "0.4.6" slab = "0.4.6"
hashbrown = { version = "0.12.2", optional = true } hashbrown = { version = "0.12.3", optional = true }
[dev-dependencies] [dev-dependencies]
criterion = "0.3.6" criterion = "0.3.6"

View File

@ -15,14 +15,14 @@ arc-swap = "1.5.0"
argh = "0.1.8" argh = "0.1.8"
axum = { version = "0.5.13", features = ["serde_json", "tokio-tungstenite", "ws"] } axum = { version = "0.5.13", features = ["serde_json", "tokio-tungstenite", "ws"] }
axum-client-ip = "0.2.0" axum-client-ip = "0.2.0"
counter = "0.5.5" counter = "0.5.6"
dashmap = "5.3.4" dashmap = "5.3.4"
derive_more = "0.99.17" derive_more = "0.99.17"
ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] } ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] }
fdlimit = "0.2.1" fdlimit = "0.2.1"
flume = "0.10.13" flume = "0.10.13"
futures = { version = "0.3.21", features = ["thread-pool"] } futures = { version = "0.3.21", features = ["thread-pool"] }
hashbrown = "0.12.2" hashbrown = "0.12.3"
linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] } linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] }
notify = "4.0.17" notify = "4.0.17"
num = "0.4.0" num = "0.4.0"

View File

@ -1,8 +1,7 @@
use axum::extract::ws::Message; use axum::extract::ws::Message;
use dashmap::mapref::entry::Entry as DashMapEntry; use dashmap::mapref::entry::Entry as DashMapEntry;
use dashmap::DashMap; use dashmap::DashMap;
use ethers::prelude::{Address, BlockNumber, Bytes, Transaction}; use ethers::prelude::{Address, Block, BlockNumber, Bytes, Transaction, TxHash, H256, U64};
use ethers::prelude::{Block, TxHash, H256};
use futures::future::Abortable; use futures::future::Abortable;
use futures::future::{join_all, AbortHandle}; use futures::future::{join_all, AbortHandle};
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
@ -23,7 +22,7 @@ use tokio::sync::{broadcast, watch};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::time::timeout; use tokio::time::timeout;
use tokio_stream::wrappers::{BroadcastStream, WatchStream}; use tokio_stream::wrappers::{BroadcastStream, WatchStream};
use tracing::{debug, info, info_span, instrument, trace, warn, Instrument}; use tracing::{info, info_span, instrument, trace, warn, Instrument};
use crate::bb8_helpers; use crate::bb8_helpers;
use crate::config::AppConfig; use crate::config::AppConfig;
@ -79,26 +78,14 @@ pub async fn flatten_handles<T>(
Ok(()) Ok(())
} }
fn value_to_block_num(x: &serde_json::Value) -> anyhow::Result<BlockNumber> { fn block_num_to_u64(block_num: BlockNumber, latest_block: U64) -> (bool, U64) {
let block_num = if x.is_string() {
BlockNumber::from_str(x.as_str().unwrap()).unwrap()
} else if x.is_number() {
BlockNumber::from(x.as_u64().unwrap())
} else {
return Err(anyhow::anyhow!("unexpected BlockNumber"));
};
Ok(block_num)
}
fn block_num_to_u64(block_num: BlockNumber, latest_block: u64) -> (bool, u64) {
match block_num { match block_num {
BlockNumber::Earliest => (false, 0), BlockNumber::Earliest => (false, U64::zero()),
BlockNumber::Latest => { BlockNumber::Latest => {
// change "latest" to a number // change "latest" to a number
(true, latest_block) (true, latest_block)
} }
BlockNumber::Number(x) => (false, x.as_u64()), BlockNumber::Number(x) => (false, x),
// TODO: think more about how to handle Pending // TODO: think more about how to handle Pending
BlockNumber::Pending => (false, latest_block), BlockNumber::Pending => (false, latest_block),
} }
@ -107,8 +94,8 @@ fn block_num_to_u64(block_num: BlockNumber, latest_block: u64) -> (bool, u64) {
fn get_or_set_block_number( fn get_or_set_block_number(
params: &mut serde_json::Value, params: &mut serde_json::Value,
block_param_id: usize, block_param_id: usize,
latest_block: u64, latest_block: U64,
) -> anyhow::Result<u64> { ) -> anyhow::Result<U64> {
match params.as_array_mut() { match params.as_array_mut() {
None => Err(anyhow::anyhow!("params not an array")), None => Err(anyhow::anyhow!("params not an array")),
Some(params) => match params.get_mut(block_param_id) { Some(params) => match params.get_mut(block_param_id) {
@ -118,19 +105,19 @@ fn get_or_set_block_number(
} }
// add the latest block number to the end of the params // add the latest block number to the end of the params
params.push(latest_block.into()); params.push(serde_json::to_value(latest_block)?);
Ok(latest_block) Ok(latest_block)
} }
Some(x) => { Some(x) => {
// convert the json value to a BlockNumber // convert the json value to a BlockNumber
let block_num = value_to_block_num(x)?; let block_num: BlockNumber = serde_json::from_value(x.clone())?;
let (modified, block_num) = block_num_to_u64(block_num, latest_block); let (modified, block_num) = block_num_to_u64(block_num, latest_block);
// if we changed "latest" to a number, update the params to match // if we changed "latest" to a number, update the params to match
if modified { if modified {
*x = format!("0x{:x}", block_num).into(); *x = serde_json::to_value(block_num)?;
} }
Ok(block_num) Ok(block_num)
@ -139,23 +126,13 @@ fn get_or_set_block_number(
} }
} }
// TODO: change this to return the height needed instead // TODO: change this to return the height needed (OR hash if recent)
fn is_archive_needed( fn get_min_block_needed(
method: &str, method: &str,
params: Option<&mut serde_json::Value>, params: Option<&mut serde_json::Value>,
latest_block: u64, latest_block: U64,
) -> bool { ) -> Option<U64> {
if params.is_none() { let params = params?;
return false;
}
// if the query's block is recent, return false
// geth: 128 blocks (or only 64 if youre using fast/snap sync)
// TODO: apparently erigon keeps data for 90k blocks. so true/false isn't going to be enough. need per-server archive limits
// TODO: maybe give 1 block of headroom?
let last_full_block = latest_block.saturating_sub(64);
let params = params.unwrap();
// TODO: double check these. i think some of the getBlock stuff will never need archive // TODO: double check these. i think some of the getBlock stuff will never need archive
let block_param_id = match method { let block_param_id = match method {
@ -163,14 +140,14 @@ fn is_archive_needed(
"eth_estimateGas" => 1, "eth_estimateGas" => 1,
"eth_getBalance" => 1, "eth_getBalance" => 1,
"eth_getBlockByHash" => { "eth_getBlockByHash" => {
return false; return None;
} }
"eth_getBlockByNumber" => { "eth_getBlockByNumber" => {
return false; return None;
} }
"eth_getBlockTransactionCountByHash" => { "eth_getBlockTransactionCountByHash" => {
// TODO: turn block hash into number and check. will need a linkedhashmap of recent hashes // TODO: turn block hash into number and check. will need a linkedhashmap of recent hashes
return false; return None;
} }
"eth_getBlockTransactionCountByNumber" => 0, "eth_getBlockTransactionCountByNumber" => 0,
"eth_getCode" => 1, "eth_getCode" => 1,
@ -178,82 +155,81 @@ fn is_archive_needed(
let obj = params[0].as_object_mut().unwrap(); let obj = params[0].as_object_mut().unwrap();
if let Some(x) = obj.get_mut("fromBlock") { if let Some(x) = obj.get_mut("fromBlock") {
if let Ok(block_num) = value_to_block_num(x) { let block_num: BlockNumber = serde_json::from_value(x.clone()).ok()?;
let (modified, block_num) = block_num_to_u64(block_num, latest_block); let (modified, block_num) = block_num_to_u64(block_num, latest_block);
if modified { if modified {
*x = format!("0x{:x}", block_num).into(); *x = serde_json::to_value(block_num).unwrap();
} }
if block_num < last_full_block { return Some(block_num);
return true;
}
}
} }
if let Some(x) = obj.get_mut("toBlock") { if let Some(x) = obj.get_mut("toBlock") {
if let Ok(block_num) = value_to_block_num(x) { let block_num: BlockNumber = serde_json::from_value(x.clone()).ok()?;
let (modified, block_num) = block_num_to_u64(block_num, latest_block); let (modified, block_num) = block_num_to_u64(block_num, latest_block);
if modified { if modified {
*x = format!("0x{:x}", block_num).into(); *x = serde_json::to_value(block_num).unwrap();
} }
if block_num < last_full_block { return Some(block_num);
return true;
}
}
} }
if let Some(x) = obj.get("blockHash") { if let Some(x) = obj.get("blockHash") {
// TODO: check a linkedhashmap of recent hashes // TODO: check a linkedhashmap of recent hashes
// TODO: error if fromBlock or toBlock were set // TODO: error if fromBlock or toBlock were set
unimplemented!("handle blockHash {}", x);
} }
return false; return None;
} }
"eth_getStorageAt" => 2, "eth_getStorageAt" => 2,
"eth_getTransactionByHash" => { "eth_getTransactionByHash" => {
// TODO: not sure how best to look these up // TODO: not sure how best to look these up
// try full nodes first. retry will use archive // try full nodes first. retry will use archive
return false; return None;
} }
"eth_getTransactionByBlockHashAndIndex" => { "eth_getTransactionByBlockHashAndIndex" => {
// TODO: check a linkedhashmap of recent hashes // TODO: check a linkedhashmap of recent hashes
// try full nodes first. retry will use archive // try full nodes first. retry will use archive
return false; return None;
} }
"eth_getTransactionByBlockNumberAndIndex" => 0, "eth_getTransactionByBlockNumberAndIndex" => 0,
"eth_getTransactionCount" => 1, "eth_getTransactionCount" => 1,
"eth_getTransactionReceipt" => { "eth_getTransactionReceipt" => {
// TODO: not sure how best to look these up // TODO: not sure how best to look these up
// try full nodes first. retry will use archive // try full nodes first. retry will use archive
return false; return None;
} }
"eth_getUncleByBlockHashAndIndex" => { "eth_getUncleByBlockHashAndIndex" => {
// TODO: check a linkedhashmap of recent hashes // TODO: check a linkedhashmap of recent hashes
// try full nodes first. retry will use archive // try full nodes first. retry will use archive
return false; return None;
} }
"eth_getUncleByBlockNumberAndIndex" => 0, "eth_getUncleByBlockNumberAndIndex" => 0,
"eth_getUncleCountByBlockHash" => { "eth_getUncleCountByBlockHash" => {
// TODO: check a linkedhashmap of recent hashes // TODO: check a linkedhashmap of recent hashes
// try full nodes first. retry will use archive // try full nodes first. retry will use archive
return false; return None;
} }
"eth_getUncleCountByBlockNumber" => 0, "eth_getUncleCountByBlockNumber" => 0,
_ => { _ => {
// some other command that doesn't take block numbers as an argument // some other command that doesn't take block numbers as an argument
return false; return None;
} }
}; };
if let Ok(block) = get_or_set_block_number(params, block_param_id, latest_block) { match get_or_set_block_number(params, block_param_id, latest_block) {
block < last_full_block Ok(block) => Some(block),
} else { Err(err) => {
// TODO: seems unlikely that we will get here. probably should log this error // TODO: seems unlikely that we will get here
// if this is incorrect, it should retry on an archive server // if this is incorrect, it should retry on an archive server
false warn!(?err, "could not get block from params");
None
}
} }
} }
@ -704,41 +680,48 @@ impl Web3ProxyApp {
Ok(collected) Ok(collected)
} }
fn get_cached_response( async fn get_cached_response(
&self, &self,
// TODO: accept a block hash here also
min_block_needed: Option<U64>,
request: &JsonRpcRequest, request: &JsonRpcRequest,
) -> ( ) -> anyhow::Result<(
CacheKey, CacheKey,
Result<JsonRpcForwardedResponse, &ResponseLrcCache>, Result<JsonRpcForwardedResponse, &ResponseLrcCache>,
) { )> {
// TODO: inspect the request to pick the right cache // TODO: inspect the request to pick the right cache
// TODO: https://github.com/ethereum/web3.py/blob/master/web3/middleware/cache.py // TODO: https://github.com/ethereum/web3.py/blob/master/web3/middleware/cache.py
// TODO: Some requests should skip caching on an older block hash. take number as an argument. if its older than 64, use it as the key let request_block_hash = if let Some(min_block_needed) = min_block_needed {
let head_block_hash = self.balanced_rpcs.get_head_block_hash(); let block_result = self.balanced_rpcs.get_block_hash(min_block_needed).await;
block_result?
} else {
self.balanced_rpcs.get_head_block_hash()
};
// TODO: better key? benchmark this // TODO: better key? benchmark this
let key = ( let key = (
head_block_hash, request_block_hash,
request.method.clone(), request.method.clone(),
request.params.clone().map(|x| x.to_string()), request.params.clone().map(|x| x.to_string()),
); );
if let Some(response) = self.response_cache.read().get(&key) { if let Some(response) = self.response_cache.read().get(&key) {
// TODO: emit a stat // TODO: emit a stat
trace!("{:?} cache hit!", request); trace!(?request.method, "cache hit!");
// TODO: can we make references work? maybe put them in an Arc? // TODO: can we make references work? maybe put them in an Arc?
return (key, Ok(response.to_owned())); return Ok((key, Ok(response.to_owned())));
} else { } else {
// TODO: emit a stat // TODO: emit a stat
trace!("{:?} cache miss!", request); trace!(?request.method, "cache miss!");
} }
// TODO: multiple caches. if head_block_hash is None, have a persistent cache (disk backed?) // TODO: multiple caches. if head_block_hash is None, have a persistent cache (disk backed?)
let cache = &self.response_cache; let cache = &self.response_cache;
(key, Err(cache)) Ok((key, Err(cache)))
} }
// #[instrument(skip_all)] // #[instrument(skip_all)]
@ -755,7 +738,7 @@ impl Web3ProxyApp {
// // TODO: add more to this span such as // // TODO: add more to this span such as
let span = info_span!("rpc_request"); let span = info_span!("rpc_request");
// let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon) // let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon)
match &request.method[..] { match request.method.as_ref() {
// lots of commands are blocked // lots of commands are blocked
"admin_addPeer" "admin_addPeer"
| "admin_datadir" | "admin_datadir"
@ -894,7 +877,7 @@ impl Web3ProxyApp {
if check_firewall_raw(&raw_tx).await? { if check_firewall_raw(&raw_tx).await? {
self.private_rpcs self.private_rpcs
.try_send_all_upstream_servers(request, false) .try_send_all_upstream_servers(request, None)
.instrument(span) .instrument(span)
.await .await
} else { } else {
@ -944,20 +927,23 @@ impl Web3ProxyApp {
// we do this check before checking caches because it might modify the request params // we do this check before checking caches because it might modify the request params
// TODO: add a stat for archive vs full since they should probably cost different // TODO: add a stat for archive vs full since they should probably cost different
let archive_needed = let min_block_needed =
is_archive_needed(method, request.params.as_mut(), head_block_number); get_min_block_needed(method, request.params.as_mut(), head_block_number.into());
trace!(?archive_needed, ?method); trace!(?min_block_needed, ?method);
let (cache_key, response_cache) = match self.get_cached_response(&request) { let (cache_key, cache_result) =
(cache_key, Ok(response)) => { self.get_cached_response(min_block_needed, &request).await?;
let response_cache = match cache_result {
Ok(response) => {
let _ = self.incoming_requests.remove(&cache_key); let _ = self.incoming_requests.remove(&cache_key);
// TODO: if the response is cached, should it count less against the account's costs? // TODO: if the response is cached, should it count less against the account's costs?
return Ok(response); return Ok(response);
} }
(cache_key, Err(response_cache)) => (cache_key, response_cache), Err(response_cache) => response_cache,
}; };
// check if this request is already in flight // check if this request is already in flight
@ -1004,13 +990,13 @@ impl Web3ProxyApp {
"eth_getTransactionByHash" | "eth_getTransactionReceipt" => { "eth_getTransactionByHash" | "eth_getTransactionReceipt" => {
// TODO: try_send_all serially with retries instead of parallel // TODO: try_send_all serially with retries instead of parallel
self.private_rpcs self.private_rpcs
.try_send_all_upstream_servers(request, archive_needed) .try_send_all_upstream_servers(request, min_block_needed)
.await? .await?
} }
_ => { _ => {
// TODO: retries? // TODO: retries?
self.balanced_rpcs self.balanced_rpcs
.try_send_best_upstream_server(request, archive_needed) .try_send_best_upstream_server(request, min_block_needed)
.await? .await?
} }
}; };

View File

@ -1,7 +1,7 @@
///! Rate-limited communication with a web3 provider ///! Rate-limited communication with a web3 provider
use anyhow::Context; use anyhow::Context;
use derive_more::From; use derive_more::From;
use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash}; use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64};
use futures::future::try_join_all; use futures::future::try_join_all;
use futures::StreamExt; use futures::StreamExt;
use redis_cell_client::RedisCellClient; use redis_cell_client::RedisCellClient;
@ -9,7 +9,7 @@ use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize; use serde::Serialize;
use std::fmt; use std::fmt;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicBool, AtomicU32}; use std::sync::atomic::{self, AtomicU32, AtomicU64};
use std::{cmp::Ordering, sync::Arc}; use std::{cmp::Ordering, sync::Arc};
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::sync::RwLock; use tokio::sync::RwLock;
@ -81,7 +81,8 @@ pub struct Web3Connection {
hard_limit: Option<redis_cell_client::RedisCellClient>, hard_limit: Option<redis_cell_client::RedisCellClient>,
/// used for load balancing to the least loaded server /// used for load balancing to the least loaded server
soft_limit: u32, soft_limit: u32,
archive: AtomicBool, block_data_limit: AtomicU64,
head_block: parking_lot::RwLock<(H256, U64)>,
} }
impl Serialize for Web3Connection { impl Serialize for Web3Connection {
@ -107,10 +108,18 @@ impl Serialize for Web3Connection {
} }
impl fmt::Debug for Web3Connection { impl fmt::Debug for Web3Connection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Web3Connection") let mut f = f.debug_struct("Web3Connection");
.field("url", &self.url)
.field("archive", &self.is_archive()) f.field("url", &self.url);
.finish_non_exhaustive()
let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed);
if block_data_limit == u64::MAX {
f.field("limit", &"archive");
} else {
f.field("limit", &block_data_limit);
}
f.finish_non_exhaustive()
} }
} }
@ -123,6 +132,7 @@ impl fmt::Display for Web3Connection {
impl Web3Connection { impl Web3Connection {
/// Connect to a web3 rpc /// Connect to a web3 rpc
// #[instrument(name = "spawn_Web3Connection", skip(hard_limit, http_client))] // #[instrument(name = "spawn_Web3Connection", skip(hard_limit, http_client))]
// TODO: have this take a builder (which will have senders attached)
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub async fn spawn( pub async fn spawn(
chain_id: usize, chain_id: usize,
@ -157,13 +167,14 @@ impl Web3Connection {
provider: RwLock::new(Some(Arc::new(provider))), provider: RwLock::new(Some(Arc::new(provider))),
hard_limit, hard_limit,
soft_limit, soft_limit,
archive: Default::default(), block_data_limit: Default::default(),
head_block: parking_lot::RwLock::new((H256::zero(), 0isize.into())),
}; };
let new_connection = Arc::new(new_connection); let new_connection = Arc::new(new_connection);
// check the server's chain_id here // check the server's chain_id here
// TODO: move this outside the `new` function and into a `start` function or something // TODO: move this outside the `new` function and into a `start` function or something. that way we can do retries from there
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
// TODO: this will wait forever. do we want that? // TODO: this will wait forever. do we want that?
let found_chain_id: Result<String, _> = new_connection let found_chain_id: Result<String, _> = new_connection
@ -192,27 +203,8 @@ impl Web3Connection {
} }
} }
// we could take "archive" as a parameter, but we would want a safety check on it regardless // subscribe to new blocks and new transactions
// just query something very old and if we get an error, we don't have archive data // TODO: make transaction subscription optional (just pass None for tx_id_sender)
let archive_result: Result<Bytes, _> = new_connection
.wait_for_request_handle()
.await
.request(
"eth_getCode",
("0xdead00000000000000000000000000000000beef", "0x1"),
)
.await;
trace!(?archive_result, "{}", new_connection);
if archive_result.is_ok() {
new_connection
.archive
.store(true, atomic::Ordering::Relaxed);
}
info!(?new_connection, "success");
let handle = { let handle = {
let new_connection = new_connection.clone(); let new_connection = new_connection.clone();
tokio::spawn(async move { tokio::spawn(async move {
@ -222,11 +214,74 @@ impl Web3Connection {
}) })
}; };
// TODO: make sure the server isn't still syncing
// TODO: don't sleep. wait for new heads subscription instead
// TODO: i think instead of atomics, we could maybe use a watch channel
sleep(Duration::from_millis(100)).await;
// we could take "archive" as a parameter, but we would want a safety check on it regardless
// just query something very old and if we get an error, we don't have archive data
for block_data_limit in [u64::MAX, 90_000, 128, 64, 32] {
let mut head_block_num = new_connection.head_block.read().1;
// TODO: wait until head block is set outside the loop?
while head_block_num == U64::zero() {
info!(?new_connection, "no head block");
// TODO: subscribe to a channel instead of polling? subscribe to http_interval_sender?
sleep(Duration::from_secs(1)).await;
head_block_num = new_connection.head_block.read().1;
}
let maybe_archive_block = head_block_num
.saturating_sub(block_data_limit.into())
.max(U64::one());
let archive_result: Result<Bytes, _> = new_connection
.wait_for_request_handle()
.await
.request(
"eth_getCode",
(
"0xdead00000000000000000000000000000000beef",
maybe_archive_block,
),
)
.await;
trace!(?archive_result, "{}", new_connection);
if archive_result.is_ok() {
new_connection
.block_data_limit
.store(block_data_limit, atomic::Ordering::Release);
break;
}
}
info!(?new_connection, "success");
Ok((new_connection, handle)) Ok((new_connection, handle))
} }
pub fn is_archive(&self) -> bool { /// TODO: this might be too simple. different nodes can prune differently
self.archive.load(atomic::Ordering::Relaxed) pub fn get_block_data_limit(&self) -> U64 {
self.block_data_limit.load(atomic::Ordering::Acquire).into()
}
pub fn has_block_data(&self, needed_block_num: U64) -> bool {
let block_data_limit: U64 = self.get_block_data_limit();
let newest_block_num = self.head_block.read().1;
let oldest_block_num = newest_block_num
.saturating_sub(block_data_limit)
.max(U64::one());
needed_block_num >= oldest_block_num && needed_block_num <= newest_block_num
} }
#[instrument(skip_all)] #[instrument(skip_all)]
@ -237,7 +292,10 @@ impl Web3Connection {
// websocket doesn't need the http client // websocket doesn't need the http client
let http_client = None; let http_client = None;
info!(?self, "reconnecting");
// since this lock is held open over an await, we use tokio's locking // since this lock is held open over an await, we use tokio's locking
// TODO: timeout on this lock. if its slow, something is wrong
let mut provider = self.provider.write().await; let mut provider = self.provider.write().await;
*provider = None; *provider = None;
@ -281,7 +339,15 @@ impl Web3Connection {
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
match block { match block {
Ok(block) => { Ok(block) => {
// TODO: i'm pretty sure we don't need send_async, but double check {
let hash = block.hash.unwrap();
let num = block.number.unwrap();
let mut head_block = self.head_block.write();
*head_block = (hash, num);
}
block_sender block_sender
.send_async((block, self.clone())) .send_async((block, self.clone()))
.await .await
@ -376,19 +442,9 @@ impl Web3Connection {
let mut http_interval_receiver = http_interval_receiver.unwrap(); let mut http_interval_receiver = http_interval_receiver.unwrap();
let mut last_hash = Default::default(); let mut last_hash = H256::zero();
loop { loop {
// wait for the interval
// TODO: if error or rate limit, increase interval?
while let Err(err) = http_interval_receiver.recv().await {
// TODO: if recverror is not Lagged, exit?
// querying the block was delayed. this can happen if tokio was busy.
warn!(?err, ?self, "http interval lagging!");
}
trace!(?self, "ok http interval");
match self.try_request_handle().await { match self.try_request_handle().await {
Ok(active_request_handle) => { Ok(active_request_handle) => {
// TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest" // TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest"
@ -413,6 +469,22 @@ impl Web3Connection {
warn!(?err, "Rate limited on latest block from {}", self); warn!(?err, "Rate limited on latest block from {}", self);
} }
} }
// wait for the interval
// TODO: if error or rate limit, increase interval?
while let Err(err) = http_interval_receiver.recv().await {
match err {
broadcast::error::RecvError::Closed => {
return Err(err.into());
}
broadcast::error::RecvError::Lagged(lagged) => {
// querying the block was delayed. this can happen if tokio is very busy.
warn!(?err, ?self, "http interval lagging by {}!", lagged);
}
}
}
trace!(?self, "ok http interval");
} }
} }
Web3Provider::Ws(provider) => { Web3Provider::Ws(provider) => {

View File

@ -4,13 +4,14 @@ use arc_swap::ArcSwap;
use counter::Counter; use counter::Counter;
use dashmap::DashMap; use dashmap::DashMap;
use derive_more::From; use derive_more::From;
use ethers::prelude::{Block, ProviderError, Transaction, TxHash, H256}; use ethers::prelude::{Block, ProviderError, Transaction, TxHash, H256, U64};
use futures::future::try_join_all; use futures::future::try_join_all;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::StreamExt; use futures::StreamExt;
use hashbrown::HashMap; use hashbrown::HashMap;
use serde::ser::{SerializeStruct, Serializer}; use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize; use serde::Serialize;
use serde_json::json;
use serde_json::value::RawValue; use serde_json::value::RawValue;
use std::cmp; use std::cmp;
use std::collections::{BTreeMap, BTreeSet}; use std::collections::{BTreeMap, BTreeSet};
@ -357,6 +358,26 @@ impl Web3Connections {
self.synced_connections.load().get_head_block_num() self.synced_connections.load().get_head_block_num()
} }
pub async fn get_block_hash(&self, num: U64) -> anyhow::Result<H256> {
// TODO: this definitely needs caching
warn!("this needs to be much more efficient");
// TODO: helper for this
let request =
json!({ "id": "1", "method": "eth_getBlockByNumber", "params": (num, false) });
let request: JsonRpcRequest = serde_json::from_value(request)?;
// TODO: if error, retry
let response = self
.try_send_best_upstream_server(request, Some(num))
.await?;
let block = response.result.unwrap().to_string();
let block: Block<TxHash> = serde_json::from_str(&block)?;
Ok(block.hash.unwrap())
}
pub fn get_head_block_hash(&self) -> H256 { pub fn get_head_block_hash(&self) -> H256 {
*self.synced_connections.load().get_head_block_hash() *self.synced_connections.load().get_head_block_hash()
} }
@ -608,17 +629,19 @@ impl Web3Connections {
pub async fn next_upstream_server( pub async fn next_upstream_server(
&self, &self,
skip: &[Arc<Web3Connection>], skip: &[Arc<Web3Connection>],
archive_needed: bool, min_block_needed: Option<U64>,
) -> Result<ActiveRequestHandle, Option<Duration>> { ) -> Result<ActiveRequestHandle, Option<Duration>> {
let mut earliest_retry_after = None; let mut earliest_retry_after = None;
// filter the synced rpcs // filter the synced rpcs
let mut synced_rpcs: Vec<Arc<Web3Connection>> = if archive_needed { // TODO: we are going to be checking "has_block_data" a lot now. i think we pretty much always have min_block_needed now that we override "latest"
let mut synced_rpcs: Vec<Arc<Web3Connection>> =
if let Some(min_block_needed) = min_block_needed {
// TODO: this includes ALL archive servers. but we only want them if they are on a somewhat recent block // TODO: this includes ALL archive servers. but we only want them if they are on a somewhat recent block
// TODO: maybe instead of "archive_needed" bool it should be the minimum height. then even delayed servers might be fine. will need to track all heights then // TODO: maybe instead of "archive_needed" bool it should be the minimum height. then even delayed servers might be fine. will need to track all heights then
self.inner self.inner
.iter() .iter()
.filter(|x| x.is_archive()) .filter(|x| x.has_block_data(min_block_needed))
.filter(|x| !skip.contains(x)) .filter(|x| !skip.contains(x))
.cloned() .cloned()
.collect() .collect()
@ -639,13 +662,15 @@ impl Web3Connections {
let sort_cache: HashMap<_, _> = synced_rpcs let sort_cache: HashMap<_, _> = synced_rpcs
.iter() .iter()
.map(|rpc| { .map(|rpc| {
// TODO: get active requests and the soft limit out of redis?
let active_requests = rpc.active_requests(); let active_requests = rpc.active_requests();
let soft_limit = rpc.soft_limit(); let soft_limit = rpc.soft_limit();
let is_archive = rpc.is_archive(); let block_data_limit = rpc.get_block_data_limit();
let utilization = active_requests as f32 / soft_limit as f32; let utilization = active_requests as f32 / soft_limit as f32;
(rpc.clone(), (is_archive, utilization, soft_limit)) // TODO: double check this sorts how we want
(rpc.clone(), (block_data_limit, utilization, soft_limit))
}) })
.collect(); .collect();
@ -681,16 +706,18 @@ impl Web3Connections {
/// returns servers even if they aren't in sync. This is useful for broadcasting signed transactions /// returns servers even if they aren't in sync. This is useful for broadcasting signed transactions
pub async fn get_upstream_servers( pub async fn get_upstream_servers(
&self, &self,
archive_needed: bool, min_block_needed: Option<U64>,
) -> Result<Vec<ActiveRequestHandle>, Option<Duration>> { ) -> Result<Vec<ActiveRequestHandle>, Option<Duration>> {
let mut earliest_retry_after = None; let mut earliest_retry_after = None;
// TODO: with capacity? // TODO: with capacity?
let mut selected_rpcs = vec![]; let mut selected_rpcs = vec![];
for connection in self.inner.iter() { for connection in self.inner.iter() {
if archive_needed && !connection.is_archive() { if let Some(min_block_needed) = min_block_needed {
if !connection.has_block_data(min_block_needed) {
continue; continue;
} }
}
// check rate limits and increment our connection counter // check rate limits and increment our connection counter
match connection.try_request_handle().await { match connection.try_request_handle().await {
@ -714,7 +741,7 @@ impl Web3Connections {
pub async fn try_send_best_upstream_server( pub async fn try_send_best_upstream_server(
&self, &self,
request: JsonRpcRequest, request: JsonRpcRequest,
archive_needed: bool, min_block_needed: Option<U64>,
) -> anyhow::Result<JsonRpcForwardedResponse> { ) -> anyhow::Result<JsonRpcForwardedResponse> {
let mut skip_rpcs = vec![]; let mut skip_rpcs = vec![];
@ -723,7 +750,10 @@ impl Web3Connections {
if skip_rpcs.len() == self.inner.len() { if skip_rpcs.len() == self.inner.len() {
break; break;
} }
match self.next_upstream_server(&skip_rpcs, archive_needed).await { match self
.next_upstream_server(&skip_rpcs, min_block_needed)
.await
{
Ok(active_request_handle) => { Ok(active_request_handle) => {
// save the rpc in case we get an error and want to retry on another server // save the rpc in case we get an error and want to retry on another server
skip_rpcs.push(active_request_handle.clone_connection()); skip_rpcs.push(active_request_handle.clone_connection());
@ -807,10 +837,10 @@ impl Web3Connections {
pub async fn try_send_all_upstream_servers( pub async fn try_send_all_upstream_servers(
&self, &self,
request: JsonRpcRequest, request: JsonRpcRequest,
archive_needed: bool, min_block_needed: Option<U64>,
) -> anyhow::Result<JsonRpcForwardedResponse> { ) -> anyhow::Result<JsonRpcForwardedResponse> {
loop { loop {
match self.get_upstream_servers(archive_needed).await { match self.get_upstream_servers(min_block_needed).await {
Ok(active_request_handles) => { Ok(active_request_handles) => {
// TODO: benchmark this compared to waiting on unbounded futures // TODO: benchmark this compared to waiting on unbounded futures
// TODO: do something with this handle? // TODO: do something with this handle?
@ -864,7 +894,7 @@ mod tests {
fn test_false_before_true() { fn test_false_before_true() {
let mut x = [true, false, true]; let mut x = [true, false, true];
x.sort(); x.sort_unstable();
assert_eq!(x, [false, true, true]) assert_eq!(x, [false, true, true])
} }