Merge branch 'main' into 19-admin-imitate

commit 2c8c4306fa
Author: David, 2023-02-17 14:33:43 +01:00 (committed by GitHub)
28 changed files with 2520 additions and 1674 deletions

Cargo.lock (generated): 48 lines changed

@ -282,13 +282,13 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.6.4"
version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5694b64066a2459918d8074c2ce0d5a88f409431994c2356617c8ae0c4721fc"
checksum = "4e246206a63c9830e118d12c894f56a82033da1a2361f5544deeee3df85c99d9"
dependencies = [
"async-trait",
"axum-core",
"base64 0.20.0",
"base64 0.21.0",
"bitflags",
"bytes",
"futures-util",
@ -347,9 +347,9 @@ dependencies = [
[[package]]
name = "axum-macros"
version = "0.3.2"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9dbcf61bed07d554bd5c225cd07bc41b793eab63e79c6f0ceac7e1aed2f1c670"
checksum = "5fbf955307ff8addb48d2399393c9e2740dd491537ec562b66ab364fc4a38841"
dependencies = [
"heck 0.4.0",
"proc-macro2",
@ -419,12 +419,6 @@ version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]]
name = "base64"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5"
[[package]]
name = "base64"
version = "0.21.0"
@ -1809,6 +1803,12 @@ version = "2.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "ewma"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f20267f3a8b678b7151c0c508002e79126144a5d47badddec7f31ddc1f4c754"
[[package]]
name = "eyre"
version = "0.6.8"
@ -2891,9 +2891,9 @@ dependencies = [
[[package]]
name = "moka"
version = "0.9.7"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19b9268097a2cf211ac9955b1cc95e80fa84fff5c2d13ba292916445dc8a311f"
checksum = "2b6446f16d504e3d575df79cabb11bfbe9f24b17e9562d964a815db7b28ae3ec"
dependencies = [
"async-io",
"async-lock",
@ -3093,9 +3093,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.17.0"
version = "1.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f61fba1741ea2b3d6a1e3178721804bb716a68a6aeba1149b5d52e3d464ea66"
checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3"
[[package]]
name = "opaque-debug"
@ -3134,6 +3134,15 @@ dependencies = [
"syn",
]
[[package]]
name = "ordered-float"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d84eb1409416d254e4a9c8fa56cc24701755025b458f0fcd8e59e1f5f40c23bf"
dependencies = [
"num-traits",
]
[[package]]
name = "os_info"
version = "3.6.0"
@ -4520,9 +4529,9 @@ dependencies = [
[[package]]
name = "serde_prometheus"
version = "0.2.0"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb6048d9e4ebc41f7d1a42c79b04c5b460633be307620a0e34a8f81970ea47"
checksum = "9c1a4ca38f4e746460d1dbd3711b8ca8ae314d1b21247edeff61dd20325b5a6f"
dependencies = [
"heapless",
"nom",
@ -5761,7 +5770,7 @@ dependencies = [
[[package]]
name = "web3_proxy"
version = "0.13.0"
version = "0.13.1"
dependencies = [
"anyhow",
"argh",
@ -5776,6 +5785,7 @@ dependencies = [
"entities",
"env_logger",
"ethers",
"ewma",
"fdlimit",
"flume",
"futures",
@ -5793,6 +5803,8 @@ dependencies = [
"notify",
"num",
"num-traits",
"once_cell",
"ordered-float",
"pagerduty-rs",
"parking_lot 0.12.1",
"prettytable",

Jenkinsfile (vendored): 11 lines changed

@ -1,19 +1,20 @@
def buildAndPush() {
// env.ARCH is the system architecture. some apps can be generic (amd64, arm64),
// but apps that compile for specific hardware (like web3-proxy) will need more specific tags (amd64_epyc2, arm64_graviton2, intel_xeon3, etc.)
// env.BRANCH_NAME is set to the git branch name by default
// env.REGISTRY is the repository url for this pipeline
// env.GIT_SHORT is the git short hash of the currently checked out repo
// env.LATEST_BRANCH is the branch name that gets tagged latest
// env.ARCH is the system architecture. some apps can be generic (amd64, arm64),
// but apps that compile for specific hardware (like web3-proxy) will need more specific tags (amd64_epyc2, arm64_graviton2, intel_xeon3, etc.)
// TODO: check that this system actually matches the given arch
sh '''#!/bin/bash
set -eux -o pipefail
[ -n "$GIT_SHORT" ]
[ -n "$GIT_SHORT" ]
[ -n "$REGISTRY" ]
[ -n "$ARCH" ]
[ -n "$BRANCH_NAME" ]
[ -n "$REGISTRY" ]
[ -n "$GIT_SHORT" ]
[ -n "$LATEST_BRANCH" ]
# deterministic mtime on .git keeps Dockerfiles that do 'ADD . .' or similar
# without this, the build process always thinks the directory has changes

TODO.md: 12 lines changed

@ -330,6 +330,11 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [x] block all admin_ rpc commands
- [x] remove the "metered" crate now that we save aggregate queries?
- [x] add archive depth to app config
- [x] use from_block and to_block so that eth_getLogs is routed correctly
- [x] improve eth_sendRawTransaction server selection
- [x] don't cache methods that are usually very large
- [x] use http provider when available
- [ ] don't use new_head_provider anywhere except new head subscription
- [-] proxy mode for benchmarking all backends
- [-] proxy mode for sending to multiple backends
- [-] let users choose a % of reverts to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly
@ -339,6 +344,12 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [-] add configurable size limits to all the Caches
- instead of configuring each cache with MB sizes, have one value for total memory footprint and then percentages for each cache
- https://github.com/moka-rs/moka/issues/201
- [ ] have multiple providers on each backend rpc. one websocket for newHeads. and then http providers for handling requests
- erigon only streams the JSON over HTTP. that code isn't enabled for websockets. so this should save memory on the erigon servers
- i think this also means we don't need to worry about changing the id that the user gives us.
- have the healthcheck get the block over http. if it errors, or doesn't match what the websocket says, something is wrong (likely a deadlock in the websocket code)
- [ ] maybe we shouldn't route eth_getLogs to syncing nodes. serving queries slows down sync significantly
- change the send_best function to only include servers that are at least close to fully synced
- [ ] have private transactions be enabled by a url setting rather than a setting on the key
- [ ] cli for adding rpc keys to an existing user
- [ ] rate limiting/throttling on query_user_stats
@ -349,6 +360,7 @@ These are not yet ordered. There might be duplicates. We might not actually need
- if total difficulty is not on the block and we aren't on ETH, fetch the full block instead of just the header
- if total difficulty is set and non-zero, use it for consensus instead of just the number
- [ ] query_user_stats cache hit rate
- [ ] need debounce on reconnect. websockets are closing on us and then we reconnect twice. locks on ProviderState need more thought
- [ ] having the whole block in status is very verbose. trim it down
- [ ] `cost estimate` script
- sum bytes and number of requests. prompt hosting costs. divide
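The `cost estimate` script described in that last item is just division. A minimal sketch of the arithmetic, with every number a made-up example:

// a minimal sketch of the `cost estimate` arithmetic; all numbers here are made up
fn main() {
    let total_requests: u64 = 1_200_000;
    let total_bytes: u64 = 48_000_000_000;
    let monthly_hosting_cost_usd: f64 = 900.0;

    // divide hosting costs by usage to get unit costs
    println!(
        "cost per 1M requests: ${:.2}",
        monthly_hosting_cost_usd / total_requests as f64 * 1_000_000.0
    );
    println!(
        "cost per GB served: ${:.4}",
        monthly_hosting_cost_usd / (total_bytes as f64 / 1e9)
    );
}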

@ -52,50 +52,50 @@ response_cache_max_bytes = 10_000_000_000
[balanced_rpcs.ankr]
display_name = "Ankr"
url = "https://rpc.ankr.com/eth"
http_url = "https://rpc.ankr.com/eth"
soft_limit = 1_000
tier = 0
[balanced_rpcs.cloudflare]
display_name = "Cloudflare"
url = "https://cloudflare-eth.com"
http_url = "https://cloudflare-eth.com"
soft_limit = 1_000
tier = 1
[balanced_rpcs.blastapi]
display_name = "Blast"
url = "https://eth-mainnet.public.blastapi.io"
http_url = "https://eth-mainnet.public.blastapi.io"
soft_limit = 1_000
tier = 1
[balanced_rpcs.mycryptoapi]
display_name = "MyCrypto"
disabled = true
url = "https://api.mycryptoapi.com/eth"
http_url = "https://api.mycryptoapi.com/eth"
soft_limit = 1_000
tier = 2
[balanced_rpcs.pokt-v1]
display_name = "Pokt #1"
url = "https://eth-mainnet.gateway.pokt.network/v1/5f3453978e354ab992c4da79"
http_url = "https://eth-mainnet.gateway.pokt.network/v1/5f3453978e354ab992c4da79"
soft_limit = 500
tier = 2
[balanced_rpcs.pokt]
display_name = "Pokt #2"
url = "https://eth-rpc.gateway.pokt.network"
http_url = "https://eth-rpc.gateway.pokt.network"
soft_limit = 500
tier = 3
[balanced_rpcs.linkpool]
display_name = "Linkpool"
url = "https://main-rpc.linkpool.io"
http_url = "https://main-rpc.linkpool.io"
soft_limit = 500
tier = 4
[balanced_rpcs.runonflux]
display_name = "Run on Flux (light)"
url = "https://ethereumnodelight.app.runonflux.io"
http_url = "https://ethereumnodelight.app.runonflux.io"
soft_limit = 1_000
tier = 5
@ -103,7 +103,7 @@ response_cache_max_bytes = 10_000_000_000
[balanced_rpcs.linkpool-light]
display_name = "Linkpool (light)"
disabled = true
url = "https://main-light.eth.linkpool.io"
http_url = "https://main-light.eth.linkpool.io"
soft_limit = 100
tier = 5
@ -114,34 +114,34 @@ response_cache_max_bytes = 10_000_000_000
[private_rpcs.eden]
disabled = true
display_name = "Eden network"
url = "https://api.edennetwork.io/v1/"
http_url = "https://api.edennetwork.io/v1/"
soft_limit = 1_805
tier = 0
[private_rpcs.eden_beta]
disabled = true
display_name = "Eden network beta"
url = "https://api.edennetwork.io/v1/beta"
http_url = "https://api.edennetwork.io/v1/beta"
soft_limit = 5_861
tier = 0
[private_rpcs.ethermine]
disabled = true
display_name = "Ethermine"
url = "https://rpc.ethermine.org"
http_url = "https://rpc.ethermine.org"
soft_limit = 5_861
tier = 0
[private_rpcs.flashbots]
disabled = true
display_name = "Flashbots Fast"
url = "https://rpc.flashbots.net/fast"
http_url = "https://rpc.flashbots.net/fast"
soft_limit = 7_074
tier = 0
[private_rpcs.securerpc]
disabled = true
display_name = "SecureRPC"
url = "https://gibson.securerpc.com/v1"
http_url = "https://gibson.securerpc.com/v1"
soft_limit = 4_560
tier = 0

@ -16,17 +16,26 @@ response_cache_max_bytes = 1_000_000_000
[balanced_rpcs]
[balanced_rpcs.llama_public_wss]
[balanced_rpcs.llama_public_both]
# TODO: what should we do if all rpcs are disabled? warn and wait for a config change?
disabled = false
display_name = "LlamaNodes WSS"
url = "wss://eth.llamarpc.com/"
display_name = "LlamaNodes Both"
ws_url = "wss://eth.llamarpc.com/"
http_url = "https://eth.llamarpc.com/"
soft_limit = 1_000
tier = 0
[balanced_rpcs.llama_public_https]
disabled = false
display_name = "LlamaNodes HTTPS"
url = "https://eth.llamarpc.com/"
http_url = "https://eth.llamarpc.com/"
soft_limit = 1_000
tier = 0
[balanced_rpcs.llama_public_wss]
# TODO: what should we do if all rpcs are disabled? warn and wait for a config change?
disabled = false
display_name = "LlamaNodes WSS"
ws_url = "wss://eth.llamarpc.com/"
soft_limit = 1_000
tier = 0

@ -10,5 +10,5 @@ redis-rate-limiter = { path = "../redis-rate-limiter" }
anyhow = "1.0.69"
hashbrown = "0.13.2"
log = "0.4.17"
moka = { version = "0.9.7", default-features = false, features = ["future"] }
moka = { version = "0.10.0", default-features = false, features = ["future"] }
tokio = "1.25.0"

docs/curl login.md (new file): 10 lines

@ -0,0 +1,10 @@
# log in with curl
1. curl http://127.0.0.1:8544/user/login/$ADDRESS
2. Sign the text with a site like https://www.myetherwallet.com/wallet/sign
3. POST the signed data:
curl -X POST http://127.0.0.1:8544/user/login -H 'Content-Type: application/json' -d
'{ "address": "0x9eb9e3dc2543dc9ff4058e2a2da43a855403f1fd", "msg": "0x6c6c616d616e6f6465732e636f6d2077616e747320796f7520746f207369676e20696e207769746820796f757220457468657265756d206163636f756e743a0a3078396562396533646332353433646339464634303538653241324441343341383535343033463166440a0af09fa699f09fa699f09fa699f09fa699f09fa6990a0a5552493a2068747470733a2f2f6c6c616d616e6f6465732e636f6d2f0a56657273696f6e3a20310a436861696e2049443a20310a4e6f6e63653a203031474d37373330375344324448333854454d3957545156454a0a4973737565642041743a20323032322d31322d31345430323a32333a31372e3735333736335a0a45787069726174696f6e2054696d653a20323032322d31322d31345430323a34333a31372e3735333736335a", "sig": "16bac055345279723193737c6c67cf995e821fd7c038d31fd6f671102088c7b85ab4b13069fd2ed02da186cf549530e315d8d042d721bf81289b3ffdbe8cf9ce1c", "version": "3", "signer": "MEW" }'
4. The response will include a bearer token. Use it with curl ... -H "Authorization: Bearer $TOKEN" (double quotes, so the shell expands $TOKEN)
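The same flow from Rust with reqwest is sketched below. This is only a sketch: it assumes reqwest's `json` feature is enabled and that the GET returns the sign-in message as plain text; the `sig` placeholder must be replaced with the signature from step 2.

use serde_json::json;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let client = reqwest::Client::new();
    let address = "0x9eb9e3dc2543dc9ff4058e2a2da43a855403f1fd";

    // step 1: fetch the message that needs to be signed
    let msg = client
        .get(format!("http://127.0.0.1:8544/user/login/{}", address))
        .send()
        .await?
        .text()
        .await?;

    // step 2 happens out of band: sign `msg` with your wallet
    let sig = "0x..."; // placeholder: the signature from step 2

    // step 3: POST the signed payload (same fields as the curl example)
    let token_response = client
        .post("http://127.0.0.1:8544/user/login")
        .json(&json!({
            "address": address,
            "msg": msg,
            "sig": sig,
            "version": "3",
            "signer": "MEW",
        }))
        .send()
        .await?
        .text()
        .await?;

    // step 4: the response includes the bearer token
    println!("{}", token_response);
    Ok(())
}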

@ -1,6 +1,6 @@
[package]
name = "web3_proxy"
version = "0.13.0"
version = "0.13.1"
edition = "2021"
default-run = "web3_proxy_cli"
@ -27,9 +27,9 @@ thread-fast-rng = { path = "../thread-fast-rng" }
anyhow = { version = "1.0.69", features = ["backtrace"] }
argh = "0.1.10"
axum = { version = "0.6.4", features = ["headers", "ws"] }
axum = { version = "0.6.6", features = ["headers", "ws"] }
axum-client-ip = "0.4.0"
axum-macros = "0.3.2"
axum-macros = "0.3.4"
chrono = "0.4.23"
counter = "0.5.7"
derive_more = "0.99.17"
@ -48,10 +48,11 @@ http = "0.2.8"
ipnet = "2.7.1"
itertools = "0.10.5"
log = "0.4.17"
moka = { version = "0.9.7", default-features = false, features = ["future"] }
moka = { version = "0.10.0", default-features = false, features = ["future"] }
notify = "5.1.0"
num = "0.4.0"
num-traits = "0.2.15"
once_cell = { version = "1.17.1" }
pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async", "rustls", "sync"] }
parking_lot = { version = "0.12.1", features = ["arc_lock"] }
prettytable = "*"
@ -62,7 +63,7 @@ rustc-hash = "1.1.0"
sentry = { version = "0.29.3", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] }
serde = { version = "1.0.152", features = [] }
serde_json = { version = "1.0.93", default-features = false, features = ["alloc", "raw_value"] }
serde_prometheus = "0.2.0"
serde_prometheus = "0.2.1"
siwe = "0.5.0"
time = "0.3.17"
tokio = { version = "1.25.0", features = ["full"] }
@ -73,3 +74,5 @@ tower-http = { version = "0.3.5", features = ["cors", "sensitive-headers"] }
ulid = { version = "1.0.0", features = ["serde"] }
url = "2.3.1"
uuid = "1.3.0"
ewma = "0.1.1"
ordered-float = "3.4.0"

@ -10,7 +10,7 @@ use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::jsonrpc::{
JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum,
};
use crate::rpcs::blockchain::{ArcBlock, SavedBlock};
use crate::rpcs::blockchain::{BlockHashesCache, Web3ProxyBlock};
use crate::rpcs::many::Web3Rpcs;
use crate::rpcs::one::Web3Rpc;
use crate::rpcs::transactions::TxStatus;
@ -23,7 +23,7 @@ use derive_more::From;
use entities::sea_orm_active_enums::LogLevel;
use entities::user;
use ethers::core::utils::keccak256;
use ethers::prelude::{Address, Block, Bytes, Transaction, TxHash, H256, U64};
use ethers::prelude::{Address, Bytes, Transaction, TxHash, H256, U64};
use ethers::types::U256;
use ethers::utils::rlp::{Decodable, Rlp};
use futures::future::join_all;
@ -69,9 +69,9 @@ pub static REQUEST_PERIOD: u64 = 60;
#[derive(From)]
struct ResponseCacheKey {
// if none, this is cached until evicted
from_block: Option<SavedBlock>,
from_block: Option<Web3ProxyBlock>,
// to_block is only set when ranges of blocks are requested (like with eth_getLogs)
to_block: Option<SavedBlock>,
to_block: Option<Web3ProxyBlock>,
method: String,
// TODO: better type for this
params: Option<serde_json::Value>,
@ -204,7 +204,7 @@ pub struct Web3ProxyApp {
response_cache: ResponseCache,
// don't drop this or the sender will stop working
// TODO: broadcast channel instead?
watch_consensus_head_receiver: watch::Receiver<ArcBlock>,
watch_consensus_head_receiver: watch::Receiver<Option<Web3ProxyBlock>>,
pending_tx_sender: broadcast::Sender<TxStatus>,
pub config: AppConfig,
pub db_conn: Option<sea_orm::DatabaseConnection>,
@ -482,7 +482,7 @@ impl Web3ProxyApp {
let http_client = Some(
reqwest::ClientBuilder::new()
.connect_timeout(Duration::from_secs(5))
.timeout(Duration::from_secs(60))
.timeout(Duration::from_secs(5 * 60))
.user_agent(APP_USER_AGENT)
.build()?,
);
@ -541,8 +541,7 @@ impl Web3ProxyApp {
};
// TODO: i don't like doing Block::default here! Change this to "None"?
let (watch_consensus_head_sender, watch_consensus_head_receiver) =
watch::channel(Arc::new(Block::default()));
let (watch_consensus_head_sender, watch_consensus_head_receiver) = watch::channel(None);
// TODO: will one receiver lagging be okay? how big should this be?
let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(256);
@ -557,33 +556,40 @@ impl Web3ProxyApp {
// TODO: ttl on this? or is max_capacity fine?
let pending_transactions = Cache::builder()
.max_capacity(10_000)
// TODO: different chains might handle this differently
// TODO: what should we set? 5 minutes is arbitrary. the nodes themselves hold onto transactions for much longer
.time_to_idle(Duration::from_secs(300))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// keep 1GB of blocks in the cache
// keep 1GB/5 minutes of blocks in the cache
// TODO: limits from config
// these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes
// TODO: how can we do the weigher better?
let block_map = Cache::builder()
let block_map: BlockHashesCache = Cache::builder()
.max_capacity(1024 * 1024 * 1024)
.weigher(|_k, v: &ArcBlock| {
.weigher(|_k, v: &Web3ProxyBlock| {
// TODO: is this good enough?
1 + v.transactions.len().try_into().unwrap_or(u32::MAX)
1 + v.block.transactions.len().try_into().unwrap_or(u32::MAX)
})
// TODO: what should we set? 5 minutes is arbitrary. the nodes themselves hold onto transactions for much longer
.time_to_idle(Duration::from_secs(300))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// connect to the load balanced rpcs
let (balanced_rpcs, balanced_handle) = Web3Rpcs::spawn(
block_map.clone(),
top_config.app.chain_id,
db_conn.clone(),
balanced_rpcs,
http_client.clone(),
vredis_pool.clone(),
block_map.clone(),
Some(watch_consensus_head_sender),
top_config.app.min_sum_soft_limit,
top_config.app.max_block_age,
top_config.app.max_block_lag,
top_config.app.min_synced_rpcs,
Some(pending_tx_sender.clone()),
top_config.app.min_sum_soft_limit,
pending_transactions.clone(),
Some(pending_tx_sender.clone()),
vredis_pool.clone(),
balanced_rpcs,
Some(watch_consensus_head_sender),
)
.await
.context("spawning balanced rpcs")?;
@ -599,26 +605,30 @@ impl Web3ProxyApp {
None
} else {
let (private_rpcs, private_handle) = Web3Rpcs::spawn(
block_map,
top_config.app.chain_id,
db_conn.clone(),
private_rpcs,
http_client.clone(),
// private rpcs don't get subscriptions, so no need for max_block_age or max_block_lag
None,
None,
0,
0,
pending_transactions.clone(),
// TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits, but they should have
None,
vredis_pool.clone(),
block_map,
private_rpcs,
// subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs
// they also often have low rate limits
// however, they are well connected to miners/validators. so maybe using them as a safety check would be good
// TODO: but maybe we could include privates in the "backup" tier
None,
0,
0,
// TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits
None,
pending_transactions.clone(),
)
.await
.context("spawning private_rpcs")?;
if private_rpcs.conns.is_empty() {
if private_rpcs.by_name.is_empty() {
None
} else {
// save the handle to catch any errors
@ -685,6 +695,8 @@ impl Web3ProxyApp {
u32::MAX
}
})
// TODO: what should we set? 10 minutes is arbitrary. the nodes themselves hold onto transactions for much longer
.time_to_idle(Duration::from_secs(600))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// all the users are the same size, so no need for a weigher
@ -734,7 +746,7 @@ impl Web3ProxyApp {
Ok((app, cancellable_handles, important_background_handles).into())
}
pub fn head_block_receiver(&self) -> watch::Receiver<ArcBlock> {
pub fn head_block_receiver(&self) -> watch::Receiver<Option<Web3ProxyBlock>> {
self.watch_consensus_head_receiver.clone()
}
@ -932,7 +944,7 @@ impl Web3ProxyApp {
JsonRpcRequestEnum::Single(request) => {
let (response, rpcs) = timeout(
max_time,
self.proxy_cached_request(&authorization, request, proxy_mode),
self.proxy_cached_request(&authorization, request, proxy_mode, None),
)
.await??;
@ -965,10 +977,26 @@ impl Web3ProxyApp {
// TODO: spawn so the requests go in parallel? need to think about rate limiting more if we do that
// TODO: improve flattening
// get the head block now so that any requests that need it all use the same block
// TODO: FrontendErrorResponse that handles "no servers synced" in a consistent way
// TODO: this still has an edge condition if there is a reorg in the middle of the request!!!
let head_block_num = self
.balanced_rpcs
.head_block_num()
.context(anyhow::anyhow!("no servers synced"))?;
let responses = join_all(
requests
.into_iter()
.map(|request| self.proxy_cached_request(authorization, request, proxy_mode))
.map(|request| {
self.proxy_cached_request(
authorization,
request,
proxy_mode,
Some(head_block_num),
)
})
.collect::<Vec<_>>(),
)
.await;
@ -1017,6 +1045,7 @@ impl Web3ProxyApp {
authorization: &Arc<Authorization>,
mut request: JsonRpcRequest,
proxy_mode: ProxyMode,
head_block_num: Option<U64>,
) -> Result<(JsonRpcForwardedResponse, Vec<Arc<Web3Rpc>>), FrontendErrorResponse> {
// trace!("Received request: {:?}", request);
@ -1035,9 +1064,17 @@ impl Web3ProxyApp {
| "db_getString"
| "db_putHex"
| "db_putString"
| "debug_accountRange"
| "debug_backtraceAt"
| "debug_blockProfile"
| "debug_chaindbCompact"
| "debug_chaindbProperty"
| "debug_cpuProfile"
| "debug_freeOSMemory"
| "debug_freezeClient"
| "debug_gcStats"
| "debug_goTrace"
| "debug_memStats"
| "debug_mutexProfile"
| "debug_setBlockProfileRate"
| "debug_setGCPercent"
@ -1125,7 +1162,7 @@ impl Web3ProxyApp {
serde_json::Value::Array(vec![])
}
"eth_blockNumber" => {
match self.balanced_rpcs.head_block_num() {
match head_block_num.or(self.balanced_rpcs.head_block_num()) {
Some(head_block_num) => {
json!(head_block_num)
}
@ -1138,9 +1175,7 @@ impl Web3ProxyApp {
}
}
}
"eth_chainId" => {
json!(U64::from(self.config.chain_id))
}
"eth_chainId" => json!(U64::from(self.config.chain_id)),
// TODO: eth_callBundle (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_callbundle)
// TODO: eth_cancelPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_cancelprivatetransaction, but maybe just reject)
// TODO: eth_sendPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_sendprivatetransaction)
@ -1158,6 +1193,7 @@ impl Web3ProxyApp {
request,
Some(&request_metadata),
None,
None,
)
.await?;
@ -1193,7 +1229,7 @@ impl Web3ProxyApp {
}
"eth_mining" => {
// no stats on this. it's cheap
json!(false)
serde_json::Value::Bool(false)
}
// TODO: eth_sendBundle (flashbots command)
// broadcast transactions to all private rpcs at once
@ -1222,12 +1258,19 @@ impl Web3ProxyApp {
(&self.balanced_rpcs, default_num)
};
let head_block_num = head_block_num
.or(self.balanced_rpcs.head_block_num())
.ok_or_else(|| anyhow::anyhow!("no servers synced"))?;
// TODO: error/wait if no head block!
// try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here.
let mut response = private_rpcs
.try_send_all_synced_connections(
authorization,
&request,
Some(request_metadata.clone()),
Some(&head_block_num),
None,
Level::Trace,
num,
@ -1318,7 +1361,7 @@ impl Web3ProxyApp {
"eth_syncing" => {
// no stats on this. it's cheap
// TODO: return a real response if all backends are syncing or if no servers in sync
json!(false)
serde_json::Value::Bool(false)
}
"eth_subscribe" => {
return Ok((
@ -1343,12 +1386,12 @@ impl Web3ProxyApp {
"net_listening" => {
// no stats on this. it's cheap
// TODO: only if there are some backends on balanced_rpcs?
json!(true)
serde_json::Value::Bool(true)
}
"net_peerCount" => {
// no stats on this. it's cheap
// TODO: do something with proxy_mode here?
self.balanced_rpcs.num_synced_rpcs().into()
json!(U64::from(self.balanced_rpcs.num_synced_rpcs()))
}
"web3_clientVersion" => {
// no stats on this. it's cheap
@ -1422,9 +1465,8 @@ impl Web3ProxyApp {
// emit stats
// TODO: if no servers synced, wait for them to be synced? probably better to error and let haproxy retry another server
let head_block_num = self
.balanced_rpcs
.head_block_num()
let head_block_num = head_block_num
.or(self.balanced_rpcs.head_block_num())
.context("no servers synced")?;
// we do this check before checking caches because it might modify the request params
@ -1468,7 +1510,7 @@ impl Web3ProxyApp {
.await?;
Some(ResponseCacheKey {
from_block: Some(SavedBlock::new(request_block)),
from_block: Some(request_block),
to_block: None,
method: method.to_string(),
// TODO: hash here?
@ -1508,8 +1550,8 @@ impl Web3ProxyApp {
.await?;
Some(ResponseCacheKey {
from_block: Some(SavedBlock::new(from_block)),
to_block: Some(SavedBlock::new(to_block)),
from_block: Some(from_block),
to_block: Some(to_block),
method: method.to_string(),
// TODO: hash here?
params: request.params.clone(),
@ -1524,7 +1566,8 @@ impl Web3ProxyApp {
let authorization = authorization.clone();
if let Some(cache_key) = cache_key {
let from_block_num = cache_key.from_block.as_ref().map(|x| x.number());
let from_block_num = cache_key.from_block.as_ref().map(|x| *x.number());
let to_block_num = cache_key.to_block.as_ref().map(|x| *x.number());
self.response_cache
.try_get_with(cache_key, async move {
@ -1537,6 +1580,7 @@ impl Web3ProxyApp {
request,
Some(&request_metadata),
from_block_num.as_ref(),
to_block_num.as_ref(),
)
.await?;
@ -1545,7 +1589,7 @@ impl Web3ProxyApp {
// TODO: only cache the inner response
// TODO: how are we going to stream this?
// TODO: check response size. if its very large, return it in a custom Error type that bypasses caching
// TODO: check response size. if its very large, return it in a custom Error type that bypasses caching? or will moka do that for us?
Ok::<_, anyhow::Error>(response)
})
.await
@ -1565,6 +1609,7 @@ impl Web3ProxyApp {
request,
Some(&request_metadata),
None,
None,
)
.await?
}
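The ResponseCacheKey changes above boil down to a block-scoped cache key. A simplified sketch with the types reduced to primitives; moka's `try_get_with` deduplicates concurrent lookups the same way the real `response_cache` call does:

use moka::future::Cache;

#[derive(Clone, Hash, PartialEq, Eq)]
struct CacheKey {
    from_block: Option<u64>, // None: cached until evicted
    to_block: Option<u64>,   // only set for ranged queries like eth_getLogs
    method: String,
    params: Option<String>,
}

#[tokio::main]
async fn main() {
    let cache: Cache<CacheKey, String> = Cache::new(10_000);

    let key = CacheKey {
        from_block: Some(16_700_000),
        to_block: None,
        method: "eth_getBalance".to_string(),
        params: None,
    };

    // concurrent requests for the same key share a single upstream call
    let response = cache
        .try_get_with(key, async {
            // a real implementation forwards to the backend rpcs here
            Ok::<_, std::convert::Infallible>("0x0".to_string())
        })
        .await
        .unwrap();

    assert_eq!(response, "0x0");
}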

@ -61,6 +61,12 @@ impl Web3ProxyApp {
);
while let Some(new_head) = head_block_receiver.next().await {
let new_head = if let Some(new_head) = new_head {
new_head
} else {
continue;
};
// TODO: what should the payload for RequestMetadata be?
let request_metadata =
Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap());
@ -72,7 +78,7 @@ impl Web3ProxyApp {
"params": {
"subscription": subscription_id,
// TODO: option to include full transaction objects instead of just the hashes?
"result": new_head.as_ref(),
"result": new_head.block,
},
});

web3_proxy/src/atomics.rs (new file): 22 lines

@ -0,0 +1,22 @@
use std::sync::atomic::{AtomicU64, Ordering};
pub struct AtomicF64 {
storage: AtomicU64,
}
impl AtomicF64 {
pub fn new(value: f64) -> Self {
let as_u64 = value.to_bits();
Self {
storage: AtomicU64::new(as_u64),
}
}
pub fn store(&self, value: f64, ordering: Ordering) {
let as_u64 = value.to_bits();
self.storage.store(as_u64, ordering)
}
pub fn load(&self, ordering: Ordering) -> f64 {
let as_u64 = self.storage.load(ordering);
f64::from_bits(as_u64)
}
}
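A short usage sketch: `f64::to_bits`/`f64::from_bits` round-trip the float exactly through an `AtomicU64`, so this gives lock-free loads and stores (though not atomic read-modify-write, which would need `compare_exchange` on the bits):

use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;

fn main() {
    // assuming the AtomicF64 defined above is in scope
    let latency_ms = Arc::new(AtomicF64::new(0.0));

    let writer = Arc::clone(&latency_ms);
    thread::spawn(move || {
        // store is a single atomic u64 write of the float's bit pattern
        writer.store(12.5, Ordering::Relaxed);
    })
    .join()
    .unwrap();

    assert_eq!(latency_ms.load(Ordering::Relaxed), 12.5);
}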

@ -64,7 +64,7 @@ async fn run(
));
// wait until the app has seen its first consensus head block
// TODO: if backups were included, wait a little longer
// TODO: if backups were included, wait a little longer?
let _ = spawned_app.app.head_block_receiver().changed().await;
// start the frontend port
@ -205,31 +205,27 @@ mod tests {
(
"anvil".to_string(),
Web3RpcConfig {
disabled: false,
display_name: None,
url: anvil.endpoint(),
backup: Some(false),
block_data_limit: None,
http_url: Some(anvil.endpoint()),
soft_limit: 100,
hard_limit: None,
tier: 0,
subscribe_txs: Some(false),
extra: Default::default(),
..Default::default()
},
),
(
"anvil_ws".to_string(),
Web3RpcConfig {
disabled: false,
display_name: None,
url: anvil.ws_endpoint(),
backup: Some(false),
block_data_limit: None,
ws_url: Some(anvil.ws_endpoint()),
soft_limit: 100,
hard_limit: None,
tier: 0,
subscribe_txs: Some(false),
extra: Default::default(),
..Default::default()
},
),
(
"anvil_both".to_string(),
Web3RpcConfig {
http_url: Some(anvil.endpoint()),
ws_url: Some(anvil.ws_endpoint()),
..Default::default()
},
),
]),

@ -80,12 +80,7 @@ pub async fn clean_block_number(
.context("fetching block number from hash")?;
// TODO: set change to true? i think not. we should probably use hashes for everything.
(
block
.number
.expect("blocks here should always have numbers"),
false,
)
(*block.number(), false)
} else {
return Err(anyhow::anyhow!("blockHash missing"));
}
@ -132,6 +127,12 @@ pub async fn block_needed(
head_block_num: U64,
rpcs: &Web3Rpcs,
) -> anyhow::Result<BlockNeeded> {
// some requests have potentially very large responses
// TODO: only skip caching if the response actually is large
if method.starts_with("trace_") || method == "debug_traceTransaction" {
return Ok(BlockNeeded::CacheNever);
}
let params = if let Some(params) = params {
// grab the params so we can inspect and potentially modify them
params
@ -215,8 +216,8 @@ pub async fn block_needed(
};
return Ok(BlockNeeded::CacheRange {
from_block_num: from_block_num,
to_block_num: to_block_num,
from_block_num,
to_block_num,
cache_errors: true,
});
}
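The new early-return above is just a method-name check. Pulled out on its own (with a hypothetical helper name):

/// responses for these methods can be enormous, so skip the response cache entirely
fn skip_cache(method: &str) -> bool {
    method.starts_with("trace_") || method == "debug_traceTransaction"
}

fn main() {
    assert!(skip_cache("trace_replayBlockTransactions"));
    assert!(skip_cache("debug_traceTransaction"));
    assert!(!skip_cache("eth_getLogs")); // still cached, but by block range
}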

@ -1,9 +1,9 @@
use crate::rpcs::blockchain::BlockHashesCache;
use crate::app::AnyhowJoinHandle;
use crate::rpcs::blockchain::{BlockHashesCache, Web3ProxyBlock};
use crate::rpcs::one::Web3Rpc;
use crate::{app::AnyhowJoinHandle, rpcs::blockchain::ArcBlock};
use argh::FromArgs;
use ethers::prelude::TxHash;
use ethers::types::U256;
use ethers::types::{U256, U64};
use hashbrown::HashMap;
use log::warn;
use migration::sea_orm::DatabaseConnection;
@ -11,7 +11,7 @@ use serde::Deserialize;
use std::sync::Arc;
use tokio::sync::broadcast;
pub type BlockAndRpc = (Option<ArcBlock>, Arc<Web3Rpc>);
pub type BlockAndRpc = (Option<Web3ProxyBlock>, Arc<Web3Rpc>);
pub type TxHashAndRpc = (TxHash, Arc<Web3Rpc>);
#[derive(Debug, FromArgs)]
@ -105,6 +105,12 @@ pub struct AppConfig {
pub invite_code: Option<String>,
pub login_domain: Option<String>,
/// do not serve any requests if the best known block is older than this many seconds.
pub max_block_age: Option<u64>,
/// do not serve any requests if the best known block is behind the highest known block by more than this many blocks.
pub max_block_lag: Option<U64>,
/// Rate limit for bearer token authenticated entrypoints.
/// This is separate from the rpc limits.
#[serde(default = "default_bearer_token_max_concurrent_requests")]
@ -197,15 +203,19 @@ fn default_response_cache_max_bytes() -> u64 {
}
/// Configuration for a backend web3 RPC server
#[derive(Clone, Debug, Deserialize)]
#[derive(Clone, Debug, Default, Deserialize)]
pub struct Web3RpcConfig {
/// simple way to disable a connection without deleting the row
#[serde(default)]
pub disabled: bool,
/// a name used in /status and other user facing messages
pub display_name: Option<String>,
/// websocket (or http if no websocket)
pub url: String,
/// (deprecated) rpc url
pub url: Option<String>,
/// while not absolutely required, a ws:// or wss:// connection will be able to subscribe to head blocks
pub ws_url: Option<String>,
/// while not absolutely required, an http:// or https:// connection will allow erigon to stream JSON
pub http_url: Option<String>,
/// block data limit. If None, will be queried
pub block_data_limit: Option<u64>,
/// the requests per second at which the server starts slowing down
@ -213,14 +223,15 @@ pub struct Web3RpcConfig {
/// the requests per second at which the server throws errors (rate limit or otherwise)
pub hard_limit: Option<u64>,
/// only use this rpc if everything else is lagging too far. this allows us to ignore fast but very low limit rpcs
pub backup: Option<bool>,
#[serde(default)]
pub backup: bool,
/// All else equal, a server with a lower tier receives all requests
#[serde(default = "default_tier")]
pub tier: u64,
/// Subscribe to the firehose of pending transactions
/// Don't do this with free rpcs
#[serde(default)]
pub subscribe_txs: Option<bool>,
pub subscribe_txs: bool,
/// unknown config options get put here
#[serde(flatten, default = "HashMap::default")]
pub extra: HashMap<String, serde_json::Value>,
@ -245,47 +256,24 @@ impl Web3RpcConfig {
block_map: BlockHashesCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<TxHashAndRpc>>,
reconnect: bool,
) -> anyhow::Result<(Arc<Web3Rpc>, AnyhowJoinHandle<()>)> {
if !self.extra.is_empty() {
warn!("unknown Web3RpcConfig fields!: {:?}", self.extra.keys());
}
let hard_limit = match (self.hard_limit, redis_pool) {
(None, None) => None,
(Some(hard_limit), Some(redis_client_pool)) => Some((hard_limit, redis_client_pool)),
(None, Some(_)) => None,
(Some(_hard_limit), None) => {
return Err(anyhow::anyhow!(
"no redis client pool! needed for hard limit"
))
}
};
let tx_id_sender = if self.subscribe_txs.unwrap_or(false) {
tx_id_sender
} else {
None
};
let backup = self.backup.unwrap_or(false);
Web3Rpc::spawn(
self,
name,
self.display_name,
chain_id,
db_conn,
self.url,
http_client,
http_interval_sender,
hard_limit,
self.soft_limit,
backup,
self.block_data_limit,
redis_pool,
block_map,
block_sender,
tx_id_sender,
true,
self.tier,
reconnect,
)
.await
}
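A sketch of how the reworked config deserializes, assuming the Web3RpcConfig fields shown above are in scope and the toml crate the proxy already uses for its config files:

fn main() -> anyhow::Result<()> {
    // ws_url gives newHeads subscriptions; http_url lets erigon stream JSON
    let cfg: Web3RpcConfig = toml::from_str(
        r#"
            display_name = "LlamaNodes Both"
            ws_url = "wss://eth.llamarpc.com/"
            http_url = "https://eth.llamarpc.com/"
            soft_limit = 1_000
            tier = 0
        "#,
    )?;

    // disabled, backup, and subscribe_txs all default to false via #[serde(default)]
    assert!(!cfg.disabled);
    assert!(!cfg.backup);
    assert!(!cfg.subscribe_txs);
    Ok(())
}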

@ -1,6 +1,7 @@
pub mod app;
pub mod app_stats;
pub mod admin_queries;
pub mod atomics;
pub mod block_number;
pub mod config;
pub mod frontend;

web3_proxy/src/peak_ewma.rs (new file): 397 lines

@ -0,0 +1,397 @@
//! Code from [tower](https://github.com/tower-rs/tower/blob/3f31ffd2cf15f1e905142e5f43ab39ac995c22ed/tower/src/load/peak_ewma.rs)
//! Measures load using the PeakEWMA response latency.
//! TODO: refactor to work with our code
use std::task::{Context, Poll};
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use tokio::time::Instant;
use tower_service::Service;
use tracing::trace;
/// Measures the load of the underlying service using Peak-EWMA load measurement.
///
/// [`PeakEwma`] implements [`Load`] with the [`Cost`] metric that estimates the amount of
/// pending work to an endpoint. Work is calculated by multiplying the
/// exponentially-weighted moving average (EWMA) of response latencies by the number of
/// pending requests. The Peak-EWMA algorithm is designed to be especially sensitive to
/// worst-case latencies. Over time, the peak latency value decays towards the moving
/// average of latencies to the endpoint.
///
/// When no latency information has been measured for an endpoint, an arbitrary default
/// RTT of 1 second is used to prevent the endpoint from being overloaded before a
/// meaningful baseline can be established.
///
/// ## Note
///
/// This is derived from [Finagle][finagle], which is distributed under the Apache V2
/// license. Copyright 2017, Twitter Inc.
///
/// [finagle]:
/// https://github.com/twitter/finagle/blob/9cc08d15216497bb03a1cafda96b7266cfbbcff1/finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala
#[derive(Debug)]
pub struct PeakEwma<S, C = CompleteOnResponse> {
service: S,
decay_ns: f64,
rtt_estimate: Arc<Mutex<RttEstimate>>,
completion: C,
}
#[cfg(feature = "discover")]
pin_project! {
/// Wraps a `D`-typed stream of discovered services with `PeakEwma`.
#[cfg_attr(docsrs, doc(cfg(feature = "discover")))]
#[derive(Debug)]
pub struct PeakEwmaDiscover<D, C = CompleteOnResponse> {
#[pin]
discover: D,
decay_ns: f64,
default_rtt: Duration,
completion: C,
}
}
/// Represents the relative cost of communicating with a service.
///
/// The underlying value estimates the amount of pending work to a service: the Peak-EWMA
/// latency estimate multiplied by the number of pending requests.
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
pub struct Cost(f64);
/// Tracks an in-flight request and updates the RTT-estimate on Drop.
#[derive(Debug)]
pub struct Handle {
sent_at: Instant,
decay_ns: f64,
rtt_estimate: Arc<Mutex<RttEstimate>>,
}
/// Holds the current RTT estimate and the last time this value was updated.
#[derive(Debug)]
struct RttEstimate {
update_at: Instant,
rtt_ns: f64,
}
const NANOS_PER_MILLI: f64 = 1_000_000.0;
// ===== impl PeakEwma =====
impl<S, C> PeakEwma<S, C> {
/// Wraps an `S`-typed service so that its load is tracked by the EWMA of its peak latency.
pub fn new(service: S, default_rtt: Duration, decay_ns: f64, completion: C) -> Self {
debug_assert!(decay_ns > 0.0, "decay_ns must be positive");
Self {
service,
decay_ns,
rtt_estimate: Arc::new(Mutex::new(RttEstimate::new(nanos(default_rtt)))),
completion,
}
}
fn handle(&self) -> Handle {
Handle {
decay_ns: self.decay_ns,
sent_at: Instant::now(),
rtt_estimate: self.rtt_estimate.clone(),
}
}
}
impl<S, C, Request> Service<Request> for PeakEwma<S, C>
where
S: Service<Request>,
C: TrackCompletion<Handle, S::Response>,
{
type Response = C::Output;
type Error = S::Error;
type Future = TrackCompletionFuture<S::Future, C, Handle>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&mut self, req: Request) -> Self::Future {
TrackCompletionFuture::new(
self.completion.clone(),
self.handle(),
self.service.call(req),
)
}
}
impl<S, C> Load for PeakEwma<S, C> {
type Metric = Cost;
fn load(&self) -> Self::Metric {
let pending = Arc::strong_count(&self.rtt_estimate) as u32 - 1;
// Update the RTT estimate to account for decay since the last update.
// If an estimate has not been established, a default is provided
let estimate = self.update_estimate();
let cost = Cost(estimate * f64::from(pending + 1));
trace!(
"load estimate={:.0}ms pending={} cost={:?}",
estimate / NANOS_PER_MILLI,
pending,
cost,
);
cost
}
}
impl<S, C> PeakEwma<S, C> {
fn update_estimate(&self) -> f64 {
let mut rtt = self.rtt_estimate.lock().expect("peak ewma prior_estimate");
rtt.decay(self.decay_ns)
}
}
// ===== impl PeakEwmaDiscover =====
#[cfg(feature = "discover")]
impl<D, C> PeakEwmaDiscover<D, C> {
/// Wraps a `D`-typed [`Discover`] so that services have a [`PeakEwma`] load metric.
///
/// The provided `default_rtt` is used as the default RTT estimate for newly
/// added services.
///
/// The `decay` value determines over what time period an RTT estimate should
/// decay.
pub fn new<Request>(discover: D, default_rtt: Duration, decay: Duration, completion: C) -> Self
where
D: Discover,
D::Service: Service<Request>,
C: TrackCompletion<Handle, <D::Service as Service<Request>>::Response>,
{
PeakEwmaDiscover {
discover,
decay_ns: nanos(decay),
default_rtt,
completion,
}
}
}
#[cfg(feature = "discover")]
impl<D, C> Stream for PeakEwmaDiscover<D, C>
where
D: Discover,
C: Clone,
{
type Item = Result<Change<D::Key, PeakEwma<D::Service, C>>, D::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let change = match ready!(this.discover.poll_discover(cx)).transpose()? {
None => return Poll::Ready(None),
Some(Change::Remove(k)) => Change::Remove(k),
Some(Change::Insert(k, svc)) => {
let peak_ewma = PeakEwma::new(
svc,
*this.default_rtt,
*this.decay_ns,
this.completion.clone(),
);
Change::Insert(k, peak_ewma)
}
};
Poll::Ready(Some(Ok(change)))
}
}
// ===== impl RttEstimate =====
impl RttEstimate {
fn new(rtt_ns: f64) -> Self {
debug_assert!(0.0 < rtt_ns, "rtt must be positive");
Self {
rtt_ns,
update_at: Instant::now(),
}
}
/// Decays the RTT estimate with a decay period of `decay_ns`.
fn decay(&mut self, decay_ns: f64) -> f64 {
// Updates with a 0 duration so that the estimate decays towards 0.
let now = Instant::now();
self.update(now, now, decay_ns)
}
/// Updates the Peak-EWMA RTT estimate.
///
/// The elapsed time from `sent_at` to `recv_at` is added
fn update(&mut self, sent_at: Instant, recv_at: Instant, decay_ns: f64) -> f64 {
debug_assert!(
sent_at <= recv_at,
"recv_at={:?} after sent_at={:?}",
recv_at,
sent_at
);
let rtt = nanos(recv_at.saturating_duration_since(sent_at));
let now = Instant::now();
debug_assert!(
self.update_at <= now,
"update_at={:?} in the future",
self.update_at
);
self.rtt_ns = if self.rtt_ns < rtt {
// For Peak-EWMA, always use the worst-case (peak) value as the estimate for
// subsequent requests.
trace!(
"update peak rtt={}ms prior={}ms",
rtt / NANOS_PER_MILLI,
self.rtt_ns / NANOS_PER_MILLI,
);
rtt
} else {
// When an RTT is observed that is less than the estimated RTT, we decay the
// prior estimate according to how much time has elapsed since the last
// update. The inverse of the decay is used to scale the estimate towards the
// observed RTT value.
let elapsed = nanos(now.saturating_duration_since(self.update_at));
let decay = (-elapsed / decay_ns).exp();
let recency = 1.0 - decay;
let next_estimate = (self.rtt_ns * decay) + (rtt * recency);
trace!(
"update rtt={:03.0}ms decay={:06.0}ns; next={:03.0}ms",
rtt / NANOS_PER_MILLI,
self.rtt_ns - next_estimate,
next_estimate / NANOS_PER_MILLI,
);
next_estimate
};
self.update_at = now;
self.rtt_ns
}
}
// ===== impl Handle =====
impl Drop for Handle {
fn drop(&mut self) {
let recv_at = Instant::now();
if let Ok(mut rtt) = self.rtt_estimate.lock() {
rtt.update(self.sent_at, recv_at, self.decay_ns);
}
}
}
// ===== impl Cost =====
// Utility that converts durations to nanos in f64.
//
// Due to a lossy transformation, the maximum value that can be represented is ~585 years,
// which, I hope, is more than enough to represent request latencies.
fn nanos(d: Duration) -> f64 {
const NANOS_PER_SEC: u64 = 1_000_000_000;
let n = f64::from(d.subsec_nanos());
let s = d.as_secs().saturating_mul(NANOS_PER_SEC) as f64;
n + s
}
#[cfg(test)]
mod tests {
use futures_util::future;
use std::time::Duration;
use tokio::time;
use tokio_test::{assert_ready, assert_ready_ok, task};
use super::*;
struct Svc;
impl Service<()> for Svc {
type Response = ();
type Error = ();
type Future = future::Ready<Result<(), ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), ()>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, (): ()) -> Self::Future {
future::ok(())
}
}
/// The default RTT estimate decays, so that new nodes are considered if the
/// default RTT is too high.
#[tokio::test]
async fn default_decay() {
time::pause();
let svc = PeakEwma::new(
Svc,
Duration::from_millis(10),
NANOS_PER_MILLI * 1_000.0,
CompleteOnResponse,
);
let Cost(load) = svc.load();
assert_eq!(load, 10.0 * NANOS_PER_MILLI);
time::advance(Duration::from_millis(100)).await;
let Cost(load) = svc.load();
assert!(9.0 * NANOS_PER_MILLI < load && load < 10.0 * NANOS_PER_MILLI);
time::advance(Duration::from_millis(100)).await;
let Cost(load) = svc.load();
assert!(8.0 * NANOS_PER_MILLI < load && load < 9.0 * NANOS_PER_MILLI);
}
// The default RTT estimate decays, so that new nodes are considered if the default RTT is too
// high.
#[tokio::test]
async fn compound_decay() {
time::pause();
let mut svc = PeakEwma::new(
Svc,
Duration::from_millis(20),
NANOS_PER_MILLI * 1_000.0,
CompleteOnResponse,
);
assert_eq!(svc.load(), Cost(20.0 * NANOS_PER_MILLI));
time::advance(Duration::from_millis(100)).await;
let mut rsp0 = task::spawn(svc.call(()));
assert!(svc.load() > Cost(20.0 * NANOS_PER_MILLI));
time::advance(Duration::from_millis(100)).await;
let mut rsp1 = task::spawn(svc.call(()));
assert!(svc.load() > Cost(40.0 * NANOS_PER_MILLI));
time::advance(Duration::from_millis(100)).await;
let () = assert_ready_ok!(rsp0.poll());
assert_eq!(svc.load(), Cost(400_000_000.0));
time::advance(Duration::from_millis(100)).await;
let () = assert_ready_ok!(rsp1.poll());
assert_eq!(svc.load(), Cost(200_000_000.0));
// Check that values decay as time elapses
time::advance(Duration::from_secs(1)).await;
assert!(svc.load() < Cost(100_000_000.0));
time::advance(Duration::from_secs(10)).await;
assert!(svc.load() < Cost(100_000.0));
}
#[test]
fn nanos() {
assert_eq!(super::nanos(Duration::new(0, 0)), 0.0);
assert_eq!(super::nanos(Duration::new(0, 123)), 123.0);
assert_eq!(super::nanos(Duration::new(1, 23)), 1_000_000_023.0);
assert_eq!(
super::nanos(Duration::new(::std::u64::MAX, 999_999_999)),
18446744074709553000.0
);
}
}
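Stripped of the service plumbing, RttEstimate::update is this little function (the load cost is then just this estimate times pending+1). A standalone sketch of the math:

fn peak_ewma_update(prior_ns: f64, rtt_ns: f64, elapsed_ns: f64, decay_ns: f64) -> f64 {
    if rtt_ns > prior_ns {
        // peak behavior: jump straight to the worst-case latency
        rtt_ns
    } else {
        // otherwise decay the prior toward the observed rtt, weighted by elapsed time
        let decay = (-elapsed_ns / decay_ns).exp();
        prior_ns * decay + rtt_ns * (1.0 - decay)
    }
}

fn main() {
    let decay_ns = 1_000_000_000.0; // 1 second decay window
    let mut estimate_ns = 200_000_000.0; // 200ms prior estimate

    // a fast 10ms response observed 500ms after the last update
    estimate_ns = peak_ewma_update(estimate_ns, 10_000_000.0, 500_000_000.0, decay_ns);

    // the estimate moves roughly 40% of the way toward 10ms (to ~125ms)
    println!("next estimate: {:.0}ms", estimate_ns / 1_000_000.0);
}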

File diff suppressed because it is too large.

@ -0,0 +1,588 @@
use crate::frontend::authorization::Authorization;
use super::blockchain::Web3ProxyBlock;
use super::many::Web3Rpcs;
use super::one::Web3Rpc;
use anyhow::Context;
use ethers::prelude::{H256, U64};
use hashbrown::{HashMap, HashSet};
use log::{debug, trace, warn};
use moka::future::Cache;
use serde::Serialize;
use std::collections::BTreeMap;
use std::fmt;
use std::sync::Arc;
use tokio::time::Instant;
/// A collection of Web3Rpcs that are on the same block.
/// Serialize is so we can print it on our debug endpoint
#[derive(Clone, Default, Serialize)]
pub struct ConsensusWeb3Rpcs {
pub(super) tier: u64,
pub(super) head_block: Option<Web3ProxyBlock>,
// TODO: this should be able to serialize, but it isn't
#[serde(skip_serializing)]
pub(super) rpcs: Vec<Arc<Web3Rpc>>,
pub(super) backups_voted: Option<Web3ProxyBlock>,
pub(super) backups_needed: bool,
}
impl ConsensusWeb3Rpcs {
pub fn num_conns(&self) -> usize {
self.rpcs.len()
}
pub fn sum_soft_limit(&self) -> u32 {
self.rpcs.iter().fold(0, |sum, rpc| sum + rpc.soft_limit)
}
// TODO: sum_hard_limit?
}
impl fmt::Debug for ConsensusWeb3Rpcs {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
// TODO: print the actual conns?
f.debug_struct("ConsensusConnections")
.field("head_block", &self.head_block)
.field("num_conns", &self.rpcs.len())
.finish_non_exhaustive()
}
}
impl Web3Rpcs {
// TODO: return a ref?
pub fn head_block(&self) -> Option<Web3ProxyBlock> {
self.watch_consensus_head_receiver
.as_ref()
.and_then(|x| x.borrow().clone())
}
// TODO: return a ref?
pub fn head_block_hash(&self) -> Option<H256> {
self.head_block().map(|x| *x.hash())
}
// TODO: return a ref?
pub fn head_block_num(&self) -> Option<U64> {
self.head_block().map(|x| *x.number())
}
pub fn synced(&self) -> bool {
!self.watch_consensus_rpcs_sender.borrow().rpcs.is_empty()
}
pub fn num_synced_rpcs(&self) -> usize {
self.watch_consensus_rpcs_sender.borrow().rpcs.len()
}
}
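// the Option in the watch channel type makes "no consensus head yet" explicit;
// the app seeds it with watch::channel(None). a minimal sketch of the pattern:
#[cfg(test)]
mod watch_head_sketch {
    use tokio::sync::watch;

    #[tokio::test]
    async fn wait_for_first_head() {
        // no consensus head yet
        let (tx, mut rx) = watch::channel(None::<u64>);

        tokio::spawn(async move {
            // the consensus finder eventually publishes a head block number
            tx.send(Some(16_700_000)).unwrap();
        });

        // wait for the first head before serving requests
        rx.changed().await.unwrap();
        // clone out of the borrow guard, like head_block() above
        assert_eq!(rx.borrow().clone(), Some(16_700_000));
    }
}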
type FirstSeenCache = Cache<H256, Instant, hashbrown::hash_map::DefaultHashBuilder>;
pub struct ConnectionsGroup {
rpc_name_to_block: HashMap<String, Web3ProxyBlock>,
// TODO: what if there are two blocks with the same number?
highest_block: Option<Web3ProxyBlock>,
/// used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups
first_seen: FirstSeenCache,
}
impl ConnectionsGroup {
pub fn new(first_seen: FirstSeenCache) -> Self {
Self {
rpc_name_to_block: Default::default(),
highest_block: Default::default(),
first_seen,
}
}
pub fn len(&self) -> usize {
self.rpc_name_to_block.len()
}
fn remove(&mut self, rpc_name: &str) -> Option<Web3ProxyBlock> {
if let Some(removed_block) = self.rpc_name_to_block.remove(rpc_name) {
match self.highest_block.as_mut() {
None => {}
Some(current_highest_block) => {
if removed_block.hash() == current_highest_block.hash() {
for maybe_highest_block in self.rpc_name_to_block.values() {
if maybe_highest_block.number() > current_highest_block.number() {
*current_highest_block = maybe_highest_block.clone();
};
}
}
}
}
Some(removed_block)
} else {
None
}
}
async fn insert(&mut self, rpc: &Web3Rpc, block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
let first_seen = self
.first_seen
.get_with(*block.hash(), async move { Instant::now() })
.await;
// TODO: this should be 0 if we are first seen, but i think it will be slightly non-zero.
// calculate elapsed time before trying to lock.
let latency = first_seen.elapsed();
rpc.head_latency.write().record(latency);
// TODO: what about a reorg to the same height?
if Some(block.number()) > self.highest_block.as_ref().map(|x| x.number()) {
self.highest_block = Some(block.clone());
}
self.rpc_name_to_block.insert(rpc.name.clone(), block)
}
// // TODO: do this during insert/remove?
// pub(self) async fn highest_block(
// &self,
// authorization: &Arc<Authorization>,
// web3_rpcs: &Web3Rpcs,
// ) -> Option<ArcBlock> {
// let mut checked_heads = HashSet::with_capacity(self.rpc_name_to_hash.len());
// let mut highest_block = None::<ArcBlock>;
// for (rpc_name, rpc_head_hash) in self.rpc_name_to_hash.iter() {
// // don't waste time checking the same hash multiple times
// if checked_heads.contains(rpc_head_hash) {
// continue;
// }
// let rpc_block = match web3_rpcs
// .get_block_from_rpc(rpc_name, rpc_head_hash, authorization)
// .await
// {
// Ok(x) => x,
// Err(err) => {
// warn!(
// "failed getting block {} from {} while finding highest block number: {:?}",
// rpc_head_hash, rpc_name, err,
// );
// continue;
// }
// };
// checked_heads.insert(rpc_head_hash);
// // if this is the first block we've tried
// // or if this rpc's newest block has a higher number
// // we used to check total difficulty, but that isn't a thing anymore on ETH
// // TODO: we still need total difficulty on some other PoW chains. what's annoying is it isn't considered part of the "block header" just the block. so websockets don't return it
// let highest_num = highest_block
// .as_ref()
// .map(|x| x.number.expect("blocks here should always have a number"));
// let rpc_num = rpc_block.as_ref().number;
// if rpc_num > highest_num {
// highest_block = Some(rpc_block);
// }
// }
// highest_block
// }
/// min_consensus_block_num keeps us from ever going backwards.
/// TODO: think about min_consensus_block_num more. i think this might cause an outage if the chain is doing weird things. but 503s are probably better than broken data.
pub(self) async fn consensus_head_connections(
&self,
authorization: &Arc<Authorization>,
web3_rpcs: &Web3Rpcs,
min_consensus_block_num: Option<U64>,
tier: &u64,
) -> anyhow::Result<ConsensusWeb3Rpcs> {
let mut maybe_head_block = match self.highest_block.clone() {
None => return Err(anyhow::anyhow!("no blocks known")),
Some(x) => x,
};
// TODO: take max_distance_consensus_to_highest as an argument?
// TODO: what if someone's backup node is misconfigured and goes on a really fast forked chain?
let max_lag_consensus_to_highest =
if let Some(min_consensus_block_num) = min_consensus_block_num {
maybe_head_block
.number()
.saturating_add(1.into())
.saturating_sub(min_consensus_block_num)
.as_u64()
} else {
10
};
trace!(
"max_lag_consensus_to_highest: {}",
max_lag_consensus_to_highest
);
let num_known = self.rpc_name_to_block.len();
if num_known < web3_rpcs.min_head_rpcs {
return Err(anyhow::anyhow!(
"not enough rpcs connected: {}/{}",
num_known,
web3_rpcs.min_head_rpcs,
));
}
let mut primary_rpcs_voted: Option<Web3ProxyBlock> = None;
let mut backup_rpcs_voted: Option<Web3ProxyBlock> = None;
// track rpcs on this heaviest chain so we can build a new ConsensusConnections
let mut primary_consensus_rpcs = HashSet::<&str>::new();
let mut backup_consensus_rpcs = HashSet::<&str>::new();
// a running total of the soft limits covered by the rpcs that agree on the head block
let mut primary_sum_soft_limit: u32 = 0;
let mut backup_sum_soft_limit: u32 = 0;
// TODO: also track the sum of *available* hard_limits. if any servers have no hard limits, use their soft limit or no limit?
// check the highest work block for a set of rpcs that can serve our request load
// if it doesn't have enough rpcs for our request load, check the parent block
// TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind. probably different per chain
// TODO: this loop is pretty long. any way to clean up this code?
for _ in 0..max_lag_consensus_to_highest {
let maybe_head_hash = maybe_head_block.hash();
// find all rpcs with maybe_head_hash as their current head
for (rpc_name, rpc_head) in self.rpc_name_to_block.iter() {
if rpc_head.hash() != maybe_head_hash {
// connection is not on the desired block
continue;
}
if backup_consensus_rpcs.contains(rpc_name.as_str()) {
// connection is on a later block in this same chain
continue;
}
if primary_consensus_rpcs.contains(rpc_name.as_str()) {
// connection is on a later block in this same chain
continue;
}
if let Some(rpc) = web3_rpcs.by_name.get(rpc_name.as_str()) {
if backup_rpcs_voted.is_some() {
// backups already voted for a head block. don't change it
} else {
backup_consensus_rpcs.insert(rpc_name);
backup_sum_soft_limit += rpc.soft_limit;
}
if !rpc.backup {
primary_consensus_rpcs.insert(rpc_name);
primary_sum_soft_limit += rpc.soft_limit;
}
} else {
// i don't think this is an error. i think it's just if a reconnect is currently happening
warn!("connection missing: {}", rpc_name);
debug!("web3_rpcs.conns: {:#?}", web3_rpcs.by_name);
}
}
if primary_sum_soft_limit >= web3_rpcs.min_sum_soft_limit
&& primary_consensus_rpcs.len() >= web3_rpcs.min_head_rpcs
{
// we have enough servers with enough requests! yey!
primary_rpcs_voted = Some(maybe_head_block.clone());
break;
}
if backup_rpcs_voted.is_none()
&& backup_consensus_rpcs != primary_consensus_rpcs
&& backup_sum_soft_limit >= web3_rpcs.min_sum_soft_limit
&& backup_consensus_rpcs.len() >= web3_rpcs.min_head_rpcs
{
// if we include backup servers, we have enough servers with high enough limits
backup_rpcs_voted = Some(maybe_head_block.clone());
}
// not enough rpcs on this block. check the parent block
match web3_rpcs
.block(authorization, &maybe_head_block.parent_hash(), None)
.await
{
Ok(parent_block) => {
// trace!(
// child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd. checking consensus on parent block",
// );
maybe_head_block = parent_block.into();
continue;
}
Err(err) => {
let soft_limit_percent = (primary_sum_soft_limit as f32
/ web3_rpcs.min_sum_soft_limit as f32)
* 100.0;
let err_msg = format!("ran out of parents to check. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{}). err: {:#?}",
primary_consensus_rpcs.len(),
num_known,
web3_rpcs.min_head_rpcs,
primary_sum_soft_limit,
web3_rpcs.min_sum_soft_limit,
soft_limit_percent,
err,
);
if backup_rpcs_voted.is_some() {
warn!("{}", err_msg);
break;
} else {
return Err(anyhow::anyhow!(err_msg));
}
}
}
}
// TODO: if consensus_head_rpcs.is_empty, try another method of finding the head block. will need to change the return Err above into breaks.
// we've done all the searching for the heaviest block that we can
if (primary_consensus_rpcs.len() < web3_rpcs.min_head_rpcs
|| primary_sum_soft_limit < web3_rpcs.min_sum_soft_limit)
&& backup_rpcs_voted.is_none()
{
// if we get here, not enough servers are synced. return an error
let soft_limit_percent =
(primary_sum_soft_limit as f32 / web3_rpcs.min_sum_soft_limit as f32) * 100.0;
return Err(anyhow::anyhow!(
"Not enough resources. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{})",
primary_consensus_rpcs.len(),
num_known,
web3_rpcs.min_head_rpcs,
primary_sum_soft_limit,
web3_rpcs.min_sum_soft_limit,
soft_limit_percent,
));
}
// success! this block has enough soft limit and nodes on it (or on later blocks)
let rpcs: Vec<Arc<Web3Rpc>> = primary_consensus_rpcs
.into_iter()
.filter_map(|conn_name| web3_rpcs.by_name.get(conn_name).cloned())
.collect();
#[cfg(debug_assertions)]
let _ = maybe_head_block.hash();
#[cfg(debug_assertions)]
let _ = maybe_head_block.number();
Ok(ConsensusWeb3Rpcs {
tier: *tier,
head_block: Some(maybe_head_block),
rpcs,
backups_voted: backup_rpcs_voted,
backups_needed: primary_rpcs_voted.is_none(),
})
}
}
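// a deliberately simplified sketch of the parent-walk in consensus_head_connections above:
// walk back from the highest block until enough soft limit agrees on a candidate.
// the real loop also tracks hashes, tiers, and backup votes.
#[cfg(test)]
mod consensus_walk_sketch {
    /// each entry: (block_num, combined soft_limit of the rpcs whose head is exactly that block),
    /// ordered from the highest block down its parent chain
    fn consensus_head(chain: &[(u64, u32)], min_sum_soft_limit: u32, max_lag: usize) -> Option<u64> {
        let mut sum_soft_limit: u32 = 0;
        for &(block_num, soft_limit) in chain.iter().take(max_lag) {
            // rpcs on later blocks of the same chain keep counting toward older candidates
            sum_soft_limit += soft_limit;
            if sum_soft_limit >= min_sum_soft_limit {
                return Some(block_num);
            }
        }
        None // ran out of parents to check
    }

    #[test]
    fn parent_walk() {
        // the newest block only has 600 of soft limit; its parent adds 500 more
        assert_eq!(consensus_head(&[(100, 600), (99, 500)], 1_000, 10), Some(99));
    }
}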
/// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers
pub struct ConsensusFinder {
/// backups for all tiers are only used if necessary
/// tiers[0] = only tier 0.
/// tiers[1] = tier 0 and tier 1
/// tiers[n] = tier 0..=n
/// This is a BTreeMap and not a Vec because sometimes a tier is empty
tiers: BTreeMap<u64, ConnectionsGroup>,
/// never serve blocks that are too old
max_block_age: Option<u64>,
/// tier 0 will be preferred as long as the distance between it and the other tiers is <= max_tier_lag
max_block_lag: Option<U64>,
}
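// in miniature, the cumulative-tier bookkeeping described above: a tier-t rpc
// belongs to every group with key >= t. ConsensusFinder::insert below walks the
// same way, from the highest tier group down.
#[cfg(test)]
mod tier_group_sketch {
    use std::collections::BTreeMap;

    #[test]
    fn cumulative_tiers() {
        // tiers[n] contains every rpc of tier 0..=n
        let configured_tiers = [0u64, 1, 2];
        let mut tiers: BTreeMap<u64, Vec<&str>> =
            configured_tiers.iter().map(|t| (*t, Vec::new())).collect();

        // inserting a tier-1 rpc touches groups 1 and 2, but not group 0
        let (rpc_name, rpc_tier) = ("ankr", 1u64);
        for (i, group) in tiers.iter_mut().rev() {
            if *i < rpc_tier {
                break;
            }
            group.push(rpc_name);
        }

        assert!(!tiers[&0].contains(&"ankr"));
        assert!(tiers[&1].contains(&"ankr"));
        assert!(tiers[&2].contains(&"ankr"));
    }
}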
impl ConsensusFinder {
pub fn new(
configured_tiers: &[u64],
max_block_age: Option<u64>,
max_block_lag: Option<U64>,
) -> Self {
// TODO: what's a good capacity for this?
let first_seen = Cache::builder()
.max_capacity(16)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// TODO: this will need some thought when config reloading is written
let tiers = configured_tiers
.iter()
.map(|x| (*x, ConnectionsGroup::new(first_seen.clone())))
.collect();
Self {
tiers,
max_block_age,
max_block_lag,
}
}
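Note that first_seen.clone() above is not a deep copy. A minimal sketch, assuming the Cache here is moka's: clones are shallow and share one underlying store, so every ConnectionsGroup records into the same first-seen map:

use moka::sync::Cache;

fn main() {
    let first_seen: Cache<String, u64> = Cache::builder().max_capacity(16).build();
    let shared = first_seen.clone(); // shallow clone: same store underneath

    first_seen.insert("0xabc".to_string(), 1_676_600_000);
    assert_eq!(shared.get(&"0xabc".to_string()), Some(1_676_600_000));
}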
pub fn len(&self) -> usize {
self.tiers.len()
}
/// get the ConnectionsGroup that contains all rpcs
/// returns None if there are no tiers
pub fn all_rpcs_group(&self) -> Option<&ConnectionsGroup> {
self.tiers.values().last()
}
/// get the mutable ConnectionsGroup that contains all rpcs
pub fn all_mut(&mut self) -> Option<&mut ConnectionsGroup> {
self.tiers.values_mut().last()
}
pub fn remove(&mut self, rpc: &Web3Rpc) -> Option<Web3ProxyBlock> {
let mut removed = None;
for (i, tier_group) in self.tiers.iter_mut().rev() {
if i < &rpc.tier {
break;
}
let x = tier_group.remove(rpc.name.as_str());
if removed.is_none() && x.is_some() {
removed = x;
}
}
removed
}
/// returns the block that the rpc was on before updating to the new_block
pub async fn insert(
&mut self,
rpc: &Web3Rpc,
new_block: Web3ProxyBlock,
) -> Option<Web3ProxyBlock> {
let mut old = None;
// TODO: error if rpc.tier is not in self.tiers
for (i, tier_group) in self.tiers.iter_mut().rev() {
if i < &rpc.tier {
break;
}
// TODO: should new_block be a ref?
let x = tier_group.insert(rpc, new_block.clone()).await;
if old.is_none() && x.is_some() {
old = x;
}
}
old
}
/// Update our tracking of the rpc and return true if something changed
pub(crate) async fn update_rpc(
&mut self,
rpc_head_block: Option<Web3ProxyBlock>,
rpc: Arc<Web3Rpc>,
// we need this so we can save the block to caches. i don't like it though. maybe we should use a lazy_static Cache wrapper that has a "save_block" method? i generally dislike globals, but i also dislike all the types having to pass each other around
web3_connections: &Web3Rpcs,
) -> anyhow::Result<bool> {
// add the rpc's block to connection_heads, or remove the rpc from connection_heads
let changed = match rpc_head_block {
Some(mut rpc_head_block) => {
// we don't know if it's on the heaviest chain yet
rpc_head_block = web3_connections
.try_cache_block(rpc_head_block, false)
.await
.context("failed caching block")?;
// if let Some(max_block_lag) = max_block_lag {
// if rpc_head_block.number() < ??? {
// trace!("rpc_head_block from {} is too far behind! {}", rpc, rpc_head_block);
// return Ok(self.remove(&rpc).is_some());
// }
// }
if let Some(max_age) = self.max_block_age {
if rpc_head_block.age() > max_age {
trace!("rpc_head_block from {} is too old! {}", rpc, rpc_head_block);
return Ok(self.remove(&rpc).is_some());
}
}
if let Some(prev_block) = self.insert(&rpc, rpc_head_block.clone()).await {
if prev_block.hash() == rpc_head_block.hash() {
// this block was already sent by this rpc. return early
false
} else {
// new block for this rpc
true
}
} else {
// first block for this rpc
true
}
}
None => {
if self.remove(&rpc).is_none() {
// this rpc was already removed
false
} else {
// rpc head changed from being synced to not
true
}
}
};
Ok(changed)
}
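The match above reduces to a small truth table. A self-contained sketch, with block hashes as plain strings and an invented helper name:

fn head_changed(prev: Option<&str>, new: Option<&str>) -> bool {
    match (prev, new) {
        (Some(old_hash), Some(new_hash)) => old_hash != new_hash, // same hash twice is not a change
        (None, Some(_)) => true,  // first block seen from this rpc
        (Some(_), None) => true,  // rpc went from synced to not synced
        (None, None) => false,    // rpc was already removed
    }
}

fn main() {
    assert!(!head_changed(Some("0xaa"), Some("0xaa")));
    assert!(head_changed(Some("0xaa"), Some("0xbb")));
    assert!(head_changed(None, Some("0xaa")));
    assert!(!head_changed(None, None));
}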
pub async fn best_consensus_connections(
&mut self,
authorization: &Arc<Authorization>,
web3_connections: &Web3Rpcs,
) -> anyhow::Result<ConsensusWeb3Rpcs> {
// TODO: attach context to these?
let highest_known_block = self
.all_rpcs_group()
.context("no rpcs")?
.highest_block
.as_ref()
.context("no highest block")?;
trace!("highest_known_block: {}", highest_known_block);
let min_block_num = self
.max_block_lag
.map(|x| highest_known_block.number().saturating_sub(x))
// we also want to be sure we don't ever go backwards!
.max(web3_connections.head_block_num());
trace!("min_block_num: {:#?}", min_block_num);
// TODO: should this be a Vec<Result<Option<_, _>>>?
// TODO: how should errors be handled?
// TODO: find the best tier with a connectionsgroup. best case, this only queries the first tier
// TODO: do we need to calculate all of them? I think having highest_known_block included as part of min_block_num should make that unnecessary
for (tier, x) in self.tiers.iter() {
trace!("checking tier {}: {:#?}", tier, x.rpc_name_to_block);
if let Ok(consensus_head_connections) = x
.consensus_head_connections(authorization, web3_connections, min_block_num, tier)
.await
{
trace!("success on tier {}", tier);
// we got one! hopefully it didn't need to use any backups.
// but even if it did need backup servers, that is better than going to a worse tier
return Ok(consensus_head_connections);
}
}
return Err(anyhow::anyhow!("failed finding consensus on all tiers"));
}
}
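A minimal sketch of the min_block_num clamp in best_consensus_connections, with u64 standing in for U64: lag back at most max_block_lag from the highest known block, but never below the current consensus head, so the proxy never moves backwards:

fn min_block_num(highest_known: u64, max_block_lag: Option<u64>, head: Option<u64>) -> Option<u64> {
    max_block_lag
        .map(|lag| highest_known.saturating_sub(lag))
        // Option<u64> ordering treats None as the smallest value, so a known
        // head always wins over "no lag configured"
        .max(head)
}

fn main() {
    // highest known 100, allowed lag 10, current head 95: don't go below 95
    assert_eq!(min_block_num(100, Some(10), Some(95)), Some(95));
    // with a small allowed lag, the lag bound wins
    assert_eq!(min_block_num(100, Some(2), Some(95)), Some(98));
    // no configured lag: fall back to the current head
    assert_eq!(min_block_num(100, None, Some(95)), Some(95));
}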
#[cfg(test)]
mod test {
// #[test]
// fn test_simplest_case_consensus_head_connections() {
// todo!();
// }
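// a hypothetical shape for this test (every name below is assumed rather
// than taken from the codebase):
//
// #[tokio::test]
// async fn test_simplest_case_consensus_head_connections() {
//     // build two mock rpcs on the same head block with min_head_rpcs = 2,
//     // run best_consensus_connections, then assert that both rpcs are
//     // returned and that head_block matches the shared block
// }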
}

File diff suppressed because it is too large

@@ -1,8 +1,8 @@
// TODO: all pub, or export useful things here instead?
pub mod blockchain;
+pub mod consensus;
pub mod many;
pub mod one;
pub mod provider;
pub mod request;
-pub mod synced_connections;
pub mod transactions;

File diff suppressed because it is too large

@@ -2,22 +2,45 @@ use anyhow::Context;
use derive_more::From;
use std::time::Duration;
+// TODO: our own structs for these that handle streaming large responses
+type EthersHttpProvider = ethers::providers::Provider<ethers::providers::Http>;
+type EthersWsProvider = ethers::providers::Provider<ethers::providers::Ws>;
/// Use HTTP and WS providers.
// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit <https://github.com/gakonst/ethers-rs/issues/592>
-// TODO: custom types that let us stream JSON responses
#[derive(From)]
pub enum Web3Provider {
-Http(ethers::providers::Provider<ethers::providers::Http>),
-Ws(ethers::providers::Provider<ethers::providers::Ws>),
+// TODO: only include this for tests.
+Both(EthersHttpProvider, EthersWsProvider),
+Http(EthersHttpProvider),
+// TODO: deadpool? custom tokio-tungstenite
+Ws(EthersWsProvider),
#[cfg(test)]
Mock,
}
impl Web3Provider {
pub fn ready(&self) -> bool {
match self {
-Self::Mock => true,
+Self::Both(_, ws) => ws.as_ref().ready(),
Self::Http(_) => true,
-Self::Ws(provider) => provider.as_ref().ready(),
+Self::Ws(ws) => ws.as_ref().ready(),
+#[cfg(test)]
+Self::Mock => true,
}
}
pub fn http(&self) -> Option<&EthersHttpProvider> {
match self {
Self::Http(x) => Some(x),
_ => None,
}
}
pub fn ws(&self) -> Option<&EthersWsProvider> {
match self {
Self::Both(_, x) | Self::Ws(x) => Some(x),
_ => None,
}
}
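The ws accessor above leans on an or-pattern that binds the same name from two variants. A reduced model of the pattern with a toy enum (all names invented):

enum Provider {
    Both(&'static str, &'static str), // (http url, ws url)
    Http(&'static str),
    Ws(&'static str),
}

impl Provider {
    fn ws(&self) -> Option<&&'static str> {
        match self {
            // one arm covers two variants because both bind `x`
            Provider::Both(_, x) | Provider::Ws(x) => Some(x),
            _ => None,
        }
    }
}

fn main() {
    assert_eq!(Provider::Both("http://a", "ws://a").ws(), Some(&"ws://a"));
    assert_eq!(Provider::Ws("ws://a").ws(), Some(&"ws://a"));
    assert_eq!(Provider::Http("http://a").ws(), None);
}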

@@ -1,6 +1,6 @@
use super::one::Web3Rpc;
use super::provider::Web3Provider;
-use crate::frontend::authorization::{Authorization, AuthorizationType};
+use crate::frontend::authorization::Authorization;
use anyhow::Context;
use chrono::Utc;
use entities::revert_log;
@@ -11,7 +11,6 @@ use log::{debug, error, trace, warn, Level};
use migration::sea_orm::{self, ActiveEnum, ActiveModelTrait};
use serde_json::json;
use std::fmt;
-use std::sync::atomic;
use std::sync::Arc;
use thread_fast_rng::rand::Rng;
use tokio::time::{sleep, Duration, Instant};
@@ -21,20 +20,20 @@ pub enum OpenRequestResult {
Handle(OpenRequestHandle),
/// Unable to start a request. Retry at the given time.
RetryAt(Instant),
-/// Unable to start a request because the server is not synced
-/// contains "true" if backup servers were attempted
-NotReady(bool),
+/// Unable to start a request because no servers are synced
+NotReady,
}
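// a hedged sketch of how a caller might drive these variants (the retry
// policy here is invented for illustration):
//
// match rpc.try_request_handle(authorization, None).await? {
//     OpenRequestResult::Handle(handle) => {
//         // rate limit passed: send the request through the handle
//     }
//     OpenRequestResult::RetryAt(retry_at) => {
//         // rate limited: sleep until retry_at, then try again
//     }
//     OpenRequestResult::NotReady => {
//         // no synced servers: pick another rpc or surface an error
//     }
// }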
/// Make RPC requests through this handle and drop it when you are done.
/// Opening this handle checks rate limits. Developers, try to keep opening a handle and using it as close together as possible
#[derive(Debug)]
pub struct OpenRequestHandle {
authorization: Arc<Authorization>,
-conn: Arc<Web3Rpc>,
-provider: Arc<Web3Provider>,
+rpc: Arc<Web3Rpc>,
}
/// Depending on the context, RPC errors can require different handling.
#[derive(Copy, Clone)]
pub enum RequestRevertHandler {
/// Log at the trace level. Use when errors are expected.
TraceLevel,
@@ -123,79 +122,30 @@ impl Authorization {
impl OpenRequestHandle {
pub async fn new(authorization: Arc<Authorization>, conn: Arc<Web3Rpc>) -> Self {
// TODO: take request_id as an argument?
// TODO: attach a unique id to this? customer requests have one, but not internal queries
// TODO: what ordering?!
-conn.active_requests.fetch_add(1, atomic::Ordering::Relaxed);
-let mut provider = None;
-let mut logged = false;
-while provider.is_none() {
-// trace!("waiting on provider: locking...");
-let ready_provider = conn
-.provider_state
-.read()
-.await
-// TODO: hard code true, or take a bool in the `new` function?
-.provider(true)
-.await
-.cloned();
-// trace!("waiting on provider: unlocked!");
-match ready_provider {
-None => {
-if !logged {
-logged = true;
-warn!("no provider for {}!", conn);
-}
-// TODO: how should this work? a reconnect should be in progress. but maybe force one now?
-// TODO: sleep how long? subscribe to something instead? maybe use a watch handle?
-// TODO: this is going to be way too verbose!
-sleep(Duration::from_millis(100)).await
-}
-Some(x) => provider = Some(x),
-}
-}
-let provider = provider.expect("provider was checked already");
-// TODO: handle overflows?
-// TODO: what ordering?
-match authorization.as_ref().authorization_type {
-AuthorizationType::Frontend => {
-conn.frontend_requests
-.fetch_add(1, atomic::Ordering::Relaxed);
-}
-AuthorizationType::Internal => {
-conn.internal_requests
-.fetch_add(1, atomic::Ordering::Relaxed);
-}
-}
Self {
authorization,
-conn,
-provider,
+rpc: conn,
}
}
pub fn connection_name(&self) -> String {
-self.conn.name.clone()
+self.rpc.name.clone()
}
#[inline]
pub fn clone_connection(&self) -> Arc<Web3Rpc> {
-self.conn.clone()
+self.rpc.clone()
}
/// Send a web3 request
/// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
/// depending on how things are locked, you might need to pass the provider in
pub async fn request<P, R>(
self,
method: &str,
params: &P,
revert_handler: RequestRevertHandler,
unlocked_provider: Option<Arc<Web3Provider>>,
) -> Result<R, ProviderError>
where
// TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it
@@ -205,14 +155,57 @@ impl OpenRequestHandle {
// TODO: use tracing spans
// TODO: including params in this log is way too verbose
-// trace!(rpc=%self.conn, %method, "request");
+trace!("requesting from {}", self.rpc);
let mut provider = if unlocked_provider.is_some() {
unlocked_provider
} else {
self.rpc.provider.read().await.clone()
};
let mut logged = false;
while provider.is_none() {
// trace!("waiting on provider: locking...");
sleep(Duration::from_millis(100)).await;
if !logged {
debug!("no provider for open handle on {}", self.rpc);
logged = true;
}
provider = self.rpc.provider.read().await.clone();
}
let provider = provider.expect("provider was checked already");
self.rpc
.total_requests
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.rpc
.active_requests
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
// let latency = Instant::now();
// TODO: replace ethers-rs providers with our own that supports streaming the responses
-let response = match &*self.provider {
+let response = match provider.as_ref() {
#[cfg(test)]
Web3Provider::Mock => unimplemented!(),
-Web3Provider::Http(provider) => provider.request(method, params).await,
-Web3Provider::Ws(provider) => provider.request(method, params).await,
+Web3Provider::Ws(p) => p.request(method, params).await,
+Web3Provider::Http(p) | Web3Provider::Both(p, _) => {
+// TODO: i keep hearing that http is faster. but ws has always been better for me. investigate more with actual benchmarks
+p.request(method, params).await
+}
};
// note. we intentionally do not record this latency now. we do NOT want to measure errors
// let latency = latency.elapsed();
self.rpc
.active_requests
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
// // TODO: i think ethers already has trace logging (and does it much more fancy)
// trace!(
// "response from {} for {} {:?}: {:?}",
@@ -266,8 +259,22 @@ impl OpenRequestHandle {
// check for "execution reverted" here
let response_type = if let ProviderError::JsonRpcClientError(err) = err {
// Http and Ws errors are very similar, but different types
-let msg = match &*self.provider {
+let msg = match &*provider {
#[cfg(test)]
Web3Provider::Mock => unimplemented!(),
Web3Provider::Both(_, _) => {
if let Some(HttpClientError::JsonRpcError(err)) =
err.downcast_ref::<HttpClientError>()
{
Some(&err.message)
} else if let Some(WsClientError::JsonRpcError(err)) =
err.downcast_ref::<WsClientError>()
{
Some(&err.message)
} else {
None
}
}
Web3Provider::Http(_) => {
if let Some(HttpClientError::JsonRpcError(err)) =
err.downcast_ref::<HttpClientError>()
@@ -290,10 +297,10 @@
if let Some(msg) = msg {
if msg.starts_with("execution reverted") {
trace!("revert from {}", self.conn);
trace!("revert from {}", self.rpc);
ResponseTypes::Revert
} else if msg.contains("limit") || msg.contains("request") {
trace!("rate limit from {}", self.conn);
trace!("rate limit from {}", self.rpc);
ResponseTypes::RateLimit
} else {
ResponseTypes::Ok
@@ -306,10 +313,10 @@
};
if matches!(response_type, ResponseTypes::RateLimit) {
-if let Some(hard_limit_until) = self.conn.hard_limit_until.as_ref() {
+if let Some(hard_limit_until) = self.rpc.hard_limit_until.as_ref() {
let retry_at = Instant::now() + Duration::from_secs(1);
trace!("retry {} at: {:?}", self.conn, retry_at);
trace!("retry {} at: {:?}", self.rpc, retry_at);
hard_limit_until.send_replace(retry_at);
}
@@ -322,14 +329,14 @@
if matches!(response_type, ResponseTypes::Revert) {
debug!(
"bad response from {}! method={} params={:?} err={:?}",
-self.conn, method, params, err
+self.rpc, method, params, err
);
}
}
RequestRevertHandler::TraceLevel => {
trace!(
"bad response from {}! method={} params={:?} err={:?}",
-self.conn,
+self.rpc,
method,
params,
err
@@ -339,20 +346,20 @@
// TODO: include params if not running in release mode
error!(
"bad response from {}! method={} err={:?}",
-self.conn, method, err
+self.rpc, method, err
);
}
RequestRevertHandler::WarnLevel => {
// TODO: include params if not running in release mode
warn!(
"bad response from {}! method={} err={:?}",
-self.conn, method, err
+self.rpc, method, err
);
}
RequestRevertHandler::Save => {
trace!(
"bad response from {}! method={} params={:?} err={:?}",
-self.conn,
+self.rpc,
method,
params,
err
@@ -372,16 +379,16 @@
tokio::spawn(f);
}
}
+} else {
+// TODO: record request latency
+// let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
+// TODO: is this lock here a problem? should this be done through a channel? i started to code it, but it didn't seem to matter
+// let mut latency_recording = self.rpc.request_latency.write();
+// latency_recording.record(latency_ms);
+}
response
}
}
-impl Drop for OpenRequestHandle {
-fn drop(&mut self) {
-self.conn
-.active_requests
-.fetch_sub(1, atomic::Ordering::AcqRel);
-}
-}
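The Drop impl removed above was an instance of the RAII-counter pattern: increment on construction, decrement when the handle drops, even on early return or panic. A minimal standalone version for reference (all names illustrative; the merged code now adjusts the counters inline in request instead):

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

struct RequestGuard {
    active: Arc<AtomicUsize>,
}

impl RequestGuard {
    fn new(active: Arc<AtomicUsize>) -> Self {
        active.fetch_add(1, Ordering::AcqRel);
        Self { active }
    }
}

impl Drop for RequestGuard {
    fn drop(&mut self) {
        // runs even on early return or panic unwind
        self.active.fetch_sub(1, Ordering::AcqRel);
    }
}

fn main() {
    let active = Arc::new(AtomicUsize::new(0));
    {
        let _guard = RequestGuard::new(active.clone());
        assert_eq!(active.load(Ordering::Acquire), 1);
    }
    assert_eq!(active.load(Ordering::Acquire), 0);
}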

@@ -1,71 +0,0 @@
use super::blockchain::{ArcBlock, SavedBlock};
use super::many::Web3Rpcs;
use super::one::Web3Rpc;
use ethers::prelude::{H256, U64};
use serde::Serialize;
use std::fmt;
use std::sync::Arc;
/// A collection of Web3Rpcs that are on the same block.
/// Serialize is so we can print it on our debug endpoint
#[derive(Clone, Default, Serialize)]
pub struct ConsensusWeb3Rpcs {
// TODO: store ArcBlock instead?
pub(super) head_block: Option<SavedBlock>,
// TODO: this should be able to serialize, but it isn't
#[serde(skip_serializing)]
pub(super) conns: Vec<Arc<Web3Rpc>>,
pub(super) num_checked_conns: usize,
pub(super) includes_backups: bool,
}
impl ConsensusWeb3Rpcs {
pub fn num_conns(&self) -> usize {
self.conns.len()
}
pub fn sum_soft_limit(&self) -> u32 {
self.conns.iter().fold(0, |sum, rpc| sum + rpc.soft_limit)
}
// TODO: sum_hard_limit?
}
impl fmt::Debug for ConsensusWeb3Rpcs {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
// TODO: print the actual conns?
f.debug_struct("ConsensusConnections")
.field("head_block", &self.head_block)
.field("num_conns", &self.conns.len())
.finish_non_exhaustive()
}
}
impl Web3Rpcs {
pub fn head_block(&self) -> Option<ArcBlock> {
self.watch_consensus_head_receiver
.as_ref()
.map(|x| x.borrow().clone())
}
pub fn head_block_hash(&self) -> Option<H256> {
self.head_block().and_then(|x| x.hash)
}
pub fn head_block_num(&self) -> Option<U64> {
self.head_block().and_then(|x| x.number)
}
pub fn synced(&self) -> bool {
!self
.watch_consensus_connections_sender
.borrow()
.conns
.is_empty()
}
pub fn num_synced_rpcs(&self) -> usize {
self.watch_consensus_connections_sender.borrow().conns.len()
}
}

@@ -28,13 +28,15 @@ impl Web3Rpcs {
// TODO: might not be a race. might be a nonce thats higher than the current account nonce. geth discards chains
// TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
// TODO: if one rpc fails, try another?
-let tx: Transaction = match rpc.try_request_handle(authorization, false).await {
+// TODO: try_request_handle, or wait_for_request_handle? I think we want wait here
+let tx: Transaction = match rpc.try_request_handle(authorization, None).await {
Ok(OpenRequestResult::Handle(handle)) => {
handle
.request(
"eth_getTransactionByHash",
&(pending_tx_id,),
Level::Error.into(),
+None,
)
.await?
}