From 0f280ce48387b9adbbfecea17a6ba959ecfe03d4 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 6 Feb 2023 09:55:27 -0800 Subject: [PATCH] cargo upgrade and shorten variable names also begin adding a latency tracker for rpc stats --- Cargo.lock | 137 ++++++++++-------- TODO.md | 14 +- deferred-rate-limiter/Cargo.toml | 2 +- entities/Cargo.toml | 2 +- redis-rate-limiter/Cargo.toml | 2 +- web3_proxy/Cargo.toml | 10 +- web3_proxy/src/app/mod.rs | 25 ++-- web3_proxy/src/bin/web3_proxy_cli/daemon.rs | 6 +- .../bin/web3_proxy_cli/popularity_contest.rs | 8 +- web3_proxy/src/block_number.rs | 6 +- web3_proxy/src/config.rs | 29 ++-- web3_proxy/src/frontend/authorization.rs | 4 +- web3_proxy/src/frontend/mod.rs | 3 +- web3_proxy/src/frontend/rpc_proxy_http.rs | 18 +-- web3_proxy/src/frontend/rpc_proxy_ws.rs | 18 +-- web3_proxy/src/frontend/users.rs | 6 +- web3_proxy/src/metrics_frontend.rs | 5 +- web3_proxy/src/rpcs/blockchain.rs | 93 ++++++------ .../src/rpcs/{connections.rs => many.rs} | 121 ++++++---------- web3_proxy/src/rpcs/mod.rs | 4 +- web3_proxy/src/rpcs/{connection.rs => one.rs} | 84 ++++++----- web3_proxy/src/rpcs/request.rs | 8 +- web3_proxy/src/rpcs/synced_connections.rs | 16 +- web3_proxy/src/rpcs/transactions.rs | 10 +- 24 files changed, 304 insertions(+), 327 deletions(-) rename web3_proxy/src/rpcs/{connections.rs => many.rs} (94%) rename web3_proxy/src/rpcs/{connection.rs => one.rs} (96%) diff --git a/Cargo.lock b/Cargo.lock index 4ed501f6..bd1536b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -98,9 +98,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.68" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61" +checksum = "224afbd727c3d6e4b90103ece64b8d1b67fbb1973b1046c2281eed3f3803f800" dependencies = [ "backtrace", ] @@ -230,6 +230,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic-polyfill" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3ff7eb3f316534d83a8a2c3d1674ace8a5a71198eba31e2e2b597833f699b28" +dependencies = [ + "critical-section", +] + [[package]] name = "atty" version = "0.2.14" @@ -310,12 +319,13 @@ dependencies = [ [[package]] name = "axum-client-ip" -version = "0.3.1" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddfb5a3ddd6367075d50629546fb46710584016ae7704cd03b6d41cb5be82e5a" +checksum = "0d719fabd6813392bbc10e1fe67f2977fad52791a836e51236f7e02f2482e017" dependencies = [ "axum", "forwarded-header-value", + "serde", ] [[package]] @@ -1009,6 +1019,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "critical-section" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6548a0ad5d2549e111e1f6a11a6c2e2d00ce6a3dafe22948d67c2b443f775e52" + [[package]] name = "crossbeam-channel" version = "0.5.6" @@ -1216,7 +1232,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef552e6f588e446098f6ba40d89ac146c8c7b64aade83c051ee00bb5d2bc18d" dependencies = [ "serde", - "uuid 1.2.2", + "uuid 1.3.0", ] [[package]] @@ -1333,12 +1349,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "doc-comment" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" - [[package]] name = "dotenv" version = "0.15.0" @@ -1433,7 +1443,7 @@ dependencies = [ "sea-orm", "serde", 
"ulid", - "uuid 1.2.2", + "uuid 1.3.0", ] [[package]] @@ -1944,12 +1954,6 @@ dependencies = [ "libc", ] -[[package]] -name = "ftoa" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca45aac12b6c561b6289bc68957cb1db3dccf870e1951d590202de5e24f1dd35" - [[package]] name = "funty" version = "2.0.0" @@ -2195,6 +2199,15 @@ dependencies = [ "thiserror", ] +[[package]] +name = "hash32" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c35f58762feb77d74ebe43bdbc3210f09be9fe6742234d573bacc26ed92b67" +dependencies = [ + "byteorder", +] + [[package]] name = "hashbrown" version = "0.11.2" @@ -2280,6 +2293,19 @@ dependencies = [ "http", ] +[[package]] +name = "heapless" +version = "0.7.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db04bc24a18b9ea980628ecf00e6c0264f3c1426dac36c00cb49b6fbad8b0743" +dependencies = [ + "atomic-polyfill", + "hash32", + "rustc_version", + "spin 0.9.4", + "stable_deref_trait", +] + [[package]] name = "heck" version = "0.3.3" @@ -2520,7 +2546,6 @@ checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399" dependencies = [ "autocfg", "hashbrown 0.12.3", - "serde", ] [[package]] @@ -2890,7 +2915,7 @@ dependencies = [ "tagptr", "thiserror", "triomphe", - "uuid 1.2.2", + "uuid 1.3.0", ] [[package]] @@ -4174,7 +4199,7 @@ dependencies = [ "time 0.3.17", "tracing", "url", - "uuid 1.2.2", + "uuid 1.3.0", ] [[package]] @@ -4233,7 +4258,7 @@ dependencies = [ "sea-query-derive", "serde_json", "time 0.3.17", - "uuid 1.2.2", + "uuid 1.3.0", ] [[package]] @@ -4248,7 +4273,7 @@ dependencies = [ "serde_json", "sqlx", "time 0.3.17", - "uuid 1.2.2", + "uuid 1.3.0", ] [[package]] @@ -4448,7 +4473,7 @@ dependencies = [ "thiserror", "time 0.3.17", "url", - "uuid 1.2.2", + "uuid 1.3.0", ] [[package]] @@ -4483,9 +4508,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.91" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "877c235533714907a8c2464236f5c4b2a17262ef1bd71f38f35ea592c8da6883" +checksum = "7434af0dc1cbd59268aa98b4c22c131c0584d2232f6fb166efb993e2832e896a" dependencies = [ "itoa 1.0.5", "ryu", @@ -4502,18 +4527,25 @@ dependencies = [ ] [[package]] -name = "serde_prometheus" -version = "0.1.6" +name = "serde_plain" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25fcd6131bac47a32328d1ba1ee15a27f8d91ab2e5920dba71dbe93d2648f6b1" +checksum = "d6018081315db179d0ce57b1fe4b62a12a0028c9cf9bbef868c9cf477b3c34ae" dependencies = [ - "ftoa", - "indexmap", - "itoa 0.4.8", - "lazy_static", - "regex", "serde", - "snafu", +] + +[[package]] +name = "serde_prometheus" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb6048d9e4ebc41f7d1a42c79b04c5b460633be307620a0e34a8f81970ea47" +dependencies = [ + "heapless", + "nom", + "serde", + "serde_plain", + "thiserror", ] [[package]] @@ -4685,27 +4717,6 @@ version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" -[[package]] -name = "snafu" -version = "0.6.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eab12d3c261b2308b0d80c26fffb58d17eba81a4be97890101f416b478c79ca7" -dependencies = [ - "doc-comment", - "snafu-derive", -] - -[[package]] -name = "snafu-derive" -version = "0.6.10" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "1508efa03c362e23817f96cde18abed596a25219a8b2c66e8db33c03543d315b" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "socket2" version = "0.4.7" @@ -4836,7 +4847,7 @@ dependencies = [ "time 0.3.17", "tokio-stream", "url", - "uuid 1.2.2", + "uuid 1.3.0", "webpki-roots", ] @@ -4871,6 +4882,12 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "static_assertions" version = "1.1.0" @@ -5631,9 +5648,9 @@ dependencies = [ [[package]] name = "uuid" -version = "1.2.2" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "422ee0de9031b5b948b97a8fc04e3aa35230001a722ddd27943e0be31564ce4c" +checksum = "1674845326ee10d37ca60470760d4288a6f80f304007d92e5c53bab78c9cfd79" dependencies = [ "getrandom", "serde", @@ -5837,7 +5854,7 @@ dependencies = [ "tower-http", "ulid", "url", - "uuid 1.2.2", + "uuid 1.3.0", ] [[package]] diff --git a/TODO.md b/TODO.md index 58155f94..207567c4 100644 --- a/TODO.md +++ b/TODO.md @@ -243,8 +243,8 @@ These are roughly in order of completition - [x] cache the status page for a second - [x] request accounting for websockets - [x] database merge scripts -- [x] test that sets up a Web3Connection and asks "has_block" for old and new blocks -- [x] test that sets up Web3Connections with 2 nodes. one behind by several blocks. and see what the "next" server shows as +- [x] test that sets up a Web3Rpc and asks "has_block" for old and new blocks +- [x] test that sets up Web3Rpcs with 2 nodes. one behind by several blocks. and see what the "next" server shows as - [x] ethspam on bsc and polygon gives 1/4 errors. fix whatever is causing this - bugfix! we were using the whole connection list instead of just the synced connection list when picking servers. oops! - [x] actually block unauthenticated requests instead of emitting warning of "allowing without auth during development!" @@ -289,7 +289,7 @@ These are not yet ordered. There might be duplicates. We might not actually need - we were caching too aggressively - [x] BUG! if sending transactions gets "INTERNAL_ERROR: existing tx with same hash", create a success message - we just want to be sure that the server has our tx and in this case, it does. - - ERROR http_request:request:try_send_all_upstream_servers: web3_proxy::rpcs::request: bad response! err=JsonRpcClientError(JsonRpcError(JsonRpcError { code: -32000, message: "INTERNAL_ERROR: existing tx with same hash", data: None })) method=eth_sendRawTransaction rpc=local_erigon_alpha_archive id=01GF4HV03Y4ZNKQV8DW5NDQ5CG method=POST authorized_request=User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 }) self=Web3Connections { conns: {"local_erigon_alpha_archive_ws": Web3Connection { name: "local_erigon_alpha_archive_ws", blocks: "all", .. }, "local_geth_ws": Web3Connection { name: "local_geth_ws", blocks: 64, .. }, "local_erigon_alpha_archive": Web3Connection { name: "local_erigon_alpha_archive", blocks: "all", .. }}, .. } authorized_request=Some(User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 })) request=JsonRpcRequest { id: RawValue(39), method: "eth_sendRawTransaction", .. 
} request_metadata=Some(RequestMetadata { datetime: 2022-10-11T22:14:57.406829095Z, period_seconds: 60, request_bytes: 633, backend_requests: 0, no_servers: 0, error_response: false, response_bytes: 0, response_millis: 0 }) block_needed=None + - ERROR http_request:request:try_send_all_upstream_servers: web3_proxy::rpcs::request: bad response! err=JsonRpcClientError(JsonRpcError(JsonRpcError { code: -32000, message: "INTERNAL_ERROR: existing tx with same hash", data: None })) method=eth_sendRawTransaction rpc=local_erigon_alpha_archive id=01GF4HV03Y4ZNKQV8DW5NDQ5CG method=POST authorized_request=User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 }) self=Web3Rpcs { conns: {"local_erigon_alpha_archive_ws": Web3Rpc { name: "local_erigon_alpha_archive_ws", blocks: "all", .. }, "local_geth_ws": Web3Rpc { name: "local_geth_ws", blocks: 64, .. }, "local_erigon_alpha_archive": Web3Rpc { name: "local_erigon_alpha_archive", blocks: "all", .. }}, .. } authorized_request=Some(User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 })) request=JsonRpcRequest { id: RawValue(39), method: "eth_sendRawTransaction", .. } request_metadata=Some(RequestMetadata { datetime: 2022-10-11T22:14:57.406829095Z, period_seconds: 60, request_bytes: 633, backend_requests: 0, no_servers: 0, error_response: false, response_bytes: 0, response_millis: 0 }) block_needed=None - [x] serde collect unknown fields in config instead of crash - [x] upgrade user tier by address - [x] all_backend_connections skips syncing servers @@ -556,10 +556,10 @@ in another repo: event subscriber - [ ] weird flapping fork could have more useful logs. like, howd we get to 1/1/4 and fork. geth changed its mind 3 times? - should we change our code to follow the same consensus rules as geth? our first seen still seems like a reasonable choice - other chains might change all sorts of things about their fork choice rules - 2022-07-22T23:52:18.593956Z WARN block_receiver: web3_proxy::connections: chain is forked! 1 possible heads. 1/1/4 rpcs have 0xa906…5bc1 rpc=Web3Connection { url: "ws://127.0.0.1:8546", data: 64, .. } new_block_num=15195517 - 2022-07-22T23:52:18.983441Z WARN block_receiver: web3_proxy::connections: chain is forked! 1 possible heads. 1/1/4 rpcs have 0x70e8…48e0 rpc=Web3Connection { url: "ws://127.0.0.1:8546", data: 64, .. } new_block_num=15195517 - 2022-07-22T23:52:19.350720Z WARN block_receiver: web3_proxy::connections: chain is forked! 2 possible heads. 1/2/4 rpcs have 0x70e8…48e0 rpc=Web3Connection { url: "ws://127.0.0.1:8549", data: "archive", .. } new_block_num=15195517 - 2022-07-22T23:52:26.041140Z WARN block_receiver: web3_proxy::connections: chain is forked! 2 possible heads. 2/4/4 rpcs have 0x70e8…48e0 rpc=Web3Connection { url: "http://127.0.0.1:8549", data: "archive", .. } new_block_num=15195517 + 2022-07-22T23:52:18.593956Z WARN block_receiver: web3_proxy::connections: chain is forked! 1 possible heads. 1/1/4 rpcs have 0xa906…5bc1 rpc=Web3Rpc { url: "ws://127.0.0.1:8546", data: 64, .. } new_block_num=15195517 + 2022-07-22T23:52:18.983441Z WARN block_receiver: web3_proxy::connections: chain is forked! 1 possible heads. 1/1/4 rpcs have 0x70e8…48e0 rpc=Web3Rpc { url: "ws://127.0.0.1:8546", data: 64, .. } new_block_num=15195517 + 2022-07-22T23:52:19.350720Z WARN block_receiver: web3_proxy::connections: chain is forked! 2 possible heads. 
1/2/4 rpcs have 0x70e8…48e0 rpc=Web3Rpc { url: "ws://127.0.0.1:8549", data: "archive", .. } new_block_num=15195517 + 2022-07-22T23:52:26.041140Z WARN block_receiver: web3_proxy::connections: chain is forked! 2 possible heads. 2/4/4 rpcs have 0x70e8…48e0 rpc=Web3Rpc { url: "http://127.0.0.1:8549", data: "archive", .. } new_block_num=15195517 - [ ] threshold should check actual available request limits (if any) instead of just the soft limit - [ ] foreign key on_update and on_delete - [ ] database creation timestamps diff --git a/deferred-rate-limiter/Cargo.toml b/deferred-rate-limiter/Cargo.toml index a12ed73a..47e49197 100644 --- a/deferred-rate-limiter/Cargo.toml +++ b/deferred-rate-limiter/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] redis-rate-limiter = { path = "../redis-rate-limiter" } -anyhow = "1.0.68" +anyhow = "1.0.69" hashbrown = "0.13.2" log = "0.4.17" moka = { version = "0.9.6", default-features = false, features = ["future"] } diff --git a/entities/Cargo.toml b/entities/Cargo.toml index 606a2f39..7d431949 100644 --- a/entities/Cargo.toml +++ b/entities/Cargo.toml @@ -12,6 +12,6 @@ path = "src/mod.rs" [dependencies] sea-orm = "0.10.7" serde = "1.0.152" -uuid = "1.2.2" +uuid = "1.3.0" ethers = "1.0.2" ulid = "1.0.0" diff --git a/redis-rate-limiter/Cargo.toml b/redis-rate-limiter/Cargo.toml index b44fec53..9ba37ad3 100644 --- a/redis-rate-limiter/Cargo.toml +++ b/redis-rate-limiter/Cargo.toml @@ -5,6 +5,6 @@ authors = ["Bryan Stitt "] edition = "2021" [dependencies] -anyhow = "1.0.68" +anyhow = "1.0.69" deadpool-redis = { version = "0.11.1", features = ["rt_tokio_1", "serde"] } tokio = "1.25.0" diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 9e3d2769..a3795554 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -25,10 +25,10 @@ thread-fast-rng = { path = "../thread-fast-rng" } # TODO: import chrono from sea-orm so we always have the same version # TODO: make sure this time version matches siwe. 
PR to put this in their prelude -anyhow = { version = "1.0.68", features = ["backtrace"] } +anyhow = { version = "1.0.69", features = ["backtrace"] } argh = "0.1.10" axum = { version = "0.6.4", features = ["headers", "ws"] } -axum-client-ip = "0.3.1" +axum-client-ip = "0.4.0" axum-macros = "0.3.2" chrono = "0.4.23" counter = "0.5.7" @@ -61,8 +61,8 @@ reqwest = { version = "0.11.14", default-features = false, features = ["json", " rustc-hash = "1.1.0" sentry = { version = "0.29.2", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] } serde = { version = "1.0.152", features = [] } -serde_json = { version = "1.0.91", default-features = false, features = ["alloc", "raw_value"] } -serde_prometheus = "0.1.6" +serde_json = { version = "1.0.92", default-features = false, features = ["alloc", "raw_value"] } +serde_prometheus = "0.2.0" siwe = "0.5.0" time = "0.3.17" tokio = { version = "1.25.0", features = ["full"] } @@ -72,4 +72,4 @@ tower = "0.4.13" tower-http = { version = "0.3.5", features = ["cors", "sensitive-headers"] } ulid = { version = "1.0.0", features = ["serde"] } url = "2.3.1" -uuid = "1.2.2" +uuid = "1.3.0" diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 5553cdb6..9d0f52b0 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -11,8 +11,8 @@ use crate::jsonrpc::{ JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum, }; use crate::rpcs::blockchain::{ArcBlock, SavedBlock}; -use crate::rpcs::connection::Web3Connection; -use crate::rpcs::connections::Web3Connections; +use crate::rpcs::many::Web3Rpcs; +use crate::rpcs::one::Web3Rpc; use crate::rpcs::transactions::TxStatus; use crate::user_token::UserBearerToken; use anyhow::Context; @@ -198,9 +198,9 @@ impl DatabaseReplica { // TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard pub struct Web3ProxyApp { /// Send requests to the best server available - pub balanced_rpcs: Arc, + pub balanced_rpcs: Arc, /// Send private requests (like eth_sendRawTransaction) to all these servers - pub private_rpcs: Option>, + pub private_rpcs: Option>, response_cache: ResponseCache, // don't drop this or the sender will stop working // TODO: broadcast channel instead? @@ -572,7 +572,7 @@ impl Web3ProxyApp { .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); // connect to the load balanced rpcs - let (balanced_rpcs, balanced_handle) = Web3Connections::spawn( + let (balanced_rpcs, balanced_handle) = Web3Rpcs::spawn( top_config.app.chain_id, db_conn.clone(), balanced_rpcs, @@ -598,7 +598,7 @@ impl Web3ProxyApp { warn!("No private relays configured. Any transactions will be broadcast to the public mempool!"); None } else { - let (private_rpcs, private_handle) = Web3Connections::spawn( + let (private_rpcs, private_handle) = Web3Rpcs::spawn( top_config.app.chain_id, db_conn.clone(), private_rpcs, @@ -911,6 +911,7 @@ impl Web3ProxyApp { user_count, }; + // TODO: i don't like this library. 
it doesn't include HELP or TYPE lines and so our prometheus server fails to parse it serde_prometheus::to_string(&metrics, Some("web3_proxy"), globals) .expect("prometheus metrics should always serialize") } @@ -921,8 +922,7 @@ impl Web3ProxyApp { authorization: Arc, request: JsonRpcRequestEnum, proxy_mode: ProxyMode, - ) -> Result<(JsonRpcForwardedResponseEnum, Vec>), FrontendErrorResponse> - { + ) -> Result<(JsonRpcForwardedResponseEnum, Vec>), FrontendErrorResponse> { // trace!(?request, "proxy_web3_rpc"); // even though we have timeouts on the requests to our backend providers, @@ -961,8 +961,7 @@ impl Web3ProxyApp { authorization: &Arc, requests: Vec, proxy_mode: ProxyMode, - ) -> Result<(Vec, Vec>), FrontendErrorResponse> - { + ) -> Result<(Vec, Vec>), FrontendErrorResponse> { // TODO: we should probably change ethers-rs to support this directly. they pushed this off to v2 though let num_requests = requests.len(); @@ -979,7 +978,7 @@ impl Web3ProxyApp { // TODO: i'm sure this could be done better with iterators // TODO: stream the response? let mut collected: Vec = Vec::with_capacity(num_requests); - let mut collected_rpcs: HashSet> = HashSet::new(); + let mut collected_rpcs: HashSet> = HashSet::new(); for response in responses { // TODO: any way to attach the tried rpcs to the error? it is likely helpful let (response, rpcs) = response?; @@ -1020,7 +1019,7 @@ impl Web3ProxyApp { authorization: &Arc, mut request: JsonRpcRequest, proxy_mode: ProxyMode, - ) -> Result<(JsonRpcForwardedResponse, Vec>), FrontendErrorResponse> { + ) -> Result<(JsonRpcForwardedResponse, Vec>), FrontendErrorResponse> { // trace!("Received request: {:?}", request); let request_metadata = Arc::new(RequestMetadata::new(REQUEST_PERIOD, request.num_bytes())?); @@ -1208,7 +1207,7 @@ impl Web3ProxyApp { ProxyMode::Fastest(0) => None, // TODO: how many balanced rpcs should we send to? configurable? percentage of total? // TODO: what if we do 2 per tier? 
we want to blast the third party rpcs - // TODO: maybe having the third party rpcs in their own Web3Connections would be good for this + // TODO: maybe having the third party rpcs in their own Web3Rpcs would be good for this ProxyMode::Fastest(x) => Some(x * 4), ProxyMode::Versus => None, }; diff --git a/web3_proxy/src/bin/web3_proxy_cli/daemon.rs b/web3_proxy/src/bin/web3_proxy_cli/daemon.rs index 9019592a..4e0e056e 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/daemon.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/daemon.rs @@ -155,7 +155,7 @@ mod tests { use std::env; use web3_proxy::{ - config::{AppConfig, Web3ConnectionConfig}, + config::{AppConfig, Web3RpcConfig}, rpcs::blockchain::ArcBlock, }; @@ -204,7 +204,7 @@ mod tests { balanced_rpcs: HashMap::from([ ( "anvil".to_string(), - Web3ConnectionConfig { + Web3RpcConfig { disabled: false, display_name: None, url: anvil.endpoint(), @@ -219,7 +219,7 @@ mod tests { ), ( "anvil_ws".to_string(), - Web3ConnectionConfig { + Web3RpcConfig { disabled: false, display_name: None, url: anvil.ws_endpoint(), diff --git a/web3_proxy/src/bin/web3_proxy_cli/popularity_contest.rs b/web3_proxy/src/bin/web3_proxy_cli/popularity_contest.rs index f0107b2f..dccd2012 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/popularity_contest.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/popularity_contest.rs @@ -2,7 +2,7 @@ use std::collections::BTreeMap; // show what nodes are used most often use argh::FromArgs; -use log::info; +use log::trace; use prettytable::{row, Table}; #[derive(FromArgs, PartialEq, Debug)] @@ -87,10 +87,8 @@ impl PopularityContestSubCommand { by_tier.entry(tier).or_default().push(rpc_data); } - // TODO: sort by_tier - - info!("tier_requests: {:#?}", tier_requests); - info!("by_tier: {:#?}", by_tier); + trace!("tier_requests: {:#?}", tier_requests); + trace!("by_tier: {:#?}", by_tier); let mut table = Table::new(); diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index 17937350..33ef7f54 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -8,7 +8,7 @@ use log::warn; use serde_json::json; use std::sync::Arc; -use crate::{frontend::authorization::Authorization, rpcs::connections::Web3Connections}; +use crate::{frontend::authorization::Authorization, rpcs::many::Web3Rpcs}; #[allow(non_snake_case)] pub fn block_num_to_U64(block_num: BlockNumber, latest_block: U64) -> (U64, bool) { @@ -45,7 +45,7 @@ pub async fn clean_block_number( params: &mut serde_json::Value, block_param_id: usize, latest_block: U64, - rpcs: &Web3Connections, + rpcs: &Web3Rpcs, ) -> anyhow::Result { match params.as_array_mut() { None => { @@ -130,7 +130,7 @@ pub async fn block_needed( method: &str, params: Option<&mut serde_json::Value>, head_block_num: U64, - rpcs: &Web3Connections, + rpcs: &Web3Rpcs, ) -> anyhow::Result { let params = if let Some(params) = params { // grab the params so we can inspect and potentially modify them diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index a5693c8c..567c14d9 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -1,5 +1,5 @@ use crate::rpcs::blockchain::BlockHashesCache; -use crate::rpcs::connection::Web3Connection; +use crate::rpcs::one::Web3Rpc; use crate::{app::AnyhowJoinHandle, rpcs::blockchain::ArcBlock}; use argh::FromArgs; use ethers::prelude::TxHash; @@ -11,8 +11,8 @@ use serde::Deserialize; use std::sync::Arc; use tokio::sync::broadcast; -pub type BlockAndRpc = (Option, Arc); -pub type TxHashAndRpc = (TxHash, Arc); +pub type BlockAndRpc = 
(Option, Arc); +pub type TxHashAndRpc = (TxHash, Arc); #[derive(Debug, FromArgs)] /// Web3_proxy is a fast caching and load balancing proxy for web3 (Ethereum or similar) JsonRPC servers. @@ -41,15 +41,15 @@ pub struct CliConfig { #[derive(Clone, Debug, Deserialize)] pub struct TopConfig { pub app: AppConfig, - pub balanced_rpcs: HashMap, + pub balanced_rpcs: HashMap, // TODO: instead of an option, give it a default - pub private_rpcs: Option>, + pub private_rpcs: Option>, /// unknown config options get put here #[serde(flatten, default = "HashMap::default")] pub extra: HashMap, } -/// shared configuration between Web3Connections +/// shared configuration between Web3Rpcs // TODO: no String, only &str #[derive(Clone, Debug, Default, Deserialize)] pub struct AppConfig { @@ -198,7 +198,7 @@ fn default_response_cache_max_bytes() -> usize { /// Configuration for a backend web3 RPC server #[derive(Clone, Debug, Deserialize)] -pub struct Web3ConnectionConfig { +pub struct Web3RpcConfig { /// simple way to disable a connection without deleting the row #[serde(default)] pub disabled: bool, @@ -230,9 +230,9 @@ fn default_tier() -> u64 { 0 } -impl Web3ConnectionConfig { - /// Create a Web3Connection from config - /// TODO: move this into Web3Connection? (just need to make things pub(crate)) +impl Web3RpcConfig { + /// Create a Web3Rpc from config + /// TODO: move this into Web3Rpc? (just need to make things pub(crate)) #[allow(clippy::too_many_arguments)] pub async fn spawn( self, @@ -245,12 +245,9 @@ impl Web3ConnectionConfig { block_map: BlockHashesCache, block_sender: Option>, tx_id_sender: Option>, - ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { + ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { if !self.extra.is_empty() { - warn!( - "unknown Web3ConnectionConfig fields!: {:?}", - self.extra.keys() - ); + warn!("unknown Web3RpcConfig fields!: {:?}", self.extra.keys()); } let hard_limit = match (self.hard_limit, redis_pool) { @@ -272,7 +269,7 @@ impl Web3ConnectionConfig { let backup = self.backup.unwrap_or(false); - Web3Connection::spawn( + Web3Rpc::spawn( name, self.display_name, chain_id, diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 8c9380da..6d014ebd 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -2,7 +2,7 @@ use super::errors::FrontendErrorResponse; use crate::app::{AuthorizationChecks, Web3ProxyApp, APP_USER_AGENT}; -use crate::rpcs::connection::Web3Connection; +use crate::rpcs::one::Web3Rpc; use crate::user_token::UserBearerToken; use anyhow::Context; use axum::headers::authorization::Bearer; @@ -80,7 +80,7 @@ pub struct RequestMetadata { // TODO: "archive" isn't really a boolean. pub archive_request: AtomicBool, /// if this is empty, there was a cache_hit - pub backend_requests: Mutex>>, + pub backend_requests: Mutex>>, pub no_servers: AtomicU64, pub error_response: AtomicBool, pub response_bytes: AtomicU64, diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index e1a55e2f..99c7882f 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -1,4 +1,6 @@ //! `frontend` contains HTTP and websocket endpoints for use by users and admins. +//! +//! 
Important reading about axum extractors: https://docs.rs/axum/latest/axum/extract/index.html#the-order-of-extractors pub mod authorization; pub mod errors; @@ -196,7 +198,6 @@ pub async fn serve(port: u16, proxy_app: Arc) -> anyhow::Result<() - axum::extract::ConnectInfo (if not behind proxy) */ let service = app.into_make_service_with_connect_info::(); - // let service = app.into_make_service(); // `axum::Server` is a re-export of `hyper::Server` axum::Server::bind(&addr) diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index 067546db..7f3e87e4 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -8,7 +8,7 @@ use axum::extract::Path; use axum::headers::{Origin, Referer, UserAgent}; use axum::TypedHeader; use axum::{response::IntoResponse, Extension, Json}; -use axum_client_ip::ClientIp; +use axum_client_ip::InsecureClientIp; use axum_macros::debug_handler; use itertools::Itertools; use std::sync::Arc; @@ -19,7 +19,7 @@ use std::sync::Arc; #[debug_handler] pub async fn proxy_web3_rpc( Extension(app): Extension>, - ip: ClientIp, + ip: InsecureClientIp, origin: Option>, Json(payload): Json, ) -> FrontendResult { @@ -29,7 +29,7 @@ pub async fn proxy_web3_rpc( #[debug_handler] pub async fn fastest_proxy_web3_rpc( Extension(app): Extension>, - ip: ClientIp, + ip: InsecureClientIp, origin: Option>, Json(payload): Json, ) -> FrontendResult { @@ -41,7 +41,7 @@ pub async fn fastest_proxy_web3_rpc( #[debug_handler] pub async fn versus_proxy_web3_rpc( Extension(app): Extension>, - ip: ClientIp, + ip: InsecureClientIp, origin: Option>, Json(payload): Json, ) -> FrontendResult { @@ -50,7 +50,7 @@ pub async fn versus_proxy_web3_rpc( async fn _proxy_web3_rpc( app: Arc, - ClientIp(ip): ClientIp, + InsecureClientIp(ip): InsecureClientIp, origin: Option>, payload: JsonRpcRequestEnum, proxy_mode: ProxyMode, @@ -91,7 +91,7 @@ async fn _proxy_web3_rpc( #[debug_handler] pub async fn proxy_web3_rpc_with_key( Extension(app): Extension>, - ip: ClientIp, + ip: InsecureClientIp, origin: Option>, referer: Option>, user_agent: Option>, @@ -114,7 +114,7 @@ pub async fn proxy_web3_rpc_with_key( #[debug_handler] pub async fn fastest_proxy_web3_rpc_with_key( Extension(app): Extension>, - ip: ClientIp, + ip: InsecureClientIp, origin: Option>, referer: Option>, user_agent: Option>, @@ -137,7 +137,7 @@ pub async fn fastest_proxy_web3_rpc_with_key( #[debug_handler] pub async fn versus_proxy_web3_rpc_with_key( Extension(app): Extension>, - ip: ClientIp, + ip: InsecureClientIp, origin: Option>, referer: Option>, user_agent: Option>, @@ -160,7 +160,7 @@ pub async fn versus_proxy_web3_rpc_with_key( #[allow(clippy::too_many_arguments)] async fn _proxy_web3_rpc_with_key( app: Arc, - ClientIp(ip): ClientIp, + InsecureClientIp(ip): InsecureClientIp, origin: Option>, referer: Option>, user_agent: Option>, diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index f031aaf6..4de01bce 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -17,7 +17,7 @@ use axum::{ response::{IntoResponse, Redirect}, Extension, TypedHeader, }; -use axum_client_ip::ClientIp; +use axum_client_ip::InsecureClientIp; use axum_macros::debug_handler; use futures::SinkExt; use futures::{ @@ -49,7 +49,7 @@ pub enum ProxyMode { #[debug_handler] pub async fn websocket_handler( Extension(app): Extension>, - ip: ClientIp, + ip: InsecureClientIp, origin: Option>, ws_upgrade: 
Option, ) -> FrontendResult { @@ -61,7 +61,7 @@ pub async fn websocket_handler( #[debug_handler] pub async fn fastest_websocket_handler( Extension(app): Extension>, - ip: ClientIp, + ip: InsecureClientIp, origin: Option>, ws_upgrade: Option, ) -> FrontendResult { @@ -75,7 +75,7 @@ pub async fn fastest_websocket_handler( #[debug_handler] pub async fn versus_websocket_handler( Extension(app): Extension>, - ip: ClientIp, + ip: InsecureClientIp, origin: Option>, ws_upgrade: Option, ) -> FrontendResult { @@ -86,7 +86,7 @@ pub async fn versus_websocket_handler( async fn _websocket_handler( proxy_mode: ProxyMode, app: Arc, - ClientIp(ip): ClientIp, + InsecureClientIp(ip): InsecureClientIp, origin: Option>, ws_upgrade: Option, ) -> FrontendResult { @@ -121,7 +121,7 @@ async fn _websocket_handler( #[debug_handler] pub async fn websocket_handler_with_key( Extension(app): Extension>, - ip: ClientIp, + ip: InsecureClientIp, Path(rpc_key): Path, origin: Option>, referer: Option>, @@ -144,7 +144,7 @@ pub async fn websocket_handler_with_key( #[debug_handler] pub async fn fastest_websocket_handler_with_key( Extension(app): Extension>, - ip: ClientIp, + ip: InsecureClientIp, Path(rpc_key): Path, origin: Option>, referer: Option>, @@ -168,7 +168,7 @@ pub async fn fastest_websocket_handler_with_key( #[debug_handler] pub async fn versus_websocket_handler_with_key( Extension(app): Extension>, - ip: ClientIp, + ip: InsecureClientIp, Path(rpc_key): Path, origin: Option>, referer: Option>, @@ -192,7 +192,7 @@ pub async fn versus_websocket_handler_with_key( async fn _websocket_handler_with_key( proxy_mode: ProxyMode, app: Arc, - ClientIp(ip): ClientIp, + InsecureClientIp(ip): InsecureClientIp, rpc_key: String, origin: Option>, referer: Option>, diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index 44d066ba..9acbb3c0 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -16,7 +16,7 @@ use axum::{ response::IntoResponse, Extension, Json, TypedHeader, }; -use axum_client_ip::ClientIp; +use axum_client_ip::InsecureClientIp; use axum_macros::debug_handler; use chrono::{TimeZone, Utc}; use entities::sea_orm_active_enums::LogLevel; @@ -61,7 +61,7 @@ use ulid::Ulid; #[debug_handler] pub async fn user_login_get( Extension(app): Extension>, - ClientIp(ip): ClientIp, + InsecureClientIp(ip): InsecureClientIp, // TODO: what does axum's error handling look like if the path fails to parse? Path(mut params): Path>, ) -> FrontendResult { @@ -178,7 +178,7 @@ pub struct PostLogin { #[debug_handler] pub async fn user_login_post( Extension(app): Extension>, - ClientIp(ip): ClientIp, + InsecureClientIp(ip): InsecureClientIp, Query(query): Query, Json(payload): Json, ) -> FrontendResult { diff --git a/web3_proxy/src/metrics_frontend.rs b/web3_proxy/src/metrics_frontend.rs index 2eb2170a..cc2da646 100644 --- a/web3_proxy/src/metrics_frontend.rs +++ b/web3_proxy/src/metrics_frontend.rs @@ -23,13 +23,14 @@ pub async fn serve(app: Arc, port: u16) -> anyhow::Result<()> { // TODO: into_make_service is enough if we always run behind a proxy. make into_make_service_with_connect_info optional? /* - It sequentially looks for an IP in: + InsecureClientIp sequentially looks for an IP in: - x-forwarded-for header (de-facto standard) - x-real-ip header - forwarded header (new standard) - axum::extract::ConnectInfo (if not behind proxy) - So we probably won't need into_make_service_with_connect_info, but it shouldn't hurt + Since we run behind haproxy, x-forwarded-for will be set. 
+ We probably won't need into_make_service_with_connect_info, but it shouldn't hurt. */ let service = app.into_make_service_with_connect_info::(); // let service = app.into_make_service(); diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 2ecb5b66..e5116d6d 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -1,10 +1,10 @@ -///! Keep track of the blockchain as seen by a Web3Connections. -use super::connection::Web3Connection; -use super::connections::Web3Connections; +use super::many::Web3Rpcs; +///! Keep track of the blockchain as seen by a Web3Rpcs. +use super::one::Web3Rpc; use super::transactions::TxStatus; use crate::frontend::authorization::Authorization; use crate::{ - config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::ConsensusConnections, + config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::ConsensusWeb3Rpcs, }; use anyhow::Context; use derive_more::From; @@ -92,7 +92,7 @@ impl Display for SavedBlock { } } -impl Web3Connections { +impl Web3Rpcs { /// add a block to our mappings and track the heaviest chain pub async fn save_block( &self, @@ -135,7 +135,7 @@ impl Web3Connections { &self, authorization: &Arc, hash: &H256, - rpc: Option<&Arc>, + rpc: Option<&Arc>, ) -> anyhow::Result { // first, try to get the hash from our cache // the cache is set last, so if its here, its everywhere @@ -322,7 +322,7 @@ impl Web3Connections { authorization: &Arc, consensus_finder: &mut ConsensusFinder, rpc_head_block: Option, - rpc: Arc, + rpc: Arc, head_block_sender: &watch::Sender, pending_tx_sender: &Option>, ) -> anyhow::Result<()> { @@ -550,11 +550,11 @@ impl ConnectionsGroup { Self::new(true) } - fn remove(&mut self, rpc: &Web3Connection) -> Option { + fn remove(&mut self, rpc: &Web3Rpc) -> Option { self.rpc_name_to_hash.remove(rpc.name.as_str()) } - fn insert(&mut self, rpc: &Web3Connection, block_hash: H256) -> Option { + fn insert(&mut self, rpc: &Web3Rpc, block_hash: H256) -> Option { self.rpc_name_to_hash.insert(rpc.name.clone(), block_hash) } @@ -564,7 +564,7 @@ impl ConnectionsGroup { rpc_name: &str, hash: &H256, authorization: &Arc, - web3_connections: &Web3Connections, + web3_rpcs: &Web3Rpcs, ) -> anyhow::Result { // // TODO: why does this happen?!?! seems to only happen with uncled blocks // // TODO: maybe we should do try_get_with? @@ -575,16 +575,17 @@ impl ConnectionsGroup { // ); // this option should almost always be populated. if the connection reconnects at a bad time it might not be available though - let rpc = web3_connections.conns.get(rpc_name); + // TODO: if this is None, I think we should error. + let rpc = web3_rpcs.conns.get(rpc_name); - web3_connections.block(authorization, hash, rpc).await + web3_rpcs.block(authorization, hash, rpc).await } // TODO: do this during insert/remove? 
pub(self) async fn highest_block( &self, authorization: &Arc, - web3_connections: &Web3Connections, + web3_rpcs: &Web3Rpcs, ) -> Option { let mut checked_heads = HashSet::with_capacity(self.rpc_name_to_hash.len()); let mut highest_block = None::; @@ -596,7 +597,7 @@ impl ConnectionsGroup { } let rpc_block = match self - .get_block_from_rpc(rpc_name, rpc_head_hash, authorization, web3_connections) + .get_block_from_rpc(rpc_name, rpc_head_hash, authorization, web3_rpcs) .await { Ok(x) => x, @@ -631,9 +632,9 @@ impl ConnectionsGroup { pub(self) async fn consensus_head_connections( &self, authorization: &Arc, - web3_connections: &Web3Connections, - ) -> anyhow::Result { - let mut maybe_head_block = match self.highest_block(authorization, web3_connections).await { + web3_rpcs: &Web3Rpcs, + ) -> anyhow::Result { + let mut maybe_head_block = match self.highest_block(authorization, web3_rpcs).await { None => return Err(anyhow::anyhow!("No blocks known")), Some(x) => x, }; @@ -667,7 +668,7 @@ impl ConnectionsGroup { continue; } - if let Some(rpc) = web3_connections.conns.get(rpc_name.as_str()) { + if let Some(rpc) = web3_rpcs.conns.get(rpc_name.as_str()) { highest_rpcs.insert(rpc_name); highest_rpcs_sum_soft_limit += rpc.soft_limit; } else { @@ -676,18 +677,15 @@ impl ConnectionsGroup { } } - if highest_rpcs_sum_soft_limit >= web3_connections.min_sum_soft_limit - && highest_rpcs.len() >= web3_connections.min_head_rpcs + if highest_rpcs_sum_soft_limit >= web3_rpcs.min_sum_soft_limit + && highest_rpcs.len() >= web3_rpcs.min_head_rpcs { // we have enough servers with enough requests break; } // not enough rpcs yet. check the parent block - if let Some(parent_block) = web3_connections - .block_hashes - .get(&maybe_head_block.parent_hash) - { + if let Some(parent_block) = web3_rpcs.block_hashes.get(&maybe_head_block.parent_hash) { // trace!( // child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd", // ); @@ -695,25 +693,25 @@ impl ConnectionsGroup { maybe_head_block = parent_block; continue; } else { - if num_known < web3_connections.min_head_rpcs { + if num_known < web3_rpcs.min_head_rpcs { return Err(anyhow::anyhow!( "not enough rpcs connected: {}/{}/{}", highest_rpcs.len(), num_known, - web3_connections.min_head_rpcs, + web3_rpcs.min_head_rpcs, )); } else { let soft_limit_percent = (highest_rpcs_sum_soft_limit as f32 - / web3_connections.min_sum_soft_limit as f32) + / web3_rpcs.min_sum_soft_limit as f32) * 100.0; return Err(anyhow::anyhow!( "ran out of parents to check. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{})", highest_rpcs.len(), num_known, - web3_connections.min_head_rpcs, + web3_rpcs.min_head_rpcs, highest_rpcs_sum_soft_limit, - web3_connections.min_sum_soft_limit, + web3_rpcs.min_sum_soft_limit, soft_limit_percent, )); } @@ -723,29 +721,28 @@ impl ConnectionsGroup { // TODO: if consensus_head_rpcs.is_empty, try another method of finding the head block. will need to change the return Err above into breaks. // we've done all the searching for the heaviest block that we can - if highest_rpcs.len() < web3_connections.min_head_rpcs - || highest_rpcs_sum_soft_limit < web3_connections.min_sum_soft_limit + if highest_rpcs.len() < web3_rpcs.min_head_rpcs + || highest_rpcs_sum_soft_limit < web3_rpcs.min_sum_soft_limit { // if we get here, not enough servers are synced. 
return an error - let soft_limit_percent = (highest_rpcs_sum_soft_limit as f32 - / web3_connections.min_sum_soft_limit as f32) - * 100.0; + let soft_limit_percent = + (highest_rpcs_sum_soft_limit as f32 / web3_rpcs.min_sum_soft_limit as f32) * 100.0; return Err(anyhow::anyhow!( "Not enough resources. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{})", highest_rpcs.len(), num_known, - web3_connections.min_head_rpcs, + web3_rpcs.min_head_rpcs, highest_rpcs_sum_soft_limit, - web3_connections.min_sum_soft_limit, + web3_rpcs.min_sum_soft_limit, soft_limit_percent, )); } // success! this block has enough soft limit and nodes on it (or on later blocks) - let conns: Vec> = highest_rpcs + let conns: Vec> = highest_rpcs .into_iter() - .filter_map(|conn_name| web3_connections.conns.get(conn_name).cloned()) + .filter_map(|conn_name| web3_rpcs.conns.get(conn_name).cloned()) .collect(); // TODO: DEBUG only check @@ -758,7 +755,7 @@ impl ConnectionsGroup { let consensus_head_block: SavedBlock = maybe_head_block.into(); - Ok(ConsensusConnections { + Ok(ConsensusWeb3Rpcs { head_block: Some(consensus_head_block), conns, num_checked_conns: self.rpc_name_to_hash.len(), @@ -785,7 +782,7 @@ impl Default for ConsensusFinder { } impl ConsensusFinder { - fn remove(&mut self, rpc: &Web3Connection) -> Option { + fn remove(&mut self, rpc: &Web3Rpc) -> Option { // TODO: should we have multiple backup tiers? (remote datacenters vs third party) if !rpc.backup { self.main.remove(rpc); @@ -793,7 +790,7 @@ impl ConsensusFinder { self.all.remove(rpc) } - fn insert(&mut self, rpc: &Web3Connection, new_hash: H256) -> Option { + fn insert(&mut self, rpc: &Web3Rpc, new_hash: H256) -> Option { // TODO: should we have multiple backup tiers? (remote datacenters vs third party) if !rpc.backup { self.main.insert(rpc, new_hash); @@ -805,9 +802,9 @@ impl ConsensusFinder { async fn update_rpc( &mut self, rpc_head_block: Option, - rpc: Arc, + rpc: Arc, // we need this so we can save the block to caches. i don't like it though. maybe we should use a lazy_static Cache wrapper that has a "save_block" method?. i generally dislike globals but i also dislike all the types having to pass eachother around - web3_connections: &Web3Connections, + web3_connections: &Web3Rpcs, ) -> anyhow::Result { // add the rpc's block to connection_heads, or remove the rpc from connection_heads let changed = match rpc_head_block { @@ -852,15 +849,15 @@ impl ConsensusFinder { async fn best_consensus_connections( &mut self, authorization: &Arc, - web3_connections: &Web3Connections, - ) -> ConsensusConnections { + web3_connections: &Web3Rpcs, + ) -> ConsensusWeb3Rpcs { let highest_block_num = match self .all .highest_block(authorization, web3_connections) .await { None => { - return ConsensusConnections::default(); + return ConsensusWeb3Rpcs::default(); } Some(x) => x.number.expect("blocks here should always have a number"), }; @@ -901,7 +898,7 @@ impl ConsensusFinder { if self.all.rpc_name_to_hash.len() < web3_connections.min_head_rpcs { debug!("No consensus head yet: {}", err); } - return ConsensusConnections::default(); + return ConsensusWeb3Rpcs::default(); } Ok(x) => x, }; @@ -924,7 +921,7 @@ impl ConsensusFinder { } else { // TODO: i don't think we need this error. 
and i doublt we'll ever even get here error!("NO CONSENSUS HEAD!"); - ConsensusConnections::default() + ConsensusWeb3Rpcs::default() } } } diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/many.rs similarity index 94% rename from web3_proxy/src/rpcs/connections.rs rename to web3_proxy/src/rpcs/many.rs index b45c58e2..c8ab6b0a 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -1,12 +1,10 @@ -///! Load balanced communication with a group of web3 providers +///! Load balanced communication with a group of web3 rpc providers use super::blockchain::{ArcBlock, BlockHashesCache}; -use super::connection::Web3Connection; -use super::request::{ - OpenRequestHandle, OpenRequestResult, RequestRevertHandler, -}; -use super::synced_connections::ConsensusConnections; +use super::one::Web3Rpc; +use super::request::{OpenRequestHandle, OpenRequestResult, RequestRevertHandler}; +use super::synced_connections::ConsensusWeb3Rpcs; use crate::app::{flatten_handle, AnyhowJoinHandle}; -use crate::config::{BlockAndRpc, TxHashAndRpc, Web3ConnectionConfig}; +use crate::config::{BlockAndRpc, TxHashAndRpc, Web3RpcConfig}; use crate::frontend::authorization::{Authorization, RequestMetadata}; use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; @@ -36,11 +34,11 @@ use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBeh /// A collection of web3 connections. Sends requests either the current best server or all servers. #[derive(From)] -pub struct Web3Connections { +pub struct Web3Rpcs { /// any requests will be forwarded to one (or more) of these connections - pub(crate) conns: HashMap>, + pub(crate) conns: HashMap>, /// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender` - pub(super) watch_consensus_connections_sender: watch::Sender>, + pub(super) watch_consensus_connections_sender: watch::Sender>, /// this head receiver makes it easy to wait until there is a new block pub(super) watch_consensus_head_receiver: Option>, pub(super) pending_transactions: @@ -54,13 +52,13 @@ pub struct Web3Connections { pub(super) min_sum_soft_limit: u32, } -impl Web3Connections { +impl Web3Rpcs { /// Spawn durable connections to multiple Web3 providers. #[allow(clippy::too_many_arguments)] pub async fn spawn( chain_id: u64, db_conn: Option, - server_configs: HashMap, + server_configs: HashMap, http_client: Option, redis_pool: Option, block_map: BlockHashesCache, @@ -242,13 +240,13 @@ impl Web3Connections { Ok((connections, handle)) } - pub fn get(&self, conn_name: &str) -> Option<&Arc> { + pub fn get(&self, conn_name: &str) -> Option<&Arc> { self.conns.get(conn_name) } /// subscribe to blocks and transactions from all the backend rpcs. 
- /// blocks are processed by all the `Web3Connection`s and then sent to the `block_receiver` - /// transaction ids from all the `Web3Connection`s are deduplicated and forwarded to `pending_tx_sender` + /// blocks are processed by all the `Web3Rpc`s and then sent to the `block_receiver` + /// transaction ids from all the `Web3Rpc`s are deduplicated and forwarded to `pending_tx_sender` async fn subscribe( self: Arc, authorization: Arc, @@ -412,7 +410,7 @@ impl Web3Connections { &self, authorization: &Arc, request_metadata: Option<&Arc>, - skip: &[Arc], + skip: &[Arc], min_block_needed: Option<&U64>, ) -> anyhow::Result { if let Ok(without_backups) = self @@ -447,13 +445,10 @@ impl Web3Connections { allow_backups: bool, authorization: &Arc, request_metadata: Option<&Arc>, - skip: &[Arc], + skip: &[Arc], min_block_needed: Option<&U64>, ) -> anyhow::Result { - let usable_rpcs_by_head_num_and_weight: BTreeMap< - (Option, u64), - Vec>, - > = { + let usable_rpcs_by_head_num_and_weight: BTreeMap<(Option, u64), Vec>> = { let synced_connections = self.watch_consensus_connections_sender.borrow().clone(); let head_block_num = if let Some(head_block) = synced_connections.head_block.as_ref() { @@ -1113,23 +1108,23 @@ impl Web3Connections { } } -impl fmt::Debug for Web3Connections { +impl fmt::Debug for Web3Rpcs { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // TODO: the default formatter takes forever to write. this is too quiet though - f.debug_struct("Web3Connections") + f.debug_struct("Web3Rpcs") .field("conns", &self.conns) .finish_non_exhaustive() } } -impl Serialize for Web3Connections { +impl Serialize for Web3Rpcs { fn serialize(&self, serializer: S) -> Result where S: Serializer, { - let mut state = serializer.serialize_struct("Web3Connections", 6)?; + let mut state = serializer.serialize_struct("Web3Rpcs", 6)?; - let conns: Vec<&Web3Connection> = self.conns.values().map(|x| x.as_ref()).collect(); + let conns: Vec<&Web3Rpc> = self.conns.values().map(|x| x.as_ref()).collect(); state.serialize_field("conns", &conns)?; { @@ -1149,7 +1144,7 @@ impl Serialize for Web3Connections { } /// sort by block number (descending) and tier (ascending) -fn sort_connections_by_sync_status(rpcs: &mut Vec>) { +fn sort_connections_by_sync_status(rpcs: &mut Vec>) { rpcs.sort_by_cached_key(|x| { let reversed_head_block = u64::MAX - x.head_block @@ -1170,7 +1165,7 @@ mod tests { use super::*; use crate::rpcs::{ blockchain::{ConsensusFinder, SavedBlock}, - connection::ProviderState, + one::ProviderState, provider::Web3Provider, }; use ethers::types::{Block, U256}; @@ -1205,37 +1200,37 @@ mod tests { .collect(); let mut rpcs = [ - Web3Connection { + Web3Rpc { name: "a".to_string(), tier: 0, head_block: RwLock::new(None), ..Default::default() }, - Web3Connection { + Web3Rpc { name: "b".to_string(), tier: 0, head_block: RwLock::new(blocks.get(1).cloned()), ..Default::default() }, - Web3Connection { + Web3Rpc { name: "c".to_string(), tier: 0, head_block: RwLock::new(blocks.get(2).cloned()), ..Default::default() }, - Web3Connection { + Web3Rpc { name: "d".to_string(), tier: 1, head_block: RwLock::new(None), ..Default::default() }, - Web3Connection { + Web3Rpc { name: "e".to_string(), tier: 1, head_block: RwLock::new(blocks.get(1).cloned()), ..Default::default() }, - Web3Connection { + Web3Rpc { name: "f".to_string(), tier: 1, head_block: RwLock::new(blocks.get(2).cloned()), @@ -1292,48 +1287,32 @@ mod tests { let block_data_limit = u64::MAX; - let head_rpc = Web3Connection { + let head_rpc = Web3Rpc { name: 
"synced".to_string(), - db_conn: None, - display_name: None, - url: "ws://example.com/synced".to_string(), - http_client: None, - active_requests: 0.into(), - frontend_requests: 0.into(), - internal_requests: 0.into(), provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new( Web3Provider::Mock, ))), - hard_limit: None, - hard_limit_until: None, soft_limit: 1_000, - automatic_block_limit: true, + automatic_block_limit: false, backup: false, block_data_limit: block_data_limit.into(), tier: 0, head_block: RwLock::new(Some(head_block.clone())), + ..Default::default() }; - let lagged_rpc = Web3Connection { + let lagged_rpc = Web3Rpc { name: "lagged".to_string(), - db_conn: None, - display_name: None, - url: "ws://example.com/lagged".to_string(), - http_client: None, - active_requests: 0.into(), - frontend_requests: 0.into(), - internal_requests: 0.into(), provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new( Web3Provider::Mock, ))), - hard_limit: None, - hard_limit_until: None, soft_limit: 1_000, automatic_block_limit: false, backup: false, block_data_limit: block_data_limit.into(), tier: 0, head_block: RwLock::new(Some(lagged_block.clone())), + ..Default::default() }; assert!(head_rpc.has_block_data(&lagged_block.number())); @@ -1352,8 +1331,8 @@ mod tests { let (watch_consensus_connections_sender, _) = watch::channel(Default::default()); - // TODO: make a Web3Connections::new - let conns = Web3Connections { + // TODO: make a Web3Rpcs::new + let conns = Web3Rpcs { conns, watch_consensus_head_receiver: None, watch_consensus_connections_sender, @@ -1523,48 +1502,32 @@ mod tests { let head_block: SavedBlock = Arc::new(head_block).into(); - let pruned_rpc = Web3Connection { + let pruned_rpc = Web3Rpc { name: "pruned".to_string(), - db_conn: None, - display_name: None, - url: "ws://example.com/pruned".to_string(), - http_client: None, - active_requests: 0.into(), - frontend_requests: 0.into(), - internal_requests: 0.into(), provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new( Web3Provider::Mock, ))), - hard_limit: None, - hard_limit_until: None, soft_limit: 3_000, automatic_block_limit: false, backup: false, block_data_limit: 64.into(), tier: 1, head_block: RwLock::new(Some(head_block.clone())), + ..Default::default() }; - let archive_rpc = Web3Connection { + let archive_rpc = Web3Rpc { name: "archive".to_string(), - db_conn: None, - display_name: None, - url: "ws://example.com/archive".to_string(), - http_client: None, - active_requests: 0.into(), - frontend_requests: 0.into(), - internal_requests: 0.into(), provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new( Web3Provider::Mock, ))), - hard_limit: None, - hard_limit_until: None, soft_limit: 1_000, automatic_block_limit: false, backup: false, block_data_limit: u64::MAX.into(), tier: 2, head_block: RwLock::new(Some(head_block.clone())), + ..Default::default() }; assert!(pruned_rpc.has_block_data(&head_block.number())); @@ -1582,8 +1545,8 @@ mod tests { let (watch_consensus_connections_sender, _) = watch::channel(Default::default()); - // TODO: make a Web3Connections::new - let conns = Web3Connections { + // TODO: make a Web3Rpcs::new + let conns = Web3Rpcs { conns, watch_consensus_head_receiver: None, watch_consensus_connections_sender, diff --git a/web3_proxy/src/rpcs/mod.rs b/web3_proxy/src/rpcs/mod.rs index 9a05f896..44ea5afe 100644 --- a/web3_proxy/src/rpcs/mod.rs +++ b/web3_proxy/src/rpcs/mod.rs @@ -1,7 +1,7 @@ // TODO: all pub, or export useful things here instead? 
pub mod blockchain; -pub mod connection; -pub mod connections; +pub mod many; +pub mod one; pub mod provider; pub mod request; pub mod synced_connections; diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/one.rs similarity index 96% rename from web3_proxy/src/rpcs/connection.rs rename to web3_proxy/src/rpcs/one.rs index 1b34ba4b..a42ea61b 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -10,6 +10,7 @@ use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64}; use ethers::types::U256; use futures::future::try_join_all; use futures::StreamExt; +use hdrhistogram::Histogram; use log::{debug, error, info, trace, warn, Level}; use migration::sea_orm::DatabaseConnection; use parking_lot::RwLock; @@ -64,9 +65,31 @@ impl ProviderState { } } +pub struct Web3RpcLatencies { + /// Traack how far behind the fastest node we are + new_head: Histogram, + /// exponentially weighted moving average of how far behind the fastest node we are + new_head_ewma: u32, + /// Track how long an rpc call takes on average + request: Histogram, + /// exponentially weighted moving average of how far behind the fastest node we are + request_ewma: u32, +} + +impl Default for Web3RpcLatencies { + fn default() -> Self { + Self { + new_head: Histogram::new(3).unwrap(), + new_head_ewma: 0, + request: Histogram::new(3).unwrap(), + request_ewma: 0, + } + } +} + /// An active connection to a Web3 RPC server like geth or erigon. #[derive(Default)] -pub struct Web3Connection { +pub struct Web3Rpc { pub name: String, pub display_name: Option, pub db_conn: Option, @@ -100,9 +123,11 @@ pub struct Web3Connection { pub(super) tier: u64, /// TODO: change this to a watch channel so that http providers can subscribe and take action on change pub(super) head_block: RwLock>, + /// Track how fast this RPC is + pub(super) latency: Web3RpcLatencies, } -impl Web3Connection { +impl Web3Rpc { /// Connect to a web3 rpc // TODO: have this take a builder (which will have channels attached). or maybe just take the config and give the config public fields #[allow(clippy::too_many_arguments)] @@ -126,7 +151,7 @@ impl Web3Connection { tx_id_sender: Option)>>, reconnect: bool, tier: u64, - ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { + ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let hard_limit = hard_limit.map(|(hard_rate_limit, redis_pool)| { // TODO: is cache size 1 okay? i think we need RedisRateLimiter::new( @@ -159,18 +184,14 @@ impl Web3Connection { display_name, http_client, url: url_str, - active_requests: 0.into(), - frontend_requests: 0.into(), - internal_requests: 0.into(), - provider_state: AsyncRwLock::new(ProviderState::None), hard_limit, hard_limit_until, soft_limit, automatic_block_limit, backup, block_data_limit, - head_block: RwLock::new(Default::default()), tier, + ..Default::default() }; let new_connection = Arc::new(new_connection); @@ -1068,40 +1089,40 @@ impl fmt::Debug for Web3Provider { } } -impl Hash for Web3Connection { +impl Hash for Web3Rpc { fn hash(&self, state: &mut H) { // TODO: is this enough? 
         self.name.hash(state);
     }
 }

-impl Eq for Web3Connection {}
+impl Eq for Web3Rpc {}

-impl Ord for Web3Connection {
+impl Ord for Web3Rpc {
     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
         self.name.cmp(&other.name)
     }
 }

-impl PartialOrd for Web3Connection {
+impl PartialOrd for Web3Rpc {
     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
         Some(self.cmp(other))
     }
 }

-impl PartialEq for Web3Connection {
+impl PartialEq for Web3Rpc {
     fn eq(&self, other: &Self) -> bool {
         self.name == other.name
     }
 }

-impl Serialize for Web3Connection {
+impl Serialize for Web3Rpc {
     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     where
         S: Serializer,
     {
         // 3 is the number of fields in the struct.
-        let mut state = serializer.serialize_struct("Web3Connection", 9)?;
+        let mut state = serializer.serialize_struct("Web3Rpc", 9)?;

         // the url is excluded because it likely includes private information. just show the name that we use in keys
         state.serialize_field("name", &self.name)?;
@@ -1143,9 +1164,9 @@ impl Serialize for Web3Connection {
     }
 }

-impl fmt::Debug for Web3Connection {
+impl fmt::Debug for Web3Rpc {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        let mut f = f.debug_struct("Web3Connection");
+        let mut f = f.debug_struct("Web3Rpc");

         f.field("name", &self.name);
@@ -1160,7 +1181,7 @@ impl fmt::Debug for Web3Connection {
     }
 }

-impl fmt::Display for Web3Connection {
+impl fmt::Display for Web3Rpc {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         // TODO: filter basic auth and api keys
         write!(f, "{}", &self.name)
@@ -1193,24 +1214,16 @@ mod tests {
         let head_block = SavedBlock::new(random_block);
         let block_data_limit = u64::MAX;

-        let x = Web3Connection {
+        let x = Web3Rpc {
             name: "name".to_string(),
-            db_conn: None,
-            display_name: None,
             url: "ws://example.com".to_string(),
-            http_client: None,
-            active_requests: 0.into(),
-            frontend_requests: 0.into(),
-            internal_requests: 0.into(),
-            provider_state: AsyncRwLock::new(ProviderState::None),
-            hard_limit: None,
-            hard_limit_until: None,
             soft_limit: 1_000,
             automatic_block_limit: false,
             backup: false,
             block_data_limit: block_data_limit.into(),
             tier: 0,
             head_block: RwLock::new(Some(head_block.clone())),
+            ..Default::default()
         };

         assert!(x.has_block_data(&0.into()));
@@ -1239,24 +1252,15 @@ mod tests {
         let block_data_limit = 64;

         // TODO: this is getting long. have a `impl Default`
-        let x = Web3Connection {
+        let x = Web3Rpc {
             name: "name".to_string(),
-            db_conn: None,
-            display_name: None,
-            url: "ws://example.com".to_string(),
-            http_client: None,
-            active_requests: 0.into(),
-            frontend_requests: 0.into(),
-            internal_requests: 0.into(),
-            provider_state: AsyncRwLock::new(ProviderState::None),
-            hard_limit: None,
-            hard_limit_until: None,
             soft_limit: 1_000,
             automatic_block_limit: false,
             backup: false,
             block_data_limit: block_data_limit.into(),
             tier: 0,
             head_block: RwLock::new(Some(head_block.clone())),
+            ..Default::default()
         };

         assert!(!x.has_block_data(&0.into()));
@@ -1293,7 +1297,7 @@ mod tests {

         let metrics = OpenRequestHandleMetrics::default();

-        let x = Web3Connection {
+        let x = Web3Rpc {
             name: "name".to_string(),
             db_conn: None,
             display_name: None,
diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs
index a586f90b..da204992 100644
--- a/web3_proxy/src/rpcs/request.rs
+++ b/web3_proxy/src/rpcs/request.rs
@@ -1,4 +1,4 @@
-use super::connection::Web3Connection;
+use super::one::Web3Rpc;
 use super::provider::Web3Provider;
 use crate::frontend::authorization::{Authorization, AuthorizationType};
 use anyhow::Context;
@@ -30,7 +30,7 @@ pub enum OpenRequestResult {
 #[derive(Debug)]
 pub struct OpenRequestHandle {
     authorization: Arc<Authorization>,
-    conn: Arc<Web3Connection>,
+    conn: Arc<Web3Rpc>,
     provider: Arc<Web3Provider>,
 }

@@ -122,7 +122,7 @@ impl Authorization {
 }

 impl OpenRequestHandle {
-    pub async fn new(authorization: Arc<Authorization>, conn: Arc<Web3Connection>) -> Self {
+    pub async fn new(authorization: Arc<Authorization>, conn: Arc<Web3Rpc>) -> Self {
         // TODO: take request_id as an argument?
         // TODO: attach a unique id to this? customer requests have one, but not internal queries
         // TODO: what ordering?!
@@ -185,7 +185,7 @@ impl OpenRequestHandle {
     }

     #[inline]
-    pub fn clone_connection(&self) -> Arc<Web3Connection> {
+    pub fn clone_connection(&self) -> Arc<Web3Rpc> {
         self.conn.clone()
     }

diff --git a/web3_proxy/src/rpcs/synced_connections.rs b/web3_proxy/src/rpcs/synced_connections.rs
index 224381df..e285c307 100644
--- a/web3_proxy/src/rpcs/synced_connections.rs
+++ b/web3_proxy/src/rpcs/synced_connections.rs
@@ -1,25 +1,25 @@
 use super::blockchain::{ArcBlock, SavedBlock};
-use super::connection::Web3Connection;
-use super::connections::Web3Connections;
+use super::many::Web3Rpcs;
+use super::one::Web3Rpc;
 use ethers::prelude::{H256, U64};
 use serde::Serialize;
 use std::fmt;
 use std::sync::Arc;

-/// A collection of Web3Connections that are on the same block.
+/// A collection of Web3Rpcs that are on the same block.
 /// Serialize is so we can print it on our debug endpoint
 #[derive(Clone, Default, Serialize)]
-pub struct ConsensusConnections {
+pub struct ConsensusWeb3Rpcs {
     // TODO: store ArcBlock instead?
     pub(super) head_block: Option<SavedBlock>,
     // TODO: this should be able to serialize, but it isn't
     #[serde(skip_serializing)]
-    pub(super) conns: Vec<Arc<Web3Connection>>,
+    pub(super) conns: Vec<Arc<Web3Rpc>>,
     pub(super) num_checked_conns: usize,
     pub(super) includes_backups: bool,
 }

-impl ConsensusConnections {
+impl ConsensusWeb3Rpcs {
     pub fn num_conns(&self) -> usize {
         self.conns.len()
     }
@@ -31,7 +31,7 @@ impl ConsensusConnections {
     // TODO: sum_hard_limit?
 }

-impl fmt::Debug for ConsensusConnections {
+impl fmt::Debug for ConsensusWeb3Rpcs {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         // TODO: the default formatter takes forever to write. this is too quiet though
         // TODO: print the actual conns?
@@ -42,7 +42,7 @@ impl fmt::Debug for ConsensusConnections {
     }
 }

-impl Web3Connections {
+impl Web3Rpcs {
     pub fn head_block(&self) -> Option<ArcBlock> {
         self.watch_consensus_head_receiver
             .as_ref()
diff --git a/web3_proxy/src/rpcs/transactions.rs b/web3_proxy/src/rpcs/transactions.rs
index cc5a4011..dc5710d1 100644
--- a/web3_proxy/src/rpcs/transactions.rs
+++ b/web3_proxy/src/rpcs/transactions.rs
@@ -1,8 +1,8 @@
 use crate::frontend::authorization::Authorization;

+use super::many::Web3Rpcs;
 ///! Load balanced communication with a group of web3 providers
-use super::connection::Web3Connection;
-use super::connections::Web3Connections;
+use super::one::Web3Rpc;
 use super::request::OpenRequestResult;
 use ethers::prelude::{ProviderError, Transaction, TxHash};
 use log::{debug, trace, Level};
@@ -17,11 +17,11 @@ pub enum TxStatus {
     Orphaned(Transaction),
 }

-impl Web3Connections {
+impl Web3Rpcs {
     async fn query_transaction_status(
         &self,
         authorization: &Arc<Authorization>,
-        rpc: Arc<Web3Connection>,
+        rpc: Arc<Web3Rpc>,
         pending_tx_id: TxHash,
     ) -> Result<Option<TxStatus>, ProviderError> {
         // TODO: there is a race here on geth. sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!)
@@ -66,7 +66,7 @@ impl Web3Connections {
     pub(super) async fn process_incoming_tx_id(
         self: Arc<Self>,
         authorization: Arc<Authorization>,
-        rpc: Arc<Web3Connection>,
+        rpc: Arc<Web3Rpc>,
         pending_tx_id: TxHash,
         pending_tx_sender: broadcast::Sender<TxStatus>,
     ) -> anyhow::Result<()> {