Quick cache (#70)

* quick cache and allocate less

* improve /status cache

* prepare to cache raw transaction hashes so we dont dos our backends

* simple benchmark for /health and /status

* mut not needed with atomics

* DRY all the status pages

* use u64 instead of bytes for subscriptions

* fix setting earliest_retry_at and improve logs

* Revert "use kanal instead of flume or tokio channels (#68)"

This reverts commit 510612d343fc51338a8a4282dcc229b50097835b.

* fix automatic retries

* put relaxed back

* convert error message time to seconds

* assert instead of debug_assert while we debug

* ns instead of seconds

* disable peak_latency for now

* null is the default

* cargo fmt

* comments

* remove request caching for now

* log on exit

* unit weigher for now

* make cache smaller. we need a weigher for prod. just debugging

* oops. we need async

* add todo

* no need for to_string on a RawValue
This commit is contained in:
Bryan Stitt 2023-05-13 11:13:02 -07:00 committed by GitHub
parent b493f02c3d
commit 2080739865
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 1282 additions and 1001 deletions

143
Cargo.lock generated

@ -68,6 +68,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f"
dependencies = [
"cfg-if",
"getrandom",
"once_cell",
"version_check",
]
@ -792,6 +793,16 @@ dependencies = [
"cc",
]
[[package]]
name = "codespan-reporting"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3538270d33cc669650c4b093848450d380def10c331d38c768e34cac80576e6e"
dependencies = [
"termcolor",
"unicode-width",
]
[[package]]
name = "coins-bip32"
version = "0.8.3"
@ -1194,6 +1205,50 @@ dependencies = [
"cipher 0.4.4",
]
[[package]]
name = "cxx"
version = "1.0.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f61f1b6389c3fe1c316bf8a4dccc90a38208354b330925bce1f74a6c4756eb93"
dependencies = [
"cc",
"cxxbridge-flags",
"cxxbridge-macro",
"link-cplusplus",
]
[[package]]
name = "cxx-build"
version = "1.0.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12cee708e8962df2aeb38f594aae5d827c022b6460ac71a7a3e2c3c2aae5a07b"
dependencies = [
"cc",
"codespan-reporting",
"once_cell",
"proc-macro2",
"quote",
"scratch",
"syn 2.0.15",
]
[[package]]
name = "cxxbridge-flags"
version = "1.0.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7944172ae7e4068c533afbb984114a56c46e9ccddda550499caa222902c7f7bb"
[[package]]
name = "cxxbridge-macro"
version = "1.0.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2345488264226bf682893e25de0769f3360aac9957980ec49361b083ddaa5bc5"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.15",
]
[[package]]
name = "dashmap"
version = "4.0.2"
@ -1998,6 +2053,19 @@ dependencies = [
"miniz_oxide 0.7.1",
]
[[package]]
name = "flume"
version = "0.10.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577"
dependencies = [
"futures-core",
"futures-sink",
"nanorand",
"pin-project",
"spin 0.9.8",
]
[[package]]
name = "fnv"
version = "1.0.7"
@ -2309,9 +2377,9 @@ dependencies = [
[[package]]
name = "h2"
version = "0.3.19"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d357c7ae988e7d2182f7d7871d0b963962420b0678b0997ce7de72001aeab782"
checksum = "17f8a914c2987b688368b5138aa05321db91f4090cf26118185672ad588bce21"
dependencies = [
"bytes",
"fnv",
@ -2645,11 +2713,12 @@ dependencies = [
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
checksum = "0703ae284fc167426161c2e3f1da3ea71d94b21bedbcc9494e92b28e334e3dca"
dependencies = [
"cc",
"cxx",
"cxx-build",
]
[[package]]
@ -2908,16 +2977,6 @@ dependencies = [
"signature 2.1.0",
]
[[package]]
name = "kanal"
version = "0.1.0-pre8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b05d55519627edaf7fd0f29981f6dc03fb52df3f5b257130eb8d0bf2801ea1d7"
dependencies = [
"futures-core",
"lock_api",
]
[[package]]
name = "keccak"
version = "0.1.4"
@ -2963,7 +3022,6 @@ name = "latency"
version = "0.1.0"
dependencies = [
"ewma",
"kanal",
"log",
"serde",
"tokio",
@ -3012,6 +3070,15 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "link-cplusplus"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ecd207c9c713c34f95a097a5b029ac2ce6010530c7b49d7fea24d977dede04f5"
dependencies = [
"cc",
]
[[package]]
name = "linux-raw-sys"
version = "0.3.7"
@ -3208,6 +3275,15 @@ dependencies = [
"uuid 1.3.2",
]
[[package]]
name = "nanorand"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
dependencies = [
"getrandom",
]
[[package]]
name = "native-tls"
version = "0.2.11"
@ -4138,6 +4214,17 @@ dependencies = [
"unicase",
]
[[package]]
name = "quick_cache"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5253a3a0d56548d5b0be25414171dc780cc6870727746d05bd2bde352eee96c5"
dependencies = [
"ahash 0.8.3",
"hashbrown 0.13.2",
"parking_lot 0.12.1",
]
[[package]]
name = "quote"
version = "1.0.27"
@ -4202,7 +4289,7 @@ dependencies = [
name = "rate-counter"
version = "0.1.0"
dependencies = [
"kanal",
"flume",
"tokio",
]
@ -4248,9 +4335,9 @@ dependencies = [
[[package]]
name = "rdkafka-sys"
version = "4.4.0+1.9.2"
version = "4.3.0+1.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87ac9d87c3aba1748e3112318459f2ac8bff80bfff7359e338e0463549590249"
checksum = "d222a401698c7f2010e3967353eae566d9934dcda49c29910da922414ab4e3f4"
dependencies = [
"cmake",
"libc",
@ -4702,6 +4789,12 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "scratch"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1792db035ce95be60c3f8853017b3999209281c24e2ba5bc8e59bf97a0c590c1"
[[package]]
name = "scrypt"
version = "0.10.0"
@ -4923,9 +5016,9 @@ dependencies = [
[[package]]
name = "security-framework"
version = "2.9.0"
version = "2.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca2855b3715770894e67cbfa3df957790aa0c9edc3bf06efa1a84d77fa0839d1"
checksum = "a332be01508d814fed64bf28f798a146d73792121129962fdf335bb3c49a4254"
dependencies = [
"bitflags",
"core-foundation",
@ -4936,9 +5029,9 @@ dependencies = [
[[package]]
name = "security-framework-sys"
version = "2.9.0"
version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f51d0c0d83bec45f16480d0ce0058397a69e48fcdc52d1dc8855fb68acbd31a7"
checksum = "31c9bb296072e961fcbd8853511dd39c2d8be2deb1e17c6860b1d30732b323b4"
dependencies = [
"core-foundation-sys",
"libc",
@ -6577,6 +6670,7 @@ dependencies = [
"ethers",
"ewma",
"fdlimit",
"flume",
"fstrings",
"futures",
"gethostname",
@ -6591,7 +6685,6 @@ dependencies = [
"influxdb2-structmap",
"ipnet",
"itertools",
"kanal",
"latency",
"listenfd",
"log",
@ -6606,6 +6699,7 @@ dependencies = [
"parking_lot 0.12.1",
"prettytable",
"proctitle",
"quick_cache",
"rdkafka",
"redis-rate-limiter",
"regex",
@ -6616,6 +6710,7 @@ dependencies = [
"serde_json",
"serde_prometheus",
"siwe",
"strum",
"thread-fast-rng",
"time 0.3.21",
"tokio",

@ -60,13 +60,13 @@ Check that the websocket is working:
```
$ websocat ws://127.0.0.1:8544
{"id": 1, "method": "eth_subscribe", "params": ["newHeads"]}
{"jsonrpc": "2.0", "id": 1, "method": "eth_subscribe", "params": ["newHeads"]}
{"id": 2, "method": "eth_subscribe", "params": ["newPendingTransactions"]}
{"jsonrpc": "2.0", "id": 2, "method": "eth_subscribe", "params": ["newPendingTransactions"]}
{"id": 3, "method": "eth_subscribe", "params": ["newPendingFullTransactions"]}
{"jsonrpc": "2.0", "id": 3, "method": "eth_subscribe", "params": ["newPendingFullTransactions"]}
{"id": 4, "method": "eth_subscribe", "params": ["newPendingRawTransactions"]}
{"jsonrpc": "2.0", "id": 4, "method": "eth_subscribe", "params": ["newPendingRawTransactions"]}
```
You can copy `config/example.toml` to `config/production-$CHAINNAME.toml` and then run `docker-compose up --build -d` start proxies for many chains.
@ -149,6 +149,8 @@ TODO: also enable debug symbols in the release build by modifying the root Cargo
Test the proxy:
wrk -t12 -c400 -d30s --latency http://127.0.0.1:8544/health
wrk -t12 -c400 -d30s --latency http://127.0.0.1:8544/status
wrk -s ./wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8544/u/$API_KEY
wrk -s ./wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8544/u/$API_KEY

@ -189,7 +189,7 @@ These are roughly in order of completition
- [x] graceful shutdown. stop taking new requests and don't stop until all outstanding queries are handled
- https://github.com/tokio-rs/mini-redis/blob/master/src/shutdown.rs
- we need this because we need to be sure all the queries are saved in the db. maybe put stuff in Drop
- need an kanal::watch on unflushed stats that we can subscribe to. wait for it to flip to true
- need an flume::watch on unflushed stats that we can subscribe to. wait for it to flip to true
- [x] don't use unix timestamps for response_millis since leap seconds will confuse it
- [x] config to allow origins even on the anonymous endpoints
- [x] send logs to sentry

@ -7,7 +7,6 @@ edition = "2021"
[dependencies]
ewma = "0.1.1"
kanal = "0.1.0-pre8"
log = "0.4.17"
serde = { version = "1.0.163", features = [] }
tokio = { version = "1.28.1", features = ["full"] }

@ -2,8 +2,9 @@ mod rtt_estimate;
use std::sync::Arc;
use kanal::SendError;
use log::{error, info, trace};
use log::{error, info};
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio::task::JoinHandle;
use tokio::time::{Duration, Instant};
@ -19,7 +20,7 @@ pub struct PeakEwmaLatency {
/// Join handle for the latency calculation task
pub join_handle: JoinHandle<()>,
/// Send to update with each request duration
request_tx: kanal::AsyncSender<Duration>,
request_tx: mpsc::Sender<Duration>,
/// Latency average and last update time
rtt_estimate: Arc<AtomicRttEstimate>,
/// Decay time
@ -33,7 +34,7 @@ impl PeakEwmaLatency {
/// average latency.
pub fn spawn(decay_ns: f64, buf_size: usize, start_latency: Duration) -> Self {
debug_assert!(decay_ns > 0.0, "decay_ns must be positive");
let (request_tx, request_rx) = kanal::bounded_async(buf_size);
let (request_tx, request_rx) = mpsc::channel(buf_size);
let rtt_estimate = Arc::new(AtomicRttEstimate::new(start_latency));
let task = PeakEwmaLatencyTask {
request_rx,
@ -55,10 +56,10 @@ impl PeakEwmaLatency {
let mut estimate = self.rtt_estimate.load();
let now = Instant::now();
debug_assert!(
assert!(
estimate.update_at <= now,
"update_at={:?} in the future",
estimate.update_at,
"update_at is {}ns in the future",
estimate.update_at.duration_since(now).as_nanos(),
);
// Update the RTT estimate to account for decay since the last update.
@ -67,26 +68,20 @@ impl PeakEwmaLatency {
/// Report latency from a single request
///
/// Should only be called from the Web3Rpc that owns it.
/// Should only be called with a duration from the Web3Rpc that owns it.
pub fn report(&self, duration: Duration) {
match self.request_tx.try_send(duration) {
Ok(true) => {
trace!("success");
}
Ok(false) => {
Ok(()) => {}
Err(TrySendError::Full(_)) => {
// We don't want to block if the channel is full, just
// report the error
error!("Latency report channel full");
// TODO: could we spawn a new tokio task to report tthis later?
}
Err(SendError::Closed) => {
Err(TrySendError::Closed(_)) => {
unreachable!("Owner should keep channel open");
}
Err(SendError::ReceiveClosed) => {
unreachable!("Receiver should keep channel open");
}
};
//.expect("Owner should keep channel open");
}
}
@ -94,7 +89,7 @@ impl PeakEwmaLatency {
#[derive(Debug)]
struct PeakEwmaLatencyTask {
/// Receive new request timings for update
request_rx: kanal::AsyncReceiver<Duration>,
request_rx: mpsc::Receiver<Duration>,
/// Current estimate and update time
rtt_estimate: Arc<AtomicRttEstimate>,
/// Last update time, used for decay calculation
@ -106,27 +101,25 @@ struct PeakEwmaLatencyTask {
impl PeakEwmaLatencyTask {
/// Run the loop for updating latency
async fn run(mut self) {
while let Ok(rtt) = self.request_rx.recv().await {
while let Some(rtt) = self.request_rx.recv().await {
self.update(rtt);
}
info!("latency loop exited");
}
/// Update the estimate object atomically.
fn update(&mut self, rtt: Duration) {
fn update(&self, rtt: Duration) {
let rtt = nanos(rtt);
let now = Instant::now();
debug_assert!(
assert!(
self.update_at <= now,
"update_at={:?} in the future",
self.update_at,
"update_at is {}ns in the future",
self.update_at.duration_since(now).as_nanos(),
);
let x = self
.rtt_estimate
self.rtt_estimate
.fetch_update(|mut rtt_estimate| rtt_estimate.update(rtt, self.decay_ns, now));
info!("x: {:?}", x);
}
}

@ -17,12 +17,12 @@ mod m20230119_204135_better_free_tier;
mod m20230125_204810_stats_v2;
mod m20230130_124740_read_only_login_logic;
mod m20230130_165144_prepare_admin_imitation_pre_login;
mod m20230215_152254_admin_trail;
mod m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2;
mod m20230205_130035_create_balance;
mod m20230205_133755_create_referrals;
mod m20230214_134254_increase_balance_transactions;
mod m20230215_152254_admin_trail;
mod m20230221_230953_track_spend;
mod m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2;
mod m20230412_171916_modify_secondary_user_add_primary_user;
mod m20230422_172555_premium_downgrade_logic;
mod m20230511_161214_remove_columns_statsv2_origin_and_method;

@ -5,5 +5,5 @@ authors = ["Bryan Stitt <bryan@llamanodes.com>"]
edition = "2021"
[dependencies]
kanal = "0.1.0-pre8"
flume = "0.10.14"
tokio = { version = "1.28.1", features = ["time"] }

@ -43,6 +43,7 @@ env_logger = "0.10.0"
ethers = { version = "2.0.4", default-features = false, features = ["rustls", "ws"] }
ewma = "0.1.1"
fdlimit = "0.2.1"
flume = "0.10.14"
fstrings = "0.2"
futures = { version = "0.3.28", features = ["thread-pool"] }
gethostname = "0.4.2"
@ -57,7 +58,6 @@ influxdb2 = { git = "https://github.com/llamanodes/influxdb2", features = ["rust
influxdb2-structmap = { git = "https://github.com/llamanodes/influxdb2/"}
ipnet = "2.7.2"
itertools = "0.10.5"
kanal = "0.1.0-pre8"
listenfd = "1.0.1"
log = "0.4.17"
mimalloc = { version = "0.1.37", optional = true}
@ -70,6 +70,7 @@ pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async
parking_lot = { version = "0.12.1", features = ["arc_lock"] }
prettytable = "*"
proctitle = "0.1.1"
quick_cache = "0.3.0"
rdkafka = { version = "0.29.0" }
regex = "1.8.1"
reqwest = { version = "0.11.17", default-features = false, features = ["json", "tokio-rustls"] }
@ -79,6 +80,7 @@ serde = { version = "1.0.163", features = [] }
serde_json = { version = "1.0.96", default-features = false, features = ["alloc", "raw_value"] }
serde_prometheus = "0.2.2"
siwe = "0.5.0"
strum = { version = "0.24.1", features = ["derive"] }
time = "0.3.21"
tokio = { version = "1.28.1", features = ["full"] }
tokio-console = { version = "*", optional = true }

@ -4,12 +4,16 @@ mod ws;
use crate::block_number::{block_needed, BlockNeeded};
use crate::config::{AppConfig, TopConfig};
use crate::frontend::authorization::{
Authorization, RequestMetadata, RequestOrMethod, RpcSecretKey,
Authorization, RequestMetadata, RequestOrMethod, ResponseOrBytes, RpcSecretKey,
};
use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::jsonrpc::{
JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum,
JsonRpcErrorData, JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest,
JsonRpcRequestEnum,
};
use crate::response_cache::{
JsonRpcQueryCache, JsonRpcQueryCacheKey, JsonRpcQueryWeigher, JsonRpcResponseData,
};
use crate::rpcs::blockchain::Web3ProxyBlock;
use crate::rpcs::consensus::ConsensusWeb3Rpcs;
@ -46,15 +50,14 @@ use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
use serde::Serialize;
use serde_json::json;
use serde_json::value::to_raw_value;
use std::borrow::Cow;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::net::IpAddr;
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::{atomic, Arc};
use std::time::Duration;
use tokio::sync::{broadcast, mpsc, watch, Semaphore};
use tokio::sync::{broadcast, watch, Semaphore};
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};
use ulid::Ulid;
@ -71,88 +74,6 @@ pub static APP_USER_AGENT: &str = concat!(
// aggregate across 1 week
pub const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7;
#[derive(Debug, From)]
struct ResponseCacheKey {
// if none, this is cached until evicted
from_block: Option<Web3ProxyBlock>,
// to_block is only set when ranges of blocks are requested (like with eth_getLogs)
to_block: Option<Web3ProxyBlock>,
method: String,
params: Option<serde_json::Value>,
cache_errors: bool,
}
impl ResponseCacheKey {
fn weight(&self) -> usize {
let mut w = self.method.len();
if let Some(ref p) = self.params {
w += p.to_string().len();
}
w
}
}
impl PartialEq for ResponseCacheKey {
fn eq(&self, other: &Self) -> bool {
if self.cache_errors != other.cache_errors {
return false;
}
match (self.from_block.as_ref(), other.from_block.as_ref()) {
(None, None) => {}
(None, Some(_)) => {
return false;
}
(Some(_), None) => {
return false;
}
(Some(s), Some(o)) => {
if s != o {
return false;
}
}
}
match (self.to_block.as_ref(), other.to_block.as_ref()) {
(None, None) => {}
(None, Some(_)) => {
return false;
}
(Some(_), None) => {
return false;
}
(Some(s), Some(o)) => {
if s != o {
return false;
}
}
}
if self.method != other.method {
return false;
}
self.params == other.params
}
}
impl Eq for ResponseCacheKey {}
impl Hash for ResponseCacheKey {
fn hash<H: Hasher>(&self, state: &mut H) {
self.from_block.as_ref().map(|x| x.hash()).hash(state);
self.to_block.as_ref().map(|x| x.hash()).hash(state);
self.method.hash(state);
self.params.as_ref().map(|x| x.to_string()).hash(state);
self.cache_errors.hash(state)
}
}
type ResponseCache =
Cache<ResponseCacheKey, JsonRpcForwardedResponse, hashbrown::hash_map::DefaultHashBuilder>;
pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;
/// TODO: move this
@ -224,7 +145,7 @@ pub struct Web3ProxyApp {
/// TODO: include another type so that we can use private miner relays that do not use JSONRPC requests
pub private_rpcs: Option<Arc<Web3Rpcs>>,
/// track JSONRPC responses
response_cache: ResponseCache,
pub jsonrpc_query_cache: JsonRpcQueryCache,
/// rpc clients that subscribe to newHeads use this channel
/// don't drop this or the sender will stop working
/// TODO: broadcast channel instead?
@ -265,7 +186,7 @@ pub struct Web3ProxyApp {
Cache<UserBearerToken, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub kafka_producer: Option<rdkafka::producer::FutureProducer>,
/// channel for sending stats in a background task
pub stat_sender: Option<mpsc::UnboundedSender<AppStat>>,
pub stat_sender: Option<flume::Sender<AppStat>>,
}
/// flatten a JoinError into an anyhow error
@ -695,23 +616,12 @@ impl Web3ProxyApp {
// responses can be very different in sizes, so this is a cache with a max capacity and a weigher
// TODO: don't allow any response to be bigger than X% of the cache
let response_cache = Cache::builder()
.max_capacity(top_config.app.response_cache_max_bytes)
.weigher(|k: &ResponseCacheKey, v| {
// TODO: is this good enough?
if let Ok(v) = serde_json::to_string(v) {
let weight = k.weight() + v.len();
// the or in unwrap_or is probably never called
weight.try_into().unwrap_or(u32::MAX)
} else {
// this seems impossible
u32::MAX
}
})
// TODO: what should we set? 10 minutes is arbitrary. the nodes themselves hold onto transactions for much longer
.time_to_live(Duration::from_secs(600))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// TODO: we should emit stats to calculate a more accurate expected cache size
let response_cache = JsonRpcQueryCache::with_weighter(
(top_config.app.response_cache_max_bytes / 2048) as usize,
top_config.app.response_cache_max_bytes,
JsonRpcQueryWeigher,
);
// create semaphores for concurrent connection limits
// TODO: what should tti be for semaphores?
@ -816,7 +726,7 @@ impl Web3ProxyApp {
http_client,
kafka_producer,
private_rpcs,
response_cache,
jsonrpc_query_cache: response_cache,
watch_consensus_head_receiver,
pending_tx_sender,
pending_transactions,
@ -1229,8 +1139,7 @@ impl Web3ProxyApp {
authorization: &Arc<Authorization>,
request: &JsonRpcRequest,
request_metadata: Arc<RequestMetadata>,
num_public_rpcs: Option<usize>,
) -> Web3ProxyResult<JsonRpcForwardedResponse> {
) -> Web3ProxyResult<JsonRpcResponseData> {
if let Some(protected_rpcs) = self.private_rpcs.as_ref() {
if !protected_rpcs.is_empty() {
let protected_response = protected_rpcs
@ -1250,6 +1159,17 @@ impl Web3ProxyApp {
}
}
let num_public_rpcs = match authorization.checks.proxy_mode {
// TODO: how many balanced rpcs should we send to? configurable? percentage of total?
ProxyMode::Best | ProxyMode::Debug => Some(4),
ProxyMode::Fastest(0) => None,
// TODO: how many balanced rpcs should we send to? configurable? percentage of total?
// TODO: what if we do 2 per tier? we want to blast the third party rpcs
// TODO: maybe having the third party rpcs in their own Web3Rpcs would be good for this
ProxyMode::Fastest(x) => Some(x * 4),
ProxyMode::Versus => None,
};
// no private rpcs to send to. send to a few public rpcs
// try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here.
self.balanced_rpcs
@ -1277,6 +1197,11 @@ impl Web3ProxyApp {
// TODO: move this code to another module so that its easy to turn this trace logging on in dev
trace!("Received request: {:?}", request);
// save the id so we can attach it to the response
// TODO: we don't always need to clone this. if we come from the cache, we can just take from the request
// TODO: store on the request_metadata?
let response_id = request.id.clone();
let request_metadata = RequestMetadata::new(
self,
authorization.clone(),
@ -1285,7 +1210,7 @@ impl Web3ProxyApp {
)
.await;
let (status_code, response) = match self
let (status_code, response_data): (_, JsonRpcResponseData) = match self
._proxy_cached_request(authorization, request, head_block_num, &request_metadata)
.await
{
@ -1293,7 +1218,10 @@ impl Web3ProxyApp {
Err(err) => err.into_response_parts(),
};
request_metadata.add_response(&response);
let response = JsonRpcForwardedResponse::from_response_data(response_data, response_id);
// TODO: this serializes twice :/
request_metadata.add_response(ResponseOrBytes::Response(&response));
// TODO: with parallel request sending, I think there could be a race on this
let rpcs = request_metadata.backend_rpcs_used();
@ -1308,14 +1236,12 @@ impl Web3ProxyApp {
request: &mut JsonRpcRequest,
head_block_num: Option<U64>,
request_metadata: &Arc<RequestMetadata>,
) -> Web3ProxyResult<JsonRpcForwardedResponse> {
// save the id so we can attach it to the response
let request_id = request.id.clone();
// TODO: don't clone
) -> Web3ProxyResult<JsonRpcResponseData> {
// TODO: don't clone?
let request_method = request.method.clone();
// TODO: serve net_version without querying the backend
let response: JsonRpcForwardedResponse = match request_method.as_ref() {
let response_data: JsonRpcResponseData = match request_method.as_ref() {
// lots of commands are blocked
method @ ("db_getHex"
| "db_getString"
@ -1324,6 +1250,9 @@ impl Web3ProxyApp {
| "debug_accountRange"
| "debug_backtraceAt"
| "debug_blockProfile"
| "debug_bundler_clearState"
| "debug_bundler_dumpMempool"
| "debug_bundler_sendBundleNow"
| "debug_chaindbCompact"
| "debug_chaindbProperty"
| "debug_cpuProfile"
@ -1337,8 +1266,8 @@ impl Web3ProxyApp {
| "debug_setGCPercent"
| "debug_setHead"
| "debug_setMutexProfileFraction"
| "debug_standardTraceBlockToFile"
| "debug_standardTraceBadBlockToFile"
| "debug_standardTraceBlockToFile"
| "debug_startCPUProfile"
| "debug_startGoTrace"
| "debug_stopCPUProfile"
@ -1346,6 +1275,7 @@ impl Web3ProxyApp {
| "debug_writeBlockProfile"
| "debug_writeMemProfile"
| "debug_writeMutexProfile"
| "erigon_cacheCheck"
| "eth_compileLLL"
| "eth_compileSerpent"
| "eth_compileSolidity"
@ -1355,24 +1285,23 @@ impl Web3ProxyApp {
| "eth_signTransaction"
| "eth_submitHashrate"
| "eth_submitWork"
| "erigon_cacheCheck"
| "les_addBalance"
| "les_setClientParams"
| "les_setDefaultParams"
| "miner_setEtherbase"
| "miner_setExtra"
| "miner_setGasLimit"
| "miner_setGasPrice"
| "miner_start"
| "miner_stop"
| "miner_setEtherbase"
| "miner_setGasLimit"
| "personal_ecRecover"
| "personal_importRawKey"
| "personal_listAccounts"
| "personal_lockAccount"
| "personal_newAccount"
| "personal_unlockAccount"
| "personal_sendTransaction"
| "personal_sign"
| "personal_ecRecover"
| "personal_unlockAccount"
| "shh_addToGroup"
| "shh_getFilterChanges"
| "shh_getMessages"
@ -1383,13 +1312,12 @@ impl Web3ProxyApp {
| "shh_post"
| "shh_uninstallFilter"
| "shh_version") => {
// i don't think we will ever support these methods
// i don't think we will ever support these methods. maybe do Forbidden?
// TODO: what error code?
JsonRpcForwardedResponse::from_string(
format!("method unsupported: {}", method),
None,
Some(request_id),
)
JsonRpcErrorData::from(format!(
"the method {} does not exist/is not available",
method
)).into()
}
// TODO: implement these commands
method @ ("eth_getFilterChanges"
@ -1401,21 +1329,11 @@ impl Web3ProxyApp {
| "eth_uninstallFilter") => {
// TODO: unsupported command stat. use the count to prioritize new features
// TODO: what error code?
JsonRpcForwardedResponse::from_string(
format!("not yet implemented: {}", method),
None,
Some(request_id),
)
}
method @ ("debug_bundler_sendBundleNow"
| "debug_bundler_clearState"
| "debug_bundler_dumpMempool") => {
JsonRpcForwardedResponse::from_string(
// TODO: we should probably have some escaping on this. but maybe serde will protect us enough
format!("method unsupported: {}", method),
None,
Some(request_id),
)
JsonRpcErrorData::from(format!(
"the method {} is not yet implemented. contact us if you need this",
method
))
.into()
}
_method @ ("eth_sendUserOperation"
| "eth_estimateUserOperationGas"
@ -1435,18 +1353,14 @@ impl Web3ProxyApp {
}
None => {
// TODO: stats even when we error!
// TODO: use Web3ProxyError? dedicated error for no 4337 bundlers
return Err(anyhow::anyhow!("no bundler_4337_rpcs available").into());
// TODO: dedicated error for no 4337 bundlers
return Err(Web3ProxyError::NoServersSynced);
}
},
"eth_accounts" => {
JsonRpcForwardedResponse::from_value(serde_json::Value::Array(vec![]), request_id)
}
"eth_accounts" => JsonRpcResponseData::from(serde_json::Value::Array(vec![])),
"eth_blockNumber" => {
match head_block_num.or(self.balanced_rpcs.head_block_num()) {
Some(head_block_num) => {
JsonRpcForwardedResponse::from_value(json!(head_block_num), request_id)
}
Some(head_block_num) => JsonRpcResponseData::from(json!(head_block_num)),
None => {
// TODO: what does geth do if this happens?
// TODO: standard not synced error
@ -1454,19 +1368,16 @@ impl Web3ProxyApp {
}
}
}
"eth_chainId" => JsonRpcForwardedResponse::from_value(
json!(U64::from(self.config.chain_id)),
request_id,
),
"eth_chainId" => JsonRpcResponseData::from(json!(U64::from(self.config.chain_id))),
// TODO: eth_callBundle (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_callbundle)
// TODO: eth_cancelPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_cancelprivatetransaction, but maybe just reject)
// TODO: eth_sendPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_sendprivatetransaction)
"eth_coinbase" => {
// no need for serving coinbase
JsonRpcForwardedResponse::from_value(json!(Address::zero()), request_id)
JsonRpcResponseData::from(json!(Address::zero()))
}
"eth_estimateGas" => {
let mut response = self
let response_data = self
.balanced_rpcs
.try_proxy_connection(
authorization,
@ -1477,8 +1388,8 @@ impl Web3ProxyApp {
)
.await?;
if let Some(gas_estimate) = response.result.take() {
let mut gas_estimate: U256 = serde_json::from_str(gas_estimate.get())
if let JsonRpcResponseData::Result { value, .. } = response_data {
let mut gas_estimate: U256 = serde_json::from_str(value.get())
.or(Err(Web3ProxyError::GasEstimateNotU256))?;
let gas_increase = if let Some(gas_increase_percent) =
@ -1495,14 +1406,14 @@ impl Web3ProxyApp {
gas_estimate += gas_increase;
JsonRpcForwardedResponse::from_value(json!(gas_estimate), request_id)
JsonRpcResponseData::from(json!(gas_estimate))
} else {
response
response_data
}
}
"eth_getTransactionReceipt" | "eth_getTransactionByHash" => {
// try to get the transaction without specifying a min_block_height
let mut response = self
let mut response_data = self
.balanced_rpcs
.try_proxy_connection(
authorization,
@ -1514,13 +1425,13 @@ impl Web3ProxyApp {
.await?;
// if we got "null", it is probably because the tx is old. retry on nodes with old block data
if let Some(ref result) = response.result {
if result.get() == "null" {
if let JsonRpcResponseData::Result { value, .. } = &response_data {
if value.get() == "null" {
request_metadata
.archive_request
.store(true, atomic::Ordering::Release);
response = self
response_data = self
.balanced_rpcs
.try_proxy_connection(
authorization,
@ -1533,44 +1444,39 @@ impl Web3ProxyApp {
}
}
response
response_data
}
// TODO: eth_gasPrice that does awesome magic to predict the future
"eth_hashrate" => JsonRpcForwardedResponse::from_value(json!(U64::zero()), request_id),
"eth_mining" => {
JsonRpcForwardedResponse::from_value(serde_json::Value::Bool(false), request_id)
}
"eth_hashrate" => JsonRpcResponseData::from(json!(U64::zero())),
"eth_mining" => JsonRpcResponseData::from(serde_json::Value::Bool(false)),
// TODO: eth_sendBundle (flashbots/eden command)
// broadcast transactions to all private rpcs at once
"eth_sendRawTransaction" => {
let num_public_rpcs = match authorization.checks.proxy_mode {
// TODO: how many balanced rpcs should we send to? configurable? percentage of total?
ProxyMode::Best | ProxyMode::Debug => Some(4),
ProxyMode::Fastest(0) => None,
// TODO: how many balanced rpcs should we send to? configurable? percentage of total?
// TODO: what if we do 2 per tier? we want to blast the third party rpcs
// TODO: maybe having the third party rpcs in their own Web3Rpcs would be good for this
ProxyMode::Fastest(x) => Some(x * 4),
ProxyMode::Versus => None,
};
// TODO: decode the transaction
let mut response = self
// TODO: error if the chain_id is incorrect
// TODO: check the cache to see if we have sent this transaction recently
// TODO: if so, use a cached response.
// TODO: if not,
// TODO: - cache successes for up to 1 minute
// TODO: - cache failures for 1 block (we see transactions skipped because out of funds. but that can change block to block)
let mut response_data = self
.try_send_protected(
authorization,
request,
request_metadata.clone(),
num_public_rpcs,
)
.await?;
// sometimes we get an error that the transaction is already known by our nodes,
// that's not really an error. Return the hash like a successful response would.
// TODO: move this to a helper function
if let Some(ref response_error) = response.error {
if response_error.code == -32000
&& (response_error.message == "ALREADY_EXISTS: already known"
|| response_error.message
== "INTERNAL_ERROR: existing tx with same hash")
if let JsonRpcResponseData::Error { value, .. } = &response_data {
if value.code == -32000
&& (value.message == "ALREADY_EXISTS: already known"
|| value.message == "INTERNAL_ERROR: existing tx with same hash")
{
let params = request
.params
@ -1608,21 +1514,19 @@ impl Web3ProxyApp {
trace!("tx_hash: {:#?}", tx_hash);
let tx_hash = to_raw_value(&tx_hash).unwrap();
response.error = None;
response.result = Some(tx_hash);
response_data = JsonRpcResponseData::from(tx_hash);
}
}
}
// emit transaction count stats
// TODO: use this cache to avoid sending duplicate transactions?
if let Some(ref salt) = self.config.public_recent_ips_salt {
if let Some(ref tx_hash) = response.result {
if let JsonRpcResponseData::Result { value, .. } = &response_data {
let now = Utc::now().timestamp();
let app = self.clone();
let salted_tx_hash = format!("{}:{}", salt, tx_hash);
let salted_tx_hash = format!("{}:{}", salt, value.get());
let f = async move {
match app.redis_conn().await {
@ -1653,35 +1557,38 @@ impl Web3ProxyApp {
}
}
response
response_data
}
"eth_syncing" => {
// no stats on this. its cheap
// TODO: return a real response if all backends are syncing or if no servers in sync
JsonRpcForwardedResponse::from_value(serde_json::Value::Bool(false), request_id)
// TODO: const
JsonRpcResponseData::from(serde_json::Value::Bool(false))
}
"eth_subscribe" => JsonRpcForwardedResponse::from_str(
"notifications not supported. eth_subscribe is only available over a websocket",
Some(-32601),
Some(request_id),
),
"eth_unsubscribe" => JsonRpcForwardedResponse::from_str(
"notifications not supported. eth_unsubscribe is only available over a websocket",
Some(-32601),
Some(request_id),
),
"eth_subscribe" => JsonRpcErrorData {
message: Cow::Borrowed(
"notifications not supported. eth_subscribe is only available over a websocket",
),
code: -32601,
data: None,
}
.into(),
"eth_unsubscribe" => JsonRpcErrorData {
message: Cow::Borrowed("notifications not supported. eth_unsubscribe is only available over a websocket"),
code: -32601,
data: None,
}.into(),
"net_listening" => {
// TODO: only true if there are some backends on balanced_rpcs?
JsonRpcForwardedResponse::from_value(serde_json::Value::Bool(true), request_id)
// TODO: const
JsonRpcResponseData::from(serde_json::Value::Bool(true))
}
"net_peerCount" => JsonRpcForwardedResponse::from_value(
json!(U64::from(self.balanced_rpcs.num_synced_rpcs())),
request_id,
),
"web3_clientVersion" => JsonRpcForwardedResponse::from_value(
serde_json::Value::String(APP_USER_AGENT.to_string()),
request_id,
),
"net_peerCount" =>
JsonRpcResponseData::from(json!(U64::from(self.balanced_rpcs.num_synced_rpcs())))
,
"web3_clientVersion" =>
JsonRpcResponseData::from(serde_json::Value::String(APP_USER_AGENT.to_string()))
,
"web3_sha3" => {
// returns Keccak-256 (not the standardized SHA3-256) of the given data.
match &request.params {
@ -1692,11 +1599,11 @@ impl Web3ProxyApp {
{
// TODO: what error code?
// TODO: use Web3ProxyError::BadRequest
JsonRpcForwardedResponse::from_str(
"Invalid request",
Some(-32600),
Some(request_id),
)
JsonRpcErrorData {
message: Cow::Borrowed("Invalid request"),
code: -32600,
data: None
}.into()
} else {
// TODO: BadRequest instead of web3_context
let param = Bytes::from_str(
@ -1714,25 +1621,25 @@ impl Web3ProxyApp {
let hash = H256::from(keccak256(param));
JsonRpcForwardedResponse::from_value(json!(hash), request_id)
JsonRpcResponseData::from(json!(hash))
}
}
_ => {
// TODO: this needs the correct error code in the response
// TODO: Web3ProxyError::BadRequest instead?
JsonRpcForwardedResponse::from_str(
"invalid request",
Some(StatusCode::BAD_REQUEST.as_u16().into()),
Some(request_id),
)
JsonRpcErrorData {
message: Cow::Borrowed("invalid request"),
code: StatusCode::BAD_REQUEST.as_u16().into(),
data: None,
}.into()
}
}
}
"test" => JsonRpcForwardedResponse::from_str(
"The method test does not exist/is not available.",
Some(-32601),
Some(request_id),
),
"test" => JsonRpcErrorData {
message: Cow::Borrowed("The method test does not exist/is not available."),
code: -32601,
data: None,
}.into(),
// anything else gets sent to backend rpcs and cached
method => {
if method.starts_with("admin_") {
@ -1746,12 +1653,12 @@ impl Web3ProxyApp {
.ok_or(Web3ProxyError::NoServersSynced)?;
// TODO: don't clone. this happens way too much. maybe &mut?
let mut request = request.clone();
// let mut request = request.clone();
// we do this check before checking caches because it might modify the request params
// TODO: add a stat for archive vs full since they should probably cost different
// TODO: this cache key can be rather large. is that okay?
let cache_key: Option<ResponseCacheKey> = match block_needed(
let cache_key: Option<JsonRpcQueryCacheKey> = match block_needed(
authorization,
method,
request.params.as_mut(),
@ -1760,7 +1667,7 @@ impl Web3ProxyApp {
)
.await?
{
BlockNeeded::CacheSuccessForever => Some(ResponseCacheKey {
BlockNeeded::CacheSuccessForever => Some(JsonRpcQueryCacheKey {
from_block: None,
to_block: None,
method: method.to_string(),
@ -1786,9 +1693,10 @@ impl Web3ProxyApp {
let request_block = self
.balanced_rpcs
.block(authorization, &request_block_hash, None)
.await?;
.await?
.block;
Some(ResponseCacheKey {
Some(JsonRpcQueryCacheKey {
from_block: Some(request_block),
to_block: None,
method: method.to_string(),
@ -1816,7 +1724,8 @@ impl Web3ProxyApp {
let from_block = self
.balanced_rpcs
.block(authorization, &from_block_hash, None)
.await?;
.await?
.block;
let (to_block_hash, _) = self
.balanced_rpcs
@ -1826,75 +1735,62 @@ impl Web3ProxyApp {
let to_block = self
.balanced_rpcs
.block(authorization, &to_block_hash, None)
.await?;
.await?
.block;
Some(ResponseCacheKey {
Some(JsonRpcQueryCacheKey {
from_block: Some(from_block),
to_block: Some(to_block),
method: method.to_string(),
// TODO: hash here?
params: request.params.clone(),
cache_errors,
})
}
};
trace!("cache_key: {:#?}", cache_key);
let mut response = {
let request_metadata = request_metadata.clone();
let authorization = authorization.clone();
let authorization = authorization.clone();
if let Some(cache_key) = cache_key {
let from_block_num = cache_key.from_block.as_ref().map(|x| x.number.unwrap());
let to_block_num = cache_key.to_block.as_ref().map(|x| x.number.unwrap());
if let Some(cache_key) = cache_key {
let from_block_num = cache_key.from_block.as_ref().map(|x| *x.number());
let to_block_num = cache_key.to_block.as_ref().map(|x| *x.number());
match self
.jsonrpc_query_cache
.get_value_or_guard_async(&cache_key).await
{
Ok(x) => x,
Err(x) => {
let response_data = self.balanced_rpcs
.try_proxy_connection(
&authorization,
request,
Some(request_metadata),
from_block_num.as_ref(),
to_block_num.as_ref(),
)
.await?;
self.response_cache
.try_get_with(cache_key, async move {
// TODO: put the hash here instead of the block number? its in the request already.
let mut response = self
.balanced_rpcs
.try_proxy_connection(
&authorization,
&request,
Some(&request_metadata),
from_block_num.as_ref(),
to_block_num.as_ref(),
)
.await?;
// TODO: convert the Box<RawValue> to an Arc<RawValue>
x.insert(response_data.clone());
// discard their id by replacing it with an empty
response.id = Default::default();
// TODO: only cache the inner response
// TODO: how are we going to stream this?
// TODO: check response size. if its very large, return it in a custom Error type that bypasses caching? or will moka do that for us?
Ok::<_, Web3ProxyError>(response)
})
// TODO: add context (error while caching and forwarding response {})
.await?
} else {
self.balanced_rpcs
.try_proxy_connection(
&authorization,
&request,
Some(&request_metadata),
None,
None,
)
.await?
response_data
}
}
};
// since this data likely came out of a cache, the response.id is not going to match the request.id
// replace the id with our request's id.
response.id = request_id;
response
} else {
self.balanced_rpcs
.try_proxy_connection(
&authorization,
request,
Some(request_metadata),
None,
None,
)
.await?
}
}
};
Ok(response)
Ok(response_data)
}
}

@ -5,9 +5,10 @@ use crate::frontend::authorization::{Authorization, RequestMetadata, RequestOrMe
use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult};
use crate::jsonrpc::JsonRpcForwardedResponse;
use crate::jsonrpc::JsonRpcRequest;
use crate::response_cache::JsonRpcResponseData;
use crate::rpcs::transactions::TxStatus;
use axum::extract::ws::Message;
use ethers::prelude::U64;
use ethers::types::U64;
use futures::future::AbortHandle;
use futures::future::Abortable;
use futures::stream::StreamExt;
@ -24,7 +25,7 @@ impl Web3ProxyApp {
jsonrpc_request: JsonRpcRequest,
subscription_count: &'a AtomicUsize,
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now
response_sender: kanal::AsyncSender<Message>,
response_sender: flume::Sender<Message>,
) -> Web3ProxyResult<(AbortHandle, JsonRpcForwardedResponse)> {
let request_metadata = RequestMetadata::new(
self,
@ -39,7 +40,7 @@ impl Web3ProxyApp {
// TODO: this only needs to be unique per connection. we don't need it globably unique
// TODO: have a max number of subscriptions per key/ip. have a global max number of subscriptions? how should this be calculated?
let subscription_id = subscription_count.fetch_add(1, atomic::Ordering::SeqCst);
let subscription_id = U64::from(subscription_id);
let subscription_id = U64::from(subscription_id as u64);
// save the id so we can use it in the response
let id = jsonrpc_request.id.clone();
@ -94,7 +95,7 @@ impl Web3ProxyApp {
// TODO: can we check a content type header?
let response_msg = Message::Text(response_str);
if response_sender.send(response_msg).await.is_err() {
if response_sender.send_async(response_msg).await.is_err() {
// TODO: increment error_response? i don't think so. i think this will happen once every time a client disconnects.
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
@ -158,7 +159,7 @@ impl Web3ProxyApp {
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
if response_sender.send(response_msg).await.is_err() {
if response_sender.send_async(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
@ -221,7 +222,7 @@ impl Web3ProxyApp {
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
if response_sender.send(response_msg).await.is_err() {
if response_sender.send_async(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
@ -285,7 +286,7 @@ impl Web3ProxyApp {
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
if response_sender.send(response_msg).await.is_err() {
if response_sender.send_async(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
@ -304,8 +305,11 @@ impl Web3ProxyApp {
// TODO: do something with subscription_join_handle?
let response = JsonRpcForwardedResponse::from_value(json!(subscription_id), id);
let response_data = JsonRpcResponseData::from(json!(subscription_id));
let response = JsonRpcForwardedResponse::from_response_data(response_data, id);
// TODO: this serializes twice
request_metadata.add_response(&response);
// TODO: make a `SubscriptonHandle(AbortHandle, JoinHandle)` struct?

@ -11,6 +11,7 @@ use log::{error, info};
use pagerduty_rs::{eventsv2async::EventsV2 as PagerdutyAsyncEventsV2, types::Event};
use serde_json::json;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::{interval, MissedTickBehavior};
use web3_proxy::{config::TopConfig, pagerduty::pagerduty_alert};
@ -115,7 +116,7 @@ impl SentrydSubCommand {
let mut handles = FuturesUnordered::new();
// channels and a task for sending errors to logs/pagerduty
let (error_sender, error_receiver) = kanal::bounded_async::<SentrydError>(10);
let (error_sender, mut error_receiver) = mpsc::channel::<SentrydError>(10);
{
let error_handler_f = async move {
@ -123,7 +124,7 @@ impl SentrydSubCommand {
info!("set PAGERDUTY_INTEGRATION_KEY to send create alerts for errors");
}
while let Ok(err) = error_receiver.recv().await {
while let Some(err) = error_receiver.recv().await {
log::log!(err.level, "check failed: {:#?}", err);
if matches!(err.level, log::Level::Error) {
@ -257,7 +258,7 @@ async fn a_loop<T>(
class: &str,
seconds: u64,
error_level: log::Level,
error_sender: kanal::AsyncSender<SentrydError>,
error_sender: mpsc::Sender<SentrydError>,
f: impl Fn(SentrydErrorBuilder) -> T,
) -> anyhow::Result<()>
where

@ -285,8 +285,8 @@ impl Web3RpcConfig {
http_client: Option<reqwest::Client>,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
blocks_by_hash_cache: BlocksByHashCache,
block_sender: Option<kanal::AsyncSender<BlockAndRpc>>,
tx_id_sender: Option<kanal::AsyncSender<TxHashAndRpc>>,
block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<TxHashAndRpc>>,
reconnect: bool,
) -> anyhow::Result<(Arc<Web3Rpc>, AnyhowJoinHandle<()>)> {
if !self.extra.is_empty() {

@ -33,7 +33,7 @@ use std::mem;
use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64, AtomicUsize};
use std::time::Duration;
use std::{net::IpAddr, str::FromStr, sync::Arc};
use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::task::JoinHandle;
use tokio::time::Instant;
use ulid::Ulid;
@ -266,8 +266,8 @@ pub struct RequestMetadata {
/// TODO: maybe this shouldn't be determined by ProxyMode. A request param should probably enable this
pub kafka_debug_logger: Option<Arc<KafkaDebugLogger>>,
/// Cancel-safe channel to send stats to
pub stat_sender: Option<mpsc::UnboundedSender<AppStat>>,
/// Cancel-safe channel for sending stats to the buffer
pub stat_sender: Option<flume::Sender<AppStat>>,
}
impl Default for RequestMetadata {
@ -355,7 +355,9 @@ impl ResponseOrBytes<'_> {
Self::Json(x) => serde_json::to_string(x)
.expect("this should always serialize")
.len(),
Self::Response(x) => x.num_bytes(),
Self::Response(x) => serde_json::to_string(x)
.expect("this should always serialize")
.len(),
Self::Bytes(num_bytes) => *num_bytes,
}
}

File diff suppressed because it is too large Load Diff

@ -16,26 +16,27 @@ use axum::{
routing::{get, post, put},
Extension, Router,
};
use http::header::AUTHORIZATION;
use http::{header::AUTHORIZATION, StatusCode};
use listenfd::ListenFd;
use log::info;
use moka::future::Cache;
use std::iter::once;
use std::net::SocketAddr;
use std::sync::Arc;
use std::{iter::once, time::Duration};
use tokio::sync::broadcast;
use strum::{EnumCount, EnumIter};
use tokio::{sync::broadcast, time::Instant};
use tower_http::cors::CorsLayer;
use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer;
/// simple keys for caching responses
#[derive(Clone, Hash, PartialEq, Eq)]
pub enum FrontendResponseCaches {
#[derive(Copy, Clone, Hash, PartialEq, Eq, EnumCount, EnumIter)]
pub enum FrontendResponseCacheKey {
BackupsNeeded,
Health,
Status,
}
pub type FrontendJsonResponseCache =
Cache<FrontendResponseCaches, Arc<serde_json::Value>, hashbrown::hash_map::DefaultHashBuilder>;
pub type FrontendHealthCache = Cache<(), bool, hashbrown::hash_map::DefaultHashBuilder>;
quick_cache::sync::Cache<FrontendResponseCacheKey, ((StatusCode, axum::body::Bytes), Instant)>;
/// Start the frontend server.
pub async fn serve(
@ -47,14 +48,9 @@ pub async fn serve(
// setup caches for whatever the frontend needs
// no need for max items since it is limited by the enum key
// TODO: latest moka allows for different ttls for different
let json_response_cache: FrontendJsonResponseCache = Cache::builder()
.time_to_live(Duration::from_secs(2))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
let response_cache_size = FrontendResponseCacheKey::COUNT;
// /health gets a cache with a shorter lifetime
let health_cache: FrontendHealthCache = Cache::builder()
.time_to_live(Duration::from_millis(100))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
let json_response_cache = FrontendJsonResponseCache::new(response_cache_size);
// TODO: read config for if fastest/versus should be available publicly. default off
@ -220,8 +216,7 @@ pub async fn serve(
// application state
.layer(Extension(proxy_app))
// frontend caches
.layer(Extension(json_response_cache))
.layer(Extension(health_cache))
.layer(Extension(Arc::new(json_response_cache)))
// 404 for any unknown routes
.fallback(errors::handler_404);

@ -63,6 +63,8 @@ async fn _proxy_web3_rpc(
let authorization = Arc::new(authorization);
// TODO: calculate payload bytes here (before turning into serde_json::Value). that will save serializing later
let (status_code, response, rpcs, _semaphore) = app
.proxy_web3_rpc(authorization, payload)
.await

@ -20,7 +20,7 @@ use axum::{
};
use axum_client_ip::InsecureClientIp;
use axum_macros::debug_handler;
use ethers::types::Bytes;
use ethers::types::U64;
use fstrings::{f, format_args_f};
use futures::SinkExt;
use futures::{
@ -311,7 +311,7 @@ async fn proxy_web3_socket(
let (ws_tx, ws_rx) = socket.split();
// create a channel for our reader and writer can communicate. todo: benchmark different channels
let (response_sender, response_receiver) = kanal::unbounded_async::<Message>();
let (response_sender, response_receiver) = flume::unbounded::<Message>();
tokio::spawn(write_web3_socket(response_receiver, ws_tx));
tokio::spawn(read_web3_socket(app, authorization, ws_rx, response_sender));
@ -323,25 +323,28 @@ async fn handle_socket_payload(
app: Arc<Web3ProxyApp>,
authorization: &Arc<Authorization>,
payload: &str,
response_sender: &kanal::AsyncSender<Message>,
response_sender: &flume::Sender<Message>,
subscription_count: &AtomicUsize,
subscriptions: Arc<RwLock<HashMap<Bytes, AbortHandle>>>,
subscriptions: Arc<RwLock<HashMap<U64, AbortHandle>>>,
) -> Web3ProxyResult<(Message, Option<OwnedSemaphorePermit>)> {
let (authorization, semaphore) = match authorization.check_again(&app).await {
Ok((a, s)) => (a, s),
Err(err) => {
let (_, err) = err.into_response_parts();
let err = serde_json::to_string(&err).expect("to_string should always work here");
let err = JsonRpcForwardedResponse::from_response_data(err, Default::default());
let err = serde_json::to_string(&err)?;
return Ok((Message::Text(err), None));
}
};
// TODO: do any clients send batches over websockets?
let (id, response) = match serde_json::from_str::<JsonRpcRequest>(payload) {
// TODO: change response into response_data
let (response_id, response) = match serde_json::from_str::<JsonRpcRequest>(payload) {
Ok(json_request) => {
let id = json_request.id.clone();
let response_id = json_request.id.clone();
// TODO: move this to a seperate function so we can use the try operator
let response: Web3ProxyResult<JsonRpcForwardedResponseEnum> =
@ -366,9 +369,9 @@ async fn handle_socket_payload(
.as_ref()
.context("there should be a result here")?;
// TODO: there must be a better way to do this
let k: Bytes = serde_json::from_str(result.get())
.context("subscription ids must be bytes")?;
// TODO: there must be a better way to turn a RawValue
let k: U64 = serde_json::from_str(result.get())
.context("subscription ids must be U64s")?;
x.insert(k, handle);
};
@ -384,7 +387,7 @@ async fn handle_socket_payload(
.await;
#[derive(serde::Deserialize)]
struct EthUnsubscribeParams([Bytes; 1]);
struct EthUnsubscribeParams([U64; 1]);
if let Some(params) = json_request.params {
match serde_json::from_value(params) {
@ -403,9 +406,10 @@ async fn handle_socket_payload(
}
};
// TODO: don't create the response here. use a JsonRpcResponseData instead
let response = JsonRpcForwardedResponse::from_value(
json!(partial_response),
id.clone(),
response_id.clone(),
);
request_metadata.add_response(&response);
@ -428,7 +432,7 @@ async fn handle_socket_payload(
.map(|(status_code, response, _)| response),
};
(id, response)
(response_id, response)
}
Err(err) => {
let id = JsonRpcId::None.to_raw_value();
@ -439,8 +443,10 @@ async fn handle_socket_payload(
let response_str = match response {
Ok(x) => serde_json::to_string(&x).expect("to_string should always work here"),
Err(err) => {
let (_, mut response) = err.into_response_parts();
response.id = id;
let (_, response_data) = err.into_response_parts();
let response = JsonRpcForwardedResponse::from_response_data(response_data, response_id);
serde_json::to_string(&response).expect("to_string should always work here")
}
};
@ -452,7 +458,7 @@ async fn read_web3_socket(
app: Arc<Web3ProxyApp>,
authorization: Arc<Authorization>,
mut ws_rx: SplitStream<WebSocket>,
response_sender: kanal::AsyncSender<Message>,
response_sender: flume::Sender<Message>,
) {
// RwLock should be fine here. a user isn't going to be opening tons of subscriptions
let subscriptions = Arc::new(RwLock::new(HashMap::new()));
@ -528,7 +534,7 @@ async fn read_web3_socket(
}
};
if response_sender.send(response_msg).await.is_err() {
if response_sender.send_async(response_msg).await.is_err() {
let _ = close_sender.send(true);
return;
};
@ -549,13 +555,13 @@ async fn read_web3_socket(
}
async fn write_web3_socket(
response_rx: kanal::AsyncReceiver<Message>,
response_rx: flume::Receiver<Message>,
mut ws_tx: SplitSink<WebSocket, Message>,
) {
// TODO: increment counter for open websockets
// TODO: is there any way to make this stream receive.
while let Ok(msg) = response_rx.recv().await {
while let Ok(msg) = response_rx.recv_async().await {
// a response is ready
// TODO: poke rate limits for this user?

@ -3,32 +3,97 @@
//! For ease of development, users can currently access these endponts.
//! They will eventually move to another port.
use super::{FrontendHealthCache, FrontendJsonResponseCache, FrontendResponseCaches};
use crate::app::{Web3ProxyApp, APP_USER_AGENT};
use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
use super::{FrontendJsonResponseCache, FrontendResponseCacheKey};
use crate::{
app::{Web3ProxyApp, APP_USER_AGENT},
frontend::errors::Web3ProxyError,
};
use axum::{body::Bytes, http::StatusCode, response::IntoResponse, Extension};
use axum_macros::debug_handler;
use futures::Future;
use once_cell::sync::Lazy;
use serde_json::json;
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use tokio::time::Instant;
static HEALTH_OK: Lazy<Bytes> = Lazy::new(|| Bytes::from("OK\n"));
static HEALTH_NOT_OK: Lazy<Bytes> = Lazy::new(|| Bytes::from(":(\n"));
static BACKUPS_NEEDED_TRUE: Lazy<Bytes> = Lazy::new(|| Bytes::from("true\n"));
static BACKUPS_NEEDED_FALSE: Lazy<Bytes> = Lazy::new(|| Bytes::from("false\n"));
/// simple ttl for
// TODO: make this generic for any cache/key
async fn _quick_cache_ttl<Fut>(
app: Arc<Web3ProxyApp>,
cache: Arc<FrontendJsonResponseCache>,
key: FrontendResponseCacheKey,
f: impl Fn(Arc<Web3ProxyApp>) -> Fut,
) -> (StatusCode, Bytes)
where
Fut: Future<Output = (StatusCode, Bytes)>,
{
let mut response;
let expire_at;
(response, expire_at) = cache
.get_or_insert_async::<Web3ProxyError>(&key, async {
let expire_at = Instant::now() + Duration::from_millis(1000);
let response = f(app.clone()).await;
Ok((response, expire_at))
})
.await
.unwrap();
if Instant::now() >= expire_at {
// TODO: this expiration isn't perfect
// parallel requests could overwrite eachother
// its good enough for now
let expire_at = Instant::now() + Duration::from_millis(1000);
response = f(app).await;
cache.insert(key, (response.clone(), expire_at));
}
response
}
/// Health check page for load balancers to use.
#[debug_handler]
pub async fn health(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(health_cache): Extension<FrontendHealthCache>,
Extension(cache): Extension<Arc<FrontendJsonResponseCache>>,
) -> impl IntoResponse {
let synced = health_cache
.get_with((), async { app.balanced_rpcs.synced() })
.await;
_quick_cache_ttl(app, cache, FrontendResponseCacheKey::Health, _health).await
}
if synced {
(StatusCode::OK, "OK")
async fn _health(app: Arc<Web3ProxyApp>) -> (StatusCode, Bytes) {
if app.balanced_rpcs.synced() {
(StatusCode::OK, HEALTH_OK.clone())
} else {
(StatusCode::SERVICE_UNAVAILABLE, ":(")
(StatusCode::SERVICE_UNAVAILABLE, HEALTH_NOT_OK.clone())
}
}
/// Easy alerting if backup servers are in use.
pub async fn backups_needed(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
#[debug_handler]
pub async fn backups_needed(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(cache): Extension<Arc<FrontendJsonResponseCache>>,
) -> impl IntoResponse {
_quick_cache_ttl(
app,
cache,
FrontendResponseCacheKey::BackupsNeeded,
_backups_needed,
)
.await
}
async fn _backups_needed(app: Arc<Web3ProxyApp>) -> (StatusCode, Bytes) {
let code = {
let consensus_rpcs = app
.balanced_rpcs
@ -49,9 +114,9 @@ pub async fn backups_needed(Extension(app): Extension<Arc<Web3ProxyApp>>) -> imp
};
if matches!(code, StatusCode::OK) {
(code, "no backups needed. :)")
(code, BACKUPS_NEEDED_FALSE.clone())
} else {
(code, "backups needed! :(")
(code, BACKUPS_NEEDED_TRUE.clone())
}
}
@ -61,23 +126,33 @@ pub async fn backups_needed(Extension(app): Extension<Arc<Web3ProxyApp>>) -> imp
#[debug_handler]
pub async fn status(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(response_cache): Extension<FrontendJsonResponseCache>,
Extension(cache): Extension<Arc<FrontendJsonResponseCache>>,
) -> impl IntoResponse {
let body = response_cache
.get_with(FrontendResponseCaches::Status, async {
// TODO: what else should we include? uptime, cache hit rates, cpu load, memory used
// TODO: the hostname is probably not going to change. only get once at the start?
let body = json!({
"version": APP_USER_AGENT,
"chain_id": app.config.chain_id,
"balanced_rpcs": app.balanced_rpcs,
"private_rpcs": app.private_rpcs,
"hostname": app.hostname,
});
Arc::new(body)
})
.await;
Json(body)
_quick_cache_ttl(app, cache, FrontendResponseCacheKey::Status, _status).await
}
// TODO: this doesn't need to be async, but _quick_cache_ttl needs an async function
async fn _status(app: Arc<Web3ProxyApp>) -> (StatusCode, Bytes) {
// TODO: what else should we include? uptime, cache hit rates, cpu load, memory used
// TODO: the hostname is probably not going to change. only get once at the start?
let body = json!({
"version": APP_USER_AGENT,
"chain_id": app.config.chain_id,
"balanced_rpcs": app.balanced_rpcs,
"private_rpcs": app.private_rpcs,
"bundler_4337_rpcs": app.bundler_4337_rpcs,
"hostname": app.hostname,
});
let body = body.to_string().into_bytes();
let body = Bytes::from(body);
let code = if app.balanced_rpcs.synced() {
StatusCode::OK
} else {
StatusCode::INTERNAL_SERVER_ERROR
};
(code, body)
}

@ -321,7 +321,7 @@ pub async fn user_login_post(
.filter(referrer::Column::ReferralCode.eq(referral_code))
.one(db_replica.conn())
.await?
.ok_or(Web3ProxyError::InvalidReferralCode)?;
.ok_or(Web3ProxyError::UnknownReferralCode)?;
// Create a new item in the database,
// marking this guy as the referrer (and ignoring a duplicate insert, if there is any...)

@ -1,20 +1,17 @@
use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult};
use crate::response_cache::JsonRpcResponseData;
use derive_more::From;
use ethers::prelude::ProviderError;
use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor};
use serde::{Deserialize, Serialize};
use serde_json::json;
use serde_json::value::{to_raw_value, RawValue};
use std::borrow::Cow;
use std::fmt;
fn default_jsonrpc() -> String {
"2.0".to_string()
}
// TODO: &str here instead of String should save a lot of allocations
#[derive(Clone, Deserialize, Serialize)]
pub struct JsonRpcRequest {
// TODO: skip jsonrpc entirely? its against spec to drop it, but some servers bad
#[serde(default = "default_jsonrpc")]
pub jsonrpc: String,
/// id could be a stricter type, but many rpcs do things against the spec
pub id: Box<RawValue>,
@ -51,7 +48,7 @@ impl JsonRpcRequest {
params: Option<serde_json::Value>,
) -> anyhow::Result<Self> {
let x = Self {
jsonrpc: default_jsonrpc(),
jsonrpc: "2.0".to_string(),
id: id.to_raw_value(),
method,
params,
@ -194,19 +191,38 @@ pub struct JsonRpcErrorData {
/// The error code
pub code: i64,
/// The error message
pub message: String,
pub message: Cow<'static, str>,
/// Additional data
#[serde(skip_serializing_if = "Option::is_none")]
pub data: Option<serde_json::Value>,
}
impl From<&'static str> for JsonRpcErrorData {
fn from(value: &'static str) -> Self {
Self {
code: -32000,
message: Cow::Borrowed(value),
data: None,
}
}
}
impl From<String> for JsonRpcErrorData {
fn from(value: String) -> Self {
Self {
code: -32000,
message: Cow::Owned(value),
data: None,
}
}
}
/// A complete response
/// TODO: better Debug response
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct JsonRpcForwardedResponse {
// TODO: jsonrpc a &str?
#[serde(default = "default_jsonrpc")]
pub jsonrpc: String,
pub jsonrpc: &'static str,
pub id: Box<RawValue>,
#[serde(skip_serializing_if = "Option::is_none")]
pub result: Option<Box<RawValue>>,
@ -242,40 +258,40 @@ impl JsonRpcForwardedResponse {
// TODO: this is too verbose. plenty of errors are valid, like users giving an invalid address. no need to log that
// TODO: can we somehow get the initial request here? if we put that into a tracing span, will things slow down a ton?
JsonRpcForwardedResponse {
jsonrpc: "2.0".to_string(),
id: id.unwrap_or_else(|| JsonRpcId::None.to_raw_value()),
jsonrpc: "2.0",
id: id.unwrap_or_default(),
result: None,
error: Some(JsonRpcErrorData {
code: code.unwrap_or(-32099),
message,
message: Cow::Owned(message),
// TODO: accept data as an argument
data: None,
}),
}
}
pub fn from_response(partial_response: Box<RawValue>, id: Box<RawValue>) -> Self {
pub fn from_raw_response(result: Box<RawValue>, id: Box<RawValue>) -> Self {
JsonRpcForwardedResponse {
jsonrpc: "2.0".to_string(),
jsonrpc: "2.0",
id,
// TODO: since we only use the result here, should that be all we return from try_send_request?
result: Some(partial_response),
result: Some(result),
error: None,
}
}
pub fn from_value(partial_response: serde_json::Value, id: Box<RawValue>) -> Self {
let partial_response =
to_raw_value(&partial_response).expect("Value to RawValue should always work");
pub fn from_value(result: serde_json::Value, id: Box<RawValue>) -> Self {
let partial_response = to_raw_value(&result).expect("Value to RawValue should always work");
JsonRpcForwardedResponse {
jsonrpc: "2.0".to_string(),
jsonrpc: "2.0",
id,
result: Some(partial_response),
error: None,
}
}
// TODO: delete this. its on JsonRpcErrorData
pub fn from_ethers_error(e: ProviderError, id: Box<RawValue>) -> Web3ProxyResult<Self> {
// TODO: move turning ClientError into json to a helper function?
let code;
@ -302,12 +318,12 @@ impl JsonRpcForwardedResponse {
}
Ok(Self {
jsonrpc: "2.0".to_string(),
jsonrpc: "2.0",
id,
result: None,
error: Some(JsonRpcErrorData {
code,
message,
message: Cow::Owned(message),
data,
}),
})
@ -318,16 +334,21 @@ impl JsonRpcForwardedResponse {
id: Box<RawValue>,
) -> Web3ProxyResult<Self> {
match result {
Ok(response) => Ok(Self::from_response(response, id)),
Ok(response) => Ok(Self::from_raw_response(response, id)),
Err(e) => Self::from_ethers_error(e, id),
}
}
pub fn num_bytes(&self) -> usize {
// TODO: not sure how to do this without wasting a ton of allocations
serde_json::to_string(self)
.expect("this should always be valid json")
.len()
pub fn from_response_data(data: JsonRpcResponseData, id: Box<RawValue>) -> Self {
match data {
JsonRpcResponseData::Result { value, .. } => Self::from_raw_response(value, id),
JsonRpcResponseData::Error { value, .. } => JsonRpcForwardedResponse {
jsonrpc: "2.0",
id,
result: None,
error: Some(value),
},
}
}
}

@ -8,6 +8,7 @@ pub mod jsonrpc;
pub mod pagerduty;
pub mod prometheus;
pub mod referral_code;
pub mod response_cache;
pub mod rpcs;
pub mod stats;
pub mod user_token;

@ -0,0 +1,137 @@
use crate::{
frontend::errors::Web3ProxyError, jsonrpc::JsonRpcErrorData, rpcs::blockchain::ArcBlock,
};
use derive_more::From;
use ethers::providers::ProviderError;
use quick_cache::{sync::Cache as QuickCache, Weighter};
use serde_json::value::RawValue;
use std::{
borrow::Cow,
hash::{Hash, Hasher},
num::NonZeroU32,
};
#[derive(Clone, Debug, From, PartialEq, Eq)]
pub struct JsonRpcQueryCacheKey {
pub from_block: Option<ArcBlock>,
pub to_block: Option<ArcBlock>,
pub method: String,
pub params: Option<serde_json::Value>,
pub cache_errors: bool,
}
impl Hash for JsonRpcQueryCacheKey {
fn hash<H: Hasher>(&self, state: &mut H) {
self.from_block.as_ref().map(|x| x.hash).hash(state);
self.to_block.as_ref().map(|x| x.hash).hash(state);
self.method.hash(state);
// make sure preserve_order feature is OFF
self.params.as_ref().map(|x| x.to_string()).hash(state);
self.cache_errors.hash(state)
}
}
pub type JsonRpcQueryCache =
QuickCache<JsonRpcQueryCacheKey, JsonRpcResponseData, JsonRpcQueryWeigher>;
#[derive(Clone)]
pub struct JsonRpcQueryWeigher;
#[derive(Clone)]
pub enum JsonRpcResponseData {
Result {
value: Box<RawValue>,
size: Option<NonZeroU32>,
},
Error {
value: JsonRpcErrorData,
size: Option<NonZeroU32>,
},
}
impl JsonRpcResponseData {
pub fn num_bytes(&self) -> NonZeroU32 {
// TODO: dry this somehow
match self {
JsonRpcResponseData::Result { value, size } => size.unwrap_or_else(|| {
let size = value.get().len();
NonZeroU32::new(size.clamp(1, u32::MAX as usize) as u32).unwrap()
}),
JsonRpcResponseData::Error { value, size } => size.unwrap_or_else(|| {
let size = serde_json::to_string(value).unwrap().len();
NonZeroU32::new(size.clamp(1, u32::MAX as usize) as u32).unwrap()
}),
}
}
}
impl From<serde_json::Value> for JsonRpcResponseData {
fn from(value: serde_json::Value) -> Self {
let value = RawValue::from_string(value.to_string()).unwrap();
Self::Result { value, size: None }
}
}
impl From<Box<RawValue>> for JsonRpcResponseData {
fn from(value: Box<RawValue>) -> Self {
Self::Result { value, size: None }
}
}
impl From<JsonRpcErrorData> for JsonRpcResponseData {
fn from(value: JsonRpcErrorData) -> Self {
Self::Error { value, size: None }
}
}
impl TryFrom<ProviderError> for JsonRpcErrorData {
type Error = Web3ProxyError;
fn try_from(e: ProviderError) -> Result<Self, Self::Error> {
// TODO: move turning ClientError into json to a helper function?
let code;
let message: String;
let data;
match e {
ProviderError::JsonRpcClientError(err) => {
if let Some(err) = err.as_error_response() {
code = err.code;
message = err.message.clone();
data = err.data.clone();
} else if let Some(err) = err.as_serde_error() {
// this is not an rpc error. keep it as an error
return Err(Web3ProxyError::BadResponse(format!(
"bad response: {}",
err
)));
} else {
return Err(anyhow::anyhow!("unexpected ethers error! {:?}", err).into());
}
}
e => return Err(e.into()),
}
Ok(JsonRpcErrorData {
code,
message: Cow::Owned(message),
data,
})
}
}
impl Weighter<JsonRpcQueryCacheKey, (), JsonRpcResponseData> for JsonRpcQueryWeigher {
fn weight(
&self,
_key: &JsonRpcQueryCacheKey,
_qey: &(),
value: &JsonRpcResponseData,
) -> NonZeroU32 {
value.num_bytes()
}
}

@ -5,6 +5,7 @@ use super::one::Web3Rpc;
use super::transactions::TxStatus;
use crate::frontend::authorization::Authorization;
use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::response_cache::JsonRpcResponseData;
use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest};
use derive_more::From;
use ethers::prelude::{Block, TxHash, H256, U64};
@ -250,15 +251,14 @@ impl Web3Rpcs {
)
.await?;
if response.error.is_some() {
return Err(response.into());
}
let value = match response {
JsonRpcResponseData::Error { .. } => {
return Err(anyhow::anyhow!("failed fetching block").into());
}
JsonRpcResponseData::Result { value, .. } => value,
};
let block = response
.result
.web3_context("no error, but also no block")?;
let block: Option<ArcBlock> = serde_json::from_str(block.get())?;
let block: Option<ArcBlock> = serde_json::from_str(value.get())?;
let block: ArcBlock = block.web3_context("no block in the response")?;
@ -346,13 +346,14 @@ impl Web3Rpcs {
.try_send_best_consensus_head_connection(authorization, &request, None, Some(num), None)
.await?;
if response.error.is_some() {
return Err(response.into());
}
let value = match response {
JsonRpcResponseData::Error { .. } => {
return Err(anyhow::anyhow!("failed fetching block").into());
}
JsonRpcResponseData::Result { value, .. } => value,
};
let raw_block = response.result.web3_context("no cannonical block result")?;
let block: ArcBlock = serde_json::from_str(raw_block.get())?;
let block: ArcBlock = serde_json::from_str(value.get())?;
let block = Web3ProxyBlock::try_from(block)?;
@ -365,7 +366,7 @@ impl Web3Rpcs {
pub(super) async fn process_incoming_blocks(
&self,
authorization: &Arc<Authorization>,
block_receiver: kanal::AsyncReceiver<BlockAndRpc>,
block_receiver: flume::Receiver<BlockAndRpc>,
// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
// Geth's subscriptions have the same potential for skipping blocks.
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
@ -373,7 +374,7 @@ impl Web3Rpcs {
let mut connection_heads = ConsensusFinder::new(self.max_block_age, self.max_block_lag);
loop {
match block_receiver.recv().await {
match block_receiver.recv_async().await {
Ok((new_block, rpc)) => {
let rpc_name = rpc.name.clone();

@ -373,9 +373,9 @@ impl ConsensusFinder {
.0
.tier;
trace!("first_tier: {}", current_tier);
// trace!("first_tier: {}", current_tier);
trace!("rpc_heads_by_tier: {:#?}", rpc_heads_by_tier);
// trace!("rpc_heads_by_tier: {:#?}", rpc_heads_by_tier);
// loop over all the rpc heads (grouped by tier) and their parents to find consensus
// TODO: i'm sure theres a lot of shortcuts that could be taken, but this is simplest to implement

@ -8,7 +8,8 @@ use crate::config::{BlockAndRpc, TxHashAndRpc, Web3RpcConfig};
use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult};
use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
use crate::jsonrpc::{JsonRpcErrorData, JsonRpcRequest};
use crate::response_cache::JsonRpcResponseData;
use crate::rpcs::consensus::{RankedRpcMap, RpcRanking};
use crate::rpcs::transactions::TxStatus;
use anyhow::Context;
@ -23,12 +24,13 @@ use hashbrown::{HashMap, HashSet};
use itertools::Itertools;
use log::{debug, error, info, trace, warn, Level};
use migration::sea_orm::DatabaseConnection;
use moka::future::{Cache, ConcurrentCacheExt};
use moka::future::Cache;
use ordered_float::OrderedFloat;
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::json;
use serde_json::value::RawValue;
use std::borrow::Cow;
use std::cmp::{min_by_key, Reverse};
use std::collections::BTreeMap;
use std::fmt;
@ -43,7 +45,7 @@ use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBeh
#[derive(From)]
pub struct Web3Rpcs {
/// if watch_consensus_head_sender is some, Web3Rpc inside self will send blocks here when they get them
pub(crate) block_sender: kanal::AsyncSender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
pub(crate) block_sender: flume::Sender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
/// any requests will be forwarded to one (or more) of these connections
pub(crate) by_name: ArcSwap<HashMap<String, Arc<Web3Rpc>>>,
/// notify all http providers to check their blocks at the same time
@ -55,10 +57,11 @@ pub struct Web3Rpcs {
pub(crate) watch_consensus_rpcs_sender: watch::Sender<Option<Arc<ConsensusWeb3Rpcs>>>,
/// this head receiver makes it easy to wait until there is a new block
pub(super) watch_consensus_head_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
/// keep track of transactions that we have sent through subscriptions
pub(super) pending_transaction_cache:
Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
pub(super) pending_tx_id_receiver: kanal::AsyncReceiver<TxHashAndRpc>,
pub(super) pending_tx_id_sender: kanal::AsyncSender<TxHashAndRpc>,
pub(super) pending_tx_id_receiver: flume::Receiver<TxHashAndRpc>,
pub(super) pending_tx_id_sender: flume::Sender<TxHashAndRpc>,
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
/// all blocks, including orphans
pub(super) blocks_by_hash: BlocksByHashCache,
@ -94,8 +97,8 @@ impl Web3Rpcs {
watch::Receiver<Option<Arc<ConsensusWeb3Rpcs>>>,
// watch::Receiver<Arc<ConsensusWeb3Rpcs>>,
)> {
let (pending_tx_id_sender, pending_tx_id_receiver) = kanal::unbounded_async();
let (block_sender, block_receiver) = kanal::unbounded_async::<BlockAndRpc>();
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, block_receiver) = flume::unbounded::<BlockAndRpc>();
// TODO: query the rpc to get the actual expected block time, or get from config? maybe have this be part of a health check?
let expected_block_time_ms = match chain_id {
@ -347,7 +350,7 @@ impl Web3Rpcs {
async fn subscribe(
self: Arc<Self>,
authorization: Arc<Authorization>,
block_receiver: kanal::AsyncReceiver<BlockAndRpc>,
block_receiver: flume::Receiver<BlockAndRpc>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
let mut futures = vec![];
@ -362,7 +365,7 @@ impl Web3Rpcs {
let pending_tx_id_receiver = self.pending_tx_id_receiver.clone();
let handle = tokio::task::spawn(async move {
// TODO: set up this future the same as the block funnel
while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv().await {
while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv_async().await {
let f = clone.clone().process_incoming_tx_id(
authorization.clone(),
rpc,
@ -423,12 +426,11 @@ impl Web3Rpcs {
pub async fn try_send_parallel_requests(
&self,
active_request_handles: Vec<OpenRequestHandle>,
id: Box<RawValue>,
method: &str,
params: Option<&serde_json::Value>,
error_level: Level,
// TODO: remove this box once i figure out how to do the options
) -> Web3ProxyResult<JsonRpcForwardedResponse> {
) -> Web3ProxyResult<JsonRpcResponseData> {
// TODO: if only 1 active_request_handles, do self.try_send_request?
let responses = active_request_handles
@ -447,24 +449,16 @@ impl Web3Rpcs {
let mut count_map: HashMap<String, _> = HashMap::new();
let mut counts: Counter<String> = Counter::new();
let mut any_ok_with_json_result = false;
let mut any_ok_but_maybe_json_error = false;
for partial_response in responses {
if partial_response.is_ok() {
any_ok_with_json_result = true;
}
let response =
JsonRpcForwardedResponse::try_from_response_result(partial_response, id.clone());
// TODO: better key?
let s = format!("{:?}", response);
// TODO: better key!
let s = format!("{:?}", partial_response);
if count_map.get(&s).is_none() {
if response.is_ok() {
any_ok_but_maybe_json_error = true;
}
count_map.insert(s.clone(), response);
count_map.insert(s.clone(), partial_response);
}
counts.update([s].into_iter());
@ -477,19 +471,18 @@ impl Web3Rpcs {
match most_common {
Ok(x) => {
if any_ok_with_json_result && x.error.is_some() {
// this one may be an "Ok", but the json has an error inside it
continue;
}
// return the most common success
return Ok(x);
return Ok(x.into());
}
Err(err) => {
if any_ok_but_maybe_json_error {
if any_ok_with_json_result {
// the most common is an error, but there is an Ok in here somewhere. loop to find it
continue;
}
return Err(err);
let err: JsonRpcErrorData = err.try_into()?;
return Ok(err.into());
}
}
}
@ -601,7 +594,18 @@ impl Web3Rpcs {
return Ok(OpenRequestResult::Handle(handle));
}
Ok(OpenRequestResult::RetryAt(retry_at)) => {
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
trace!(
"{:?} - retry on {} @ {}",
request_ulid,
best_rpc,
retry_at.duration_since(Instant::now()).as_secs_f32()
);
if earliest_retry_at.is_none() {
earliest_retry_at = Some(retry_at);
} else {
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
}
}
Ok(OpenRequestResult::NotReady) => {
// TODO: log a warning? emit a stat?
@ -641,7 +645,16 @@ impl Web3Rpcs {
Ok(OpenRequestResult::NotReady)
}
Some(earliest_retry_at) => {
warn!("no servers on {:?}! {:?}", self, earliest_retry_at);
// TODO: log the server that retry_at came from
// TODO: `self` doesn't log well. get a pretty name for this group of servers
warn!(
"{:?} - no servers on {:?}! retry in {:?}s",
request_ulid,
self,
earliest_retry_at
.duration_since(Instant::now())
.as_secs_f32()
);
Ok(OpenRequestResult::RetryAt(earliest_retry_at))
}
@ -733,7 +746,7 @@ impl Web3Rpcs {
match rpc.try_request_handle(authorization, None).await {
Ok(OpenRequestResult::RetryAt(retry_at)) => {
// this rpc is not available. skip it
warn!("{} is rate limited. skipping", rpc);
trace!("{} is rate limited. skipping", rpc);
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
}
Ok(OpenRequestResult::Handle(handle)) => {
@ -767,7 +780,7 @@ impl Web3Rpcs {
request_metadata: Option<&Arc<RequestMetadata>>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
) -> Web3ProxyResult<JsonRpcForwardedResponse> {
) -> Web3ProxyResult<JsonRpcResponseData> {
let mut skip_rpcs = vec![];
let mut method_not_available_response = None;
@ -783,7 +796,7 @@ impl Web3Rpcs {
.best_available_rpc(
authorization,
request_metadata,
&[],
&skip_rpcs,
min_block_needed,
max_block_needed,
)
@ -800,10 +813,11 @@ impl Web3Rpcs {
let is_backup_response = rpc.backup;
// TODO: instead of entirely skipping, maybe demote a tier?
skip_rpcs.push(rpc);
// TODO: get the log percent from the user data
let response_result = active_request_handle
let response_result: Result<Box<RawValue>, _> = active_request_handle
.request(
&request.method,
&json!(request.params),
@ -812,10 +826,7 @@ impl Web3Rpcs {
)
.await;
match JsonRpcForwardedResponse::try_from_response_result(
response_result,
request.id.clone(),
) {
match response_result {
Ok(response) => {
// TODO: if there are multiple responses being aggregated, this will only use the last server's backup type
if let Some(request_metadata) = request_metadata {
@ -824,97 +835,98 @@ impl Web3Rpcs {
.store(is_backup_response, Ordering::Release);
}
if let Some(error) = response.error.as_ref() {
// trace!(?response, "rpc error");
return Ok(response.into());
}
Err(error) => {
// trace!(?response, "rpc error");
if let Some(request_metadata) = request_metadata {
request_metadata
.error_response
.store(true, Ordering::Release);
// TODO: separate jsonrpc error and web3 proxy error!
if let Some(request_metadata) = request_metadata {
request_metadata
.error_response
.store(true, Ordering::Release);
}
let error: JsonRpcErrorData = error.try_into()?;
// some errors should be retried on other nodes
let error_msg = error.message.as_ref();
// different providers do different codes. check all of them
// TODO: there's probably more strings to add here
let rate_limit_substrings = ["limit", "exceeded", "quota usage"];
for rate_limit_substr in rate_limit_substrings {
if error_msg.contains(rate_limit_substr) {
warn!("rate limited by {}", skip_rpcs.last().unwrap());
continue;
}
}
// some errors should be retried on other nodes
let error_msg = error.message.as_str();
match error.code {
-32000 => {
// TODO: regex?
let retry_prefixes = [
"header not found",
"header for hash not found",
"missing trie node",
"node not started",
"RPC timeout",
];
for retry_prefix in retry_prefixes {
if error_msg.starts_with(retry_prefix) {
// TODO: too verbose
debug!("retrying on another server");
continue;
}
}
}
-32601 => {
let error_msg = error.message.as_ref();
// different providers do different codes. check all of them
// TODO: there's probably more strings to add here
let rate_limit_substrings = ["limit", "exceeded", "quota usage"];
for rate_limit_substr in rate_limit_substrings {
if error_msg.contains(rate_limit_substr) {
warn!("rate limited by {}", skip_rpcs.last().unwrap());
// sometimes a provider does not support all rpc methods
// we check other connections rather than returning the error
// but sometimes the method is something that is actually unsupported,
// so we save the response here to return it later
// some providers look like this
if error_msg.starts_with("the method")
&& error_msg.ends_with("is not available")
{
method_not_available_response = Some(error);
continue;
}
// others look like this (this is the example in the official spec)
if error_msg == "Method not found" {
method_not_available_response = Some(error);
continue;
}
}
match error.code {
-32000 => {
// TODO: regex?
let retry_prefixes = [
"header not found",
"header for hash not found",
"missing trie node",
"node not started",
"RPC timeout",
];
for retry_prefix in retry_prefixes {
if error_msg.starts_with(retry_prefix) {
continue;
}
}
}
-32601 => {
let error_msg = error.message.as_str();
// sometimes a provider does not support all rpc methods
// we check other connections rather than returning the error
// but sometimes the method is something that is actually unsupported,
// so we save the response here to return it later
// some providers look like this
if error_msg.starts_with("the method")
&& error_msg.ends_with("is not available")
{
method_not_available_response = Some(response);
continue;
}
// others look like this (this is the example in the official spec)
if error_msg == "Method not found" {
method_not_available_response = Some(response);
continue;
}
}
_ => {}
}
} else {
// trace!(?response, "rpc success");
_ => {}
}
return Ok(response);
}
Err(err) => {
let rpc = skip_rpcs
.last()
.expect("there must have been a provider if we got an error");
// let rpc = skip_rpcs
// .last()
// .expect("there must have been a provider if we got an error");
// TODO: emit a stat. if a server is getting skipped a lot, something is not right
// TODO: if we get a TrySendError, reconnect. wait why do we see a trysenderror on a dual provider? shouldn't it be using reqwest
trace!(
"Backend server error on {}! Retrying {:?} on another. err={:?}",
rpc,
request,
err
);
// TODO! WRONG! ONLY SET RETRY_AT IF THIS IS A SERVER/CONNECTION ERROR. JSONRPC "error" is FINE
// trace!(
// "Backend server error on {}! Retrying {:?} on another. err={:?}",
// rpc,
// request,
// error,
// );
// if let Some(ref hard_limit_until) = rpc.hard_limit_until {
// let retry_at = Instant::now() + Duration::from_secs(1);
if let Some(ref hard_limit_until) = rpc.hard_limit_until {
let retry_at = Instant::now() + Duration::from_secs(1);
// hard_limit_until.send_replace(retry_at);
// }
hard_limit_until.send_replace(retry_at);
}
continue;
return Ok(error.into());
}
}
}
@ -923,8 +935,8 @@ impl Web3Rpcs {
// sleep (TODO: with a lock?) until our rate limits should be available
// TODO: if a server catches up sync while we are waiting, we could stop waiting
warn!(
"All rate limits exceeded. waiting for change in synced servers or {:?}",
retry_at
"All rate limits exceeded. waiting for change in synced servers or {:?}s",
retry_at.duration_since(Instant::now()).as_secs_f32()
);
// TODO: have a separate column for rate limited?
@ -934,6 +946,7 @@ impl Web3Rpcs {
tokio::select! {
_ = sleep_until(retry_at) => {
trace!("slept!");
skip_rpcs.pop();
}
_ = watch_consensus_connections.changed() => {
@ -948,6 +961,8 @@ impl Web3Rpcs {
let waiting_for = min_block_needed.max(max_block_needed);
info!("waiting for {:?}", waiting_for);
if watch_for_block(waiting_for, &mut watch_consensus_connections).await? {
// block found! continue so we can check for another rpc
} else {
@ -966,10 +981,12 @@ impl Web3Rpcs {
.store(true, Ordering::Release);
}
if let Some(r) = method_not_available_response {
if let Some(err) = method_not_available_response {
// TODO: this error response is likely the user's fault. do we actually want it marked as an error? maybe keep user and server error bools?
// TODO: emit a stat for unsupported methods? it would be best to block them at the proxy instead of at the backend
return Ok(r);
// TODO: this is too verbose!
debug!("{}", serde_json::to_string(&err)?);
return Ok(err.into());
}
let num_conns = self.by_name.load().len();
@ -995,8 +1012,6 @@ impl Web3Rpcs {
"No archive servers synced (min {:?}, max {:?}, head {:?}) ({} known)",
min_block_needed, max_block_needed, head_block_num, num_conns
);
} else if num_skipped == 0 {
// TODO: what should we log?
} else {
error!(
"Requested data is not available (min {:?}, max {:?}, head {:?}) ({} skipped, {} known)",
@ -1008,11 +1023,12 @@ impl Web3Rpcs {
// TODO: what error code?
// cloudflare gives {"jsonrpc":"2.0","error":{"code":-32043,"message":"Requested data cannot be older than 128 blocks."},"id":1}
Ok(JsonRpcForwardedResponse::from_str(
"Requested data is not available",
Some(-32043),
Some(request.id.clone()),
))
Ok(JsonRpcErrorData {
message: Cow::Borrowed("Requested data is not available"),
code: -32043,
data: None,
}
.into())
}
/// be sure there is a timeout on this or it might loop forever
@ -1027,7 +1043,7 @@ impl Web3Rpcs {
error_level: Level,
max_count: Option<usize>,
always_include_backups: bool,
) -> Web3ProxyResult<JsonRpcForwardedResponse> {
) -> Web3ProxyResult<JsonRpcResponseData> {
let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe();
let start = Instant::now();
@ -1071,7 +1087,6 @@ impl Web3Rpcs {
return self
.try_send_parallel_requests(
active_request_handles,
request.id.clone(),
request.method.as_ref(),
request.params.as_ref(),
error_level,
@ -1126,7 +1141,7 @@ impl Web3Rpcs {
request_metadata: Option<&Arc<RequestMetadata>>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
) -> Web3ProxyResult<JsonRpcForwardedResponse> {
) -> Web3ProxyResult<JsonRpcResponseData> {
match authorization.checks.proxy_mode {
ProxyMode::Debug | ProxyMode::Best => {
self.try_send_best_consensus_head_connection(
@ -1158,7 +1173,7 @@ impl Serialize for Web3Rpcs {
where
S: Serializer,
{
let mut state = serializer.serialize_struct("Web3Rpcs", 6)?;
let mut state = serializer.serialize_struct("Web3Rpcs", 1)?;
{
let by_name = self.by_name.load();
@ -1178,12 +1193,12 @@ impl Serialize for Web3Rpcs {
}
}
self.blocks_by_hash.sync();
self.blocks_by_number.sync();
state.serialize_field("block_hashes_count", &self.blocks_by_hash.entry_count())?;
state.serialize_field("block_hashes_size", &self.blocks_by_hash.weighted_size())?;
state.serialize_field("block_numbers_count", &self.blocks_by_number.entry_count())?;
state.serialize_field("block_numbers_size", &self.blocks_by_number.weighted_size())?;
// self.blocks_by_hash.sync();
// self.blocks_by_number.sync();
// state.serialize_field("block_hashes_count", &self.blocks_by_hash.entry_count())?;
// state.serialize_field("block_hashes_size", &self.blocks_by_hash.weighted_size())?;
// state.serialize_field("block_numbers_count", &self.blocks_by_number.entry_count())?;
// state.serialize_field("block_numbers_size", &self.blocks_by_number.weighted_size())?;
state.end()
}
}
@ -1391,8 +1406,8 @@ mod tests {
(lagged_rpc.name.clone(), lagged_rpc.clone()),
]);
let (block_sender, _block_receiver) = kanal::unbounded_async();
let (pending_tx_id_sender, pending_tx_id_receiver) = kanal::unbounded_async();
let (block_sender, _block_receiver) = flume::unbounded();
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (watch_consensus_rpcs_sender, _watch_consensus_rpcs_receiver) = watch::channel(None);
let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None);
@ -1643,8 +1658,8 @@ mod tests {
(archive_rpc.name.clone(), archive_rpc.clone()),
]);
let (block_sender, _) = kanal::unbounded_async();
let (pending_tx_id_sender, pending_tx_id_receiver) = kanal::unbounded_async();
let (block_sender, _) = flume::unbounded();
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (watch_consensus_rpcs_sender, _) = watch::channel(None);
let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None);
@ -1807,8 +1822,8 @@ mod tests {
),
]);
let (block_sender, _) = kanal::unbounded_async();
let (pending_tx_id_sender, pending_tx_id_receiver) = kanal::unbounded_async();
let (block_sender, _) = flume::unbounded();
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (watch_consensus_rpcs_sender, _) = watch::channel(None);
let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None);

@ -98,8 +98,8 @@ impl Web3Rpc {
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
redis_pool: Option<RedisPool>,
block_map: BlocksByHashCache,
block_sender: Option<kanal::AsyncSender<BlockAndRpc>>,
tx_id_sender: Option<kanal::AsyncSender<(TxHash, Arc<Self>)>>,
block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
reconnect: bool,
) -> anyhow::Result<(Arc<Web3Rpc>, AnyhowJoinHandle<()>)> {
let created_at = Instant::now();
@ -389,7 +389,7 @@ impl Web3Rpc {
/// TODO: maybe it would be better to use "Full Jitter". The "Full Jitter" approach uses less work, but slightly more time.
pub async fn retrying_connect(
self: &Arc<Self>,
block_sender: Option<&kanal::AsyncSender<BlockAndRpc>>,
block_sender: Option<&flume::Sender<BlockAndRpc>>,
chain_id: u64,
db_conn: Option<&DatabaseConnection>,
delay_start: bool,
@ -452,7 +452,7 @@ impl Web3Rpc {
/// connect to the web3 provider
async fn connect(
self: &Arc<Self>,
block_sender: Option<&kanal::AsyncSender<BlockAndRpc>>,
block_sender: Option<&flume::Sender<BlockAndRpc>>,
chain_id: u64,
db_conn: Option<&DatabaseConnection>,
) -> anyhow::Result<()> {
@ -474,7 +474,7 @@ impl Web3Rpc {
// tell the block subscriber that this rpc doesn't have any blocks
if let Some(block_sender) = block_sender {
block_sender
.send((None, self.clone()))
.send_async((None, self.clone()))
.await
.context("block_sender during connect")?;
}
@ -589,7 +589,7 @@ impl Web3Rpc {
pub(crate) async fn send_head_block_result(
self: &Arc<Self>,
new_head_block: Result<Option<ArcBlock>, ProviderError>,
block_sender: &kanal::AsyncSender<BlockAndRpc>,
block_sender: &flume::Sender<BlockAndRpc>,
block_map: BlocksByHashCache,
) -> anyhow::Result<()> {
let new_head_block = match new_head_block {
@ -652,7 +652,7 @@ impl Web3Rpc {
// send an empty block to take this server out of rotation
block_sender
.send((new_head_block, self.clone()))
.send_async((new_head_block, self.clone()))
.await
.context("block_sender")?;
@ -671,11 +671,11 @@ impl Web3Rpc {
self: Arc<Self>,
authorization: &Arc<Authorization>,
block_map: BlocksByHashCache,
block_sender: Option<kanal::AsyncSender<BlockAndRpc>>,
block_sender: Option<flume::Sender<BlockAndRpc>>,
chain_id: u64,
disconnect_receiver: watch::Receiver<bool>,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
tx_id_sender: Option<kanal::AsyncSender<(TxHash, Arc<Self>)>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
) -> anyhow::Result<()> {
let error_handler = if self.backup {
RequestErrorHandler::DebugLevel
@ -896,7 +896,7 @@ impl Web3Rpc {
self: Arc<Self>,
authorization: Arc<Authorization>,
http_interval_receiver: Option<broadcast::Receiver<()>>,
block_sender: kanal::AsyncSender<BlockAndRpc>,
block_sender: flume::Sender<BlockAndRpc>,
block_map: BlocksByHashCache,
) -> anyhow::Result<()> {
trace!("watching new heads on {}", self);
@ -1091,7 +1091,7 @@ impl Web3Rpc {
async fn subscribe_pending_transactions(
self: Arc<Self>,
authorization: Arc<Authorization>,
tx_id_sender: kanal::AsyncSender<(TxHash, Arc<Self>)>,
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
) -> anyhow::Result<()> {
// TODO: give this a separate client. don't use new_head_client for everything. especially a firehose this big
// TODO: timeout
@ -1116,7 +1116,7 @@ impl Web3Rpc {
while let Some(pending_tx_id) = stream.next().await {
tx_id_sender
.send((pending_tx_id, self.clone()))
.send_async((pending_tx_id, self.clone()))
.await
.context("tx_id_sender")?;
@ -1397,6 +1397,16 @@ impl Serialize for Web3Rpc {
state.serialize_field("head_block", &head_block)?;
}
state.serialize_field(
"total_requests",
&self.total_requests.load(atomic::Ordering::Acquire),
)?;
state.serialize_field(
"active_requests",
&self.active_requests.load(atomic::Ordering::Relaxed),
)?;
state.serialize_field("head_latency", &self.head_latency.read().value())?;
state.serialize_field(
@ -1406,16 +1416,6 @@ impl Serialize for Web3Rpc {
state.serialize_field("peak_ewma", self.peak_ewma().as_ref())?;
state.serialize_field(
"active_requests",
&self.active_requests.load(atomic::Ordering::Acquire),
)?;
state.serialize_field(
"total_requests",
&self.total_requests.load(atomic::Ordering::Acquire),
)?;
state.end()
}
}

@ -7,7 +7,7 @@ use entities::revert_log;
use entities::sea_orm_active_enums::Method;
use ethers::providers::ProviderError;
use ethers::types::{Address, Bytes};
use log::{debug, error, info, trace, warn, Level};
use log::{debug, error, trace, warn, Level};
use migration::sea_orm::{self, ActiveEnum, ActiveModelTrait};
use serde_json::json;
use std::fmt;
@ -283,12 +283,19 @@ impl OpenRequestHandle {
_ => err.as_error_response().map(|x| x.message.clone()),
};
trace!("error message: {:?}", msg);
if let Some(msg) = msg {
if msg.starts_with("execution reverted") {
trace!("revert from {}", self.rpc);
ResponseTypes::Revert
} else if msg.contains("limit") || msg.contains("request") {
trace!("rate limit from {}", self.rpc);
// TODO: too verbose
if self.rpc.backup {
trace!("rate limit from {}", self.rpc);
} else {
warn!("rate limit from {}", self.rpc);
}
ResponseTypes::RateLimit
} else {
ResponseTypes::Error
@ -303,6 +310,15 @@ impl OpenRequestHandle {
if matches!(response_type, ResponseTypes::RateLimit) {
if let Some(hard_limit_until) = self.rpc.hard_limit_until.as_ref() {
// TODO: how long should we actually wait? different providers have different times
// TODO: if rate_limit_period_seconds is set, use that
// TODO: check response headers for rate limits too
// TODO: warn if production, debug if backup
if self.rpc.backup {
debug!("unexpected rate limit on {}!", self.rpc);
} else {
warn!("unexpected rate limit on {}!", self.rpc);
}
let retry_at = Instant::now() + Duration::from_secs(1);
trace!("retry {} at: {:?}", self.rpc, retry_at);
@ -374,8 +390,9 @@ impl OpenRequestHandle {
}
}
} else if let Some(peak_latency) = &self.rpc.peak_latency {
trace!("updating peak_latency: {}", latency.as_secs_f64());
peak_latency.report(latency);
// trace!("updating peak_latency: {}", latency.as_secs_f64());
// peak_latency.report(latency);
trace!("peak latency disabled for now");
} else {
unreachable!("peak_latency not initialized");
}

@ -672,6 +672,7 @@ impl RpcQueryStats {
method: Option<&str>,
) -> Decimal {
// some methods should be free. there might be cases where method isn't set (though they should be uncommon)
// TODO: get this list from config (and add more to it)
if let Some(method) = method.as_ref() {
if ["eth_chainId"].contains(method) {
return 0.into();

@ -8,7 +8,7 @@ use log::{error, info, trace};
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::DatabaseConnection;
use std::time::Duration;
use tokio::sync::{broadcast, mpsc};
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tokio::time::interval;
@ -30,7 +30,7 @@ pub struct BufferedRpcQueryStats {
#[derive(From)]
pub struct SpawnedStatBuffer {
pub stat_sender: mpsc::UnboundedSender<AppStat>,
pub stat_sender: flume::Sender<AppStat>,
/// these handles are important and must be allowed to finish
pub background_handle: JoinHandle<anyhow::Result<()>>,
}
@ -65,7 +65,7 @@ impl StatBuffer {
return Ok(None);
}
let (stat_sender, stat_receiver) = mpsc::unbounded_channel();
let (stat_sender, stat_receiver) = flume::unbounded();
let timestamp_precision = TimestampPrecision::Seconds;
let mut new = Self {
@ -94,7 +94,7 @@ impl StatBuffer {
async fn aggregate_and_save_loop(
&mut self,
bucket: String,
mut stat_receiver: mpsc::UnboundedReceiver<AppStat>,
stat_receiver: flume::Receiver<AppStat>,
mut shutdown_receiver: broadcast::Receiver<()>,
) -> anyhow::Result<()> {
let mut tsdb_save_interval =
@ -107,11 +107,11 @@ impl StatBuffer {
loop {
tokio::select! {
stat = stat_receiver.recv() => {
// info!("Received stat");
stat = stat_receiver.recv_async() => {
// trace!("Received stat");
// save the stat to a buffer
match stat {
Some(AppStat::RpcQuery(stat)) => {
Ok(AppStat::RpcQuery(stat)) => {
if self.influxdb_client.is_some() {
// TODO: round the timestamp at all?
@ -128,8 +128,8 @@ impl StatBuffer {
self.accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat);
}
}
None => {
info!("done receiving stats");
Err(err) => {
info!("error receiving stat: {}", err);
break;
}
}