Send transactions to protected and public rpcs (#57)

* move protected transactions into their own function and dry stats sending

* cargo upgrade

* comments

* time to live instead of time to idle

* minor workaround for eth_chainId

* cargo upgrade
This commit is contained in:
Bryan Stitt 2023-04-24 11:00:12 -07:00 committed by GitHub
parent 78fceb1fbf
commit 56fdf48129
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 262 additions and 254 deletions

125
Cargo.lock generated

@ -74,9 +74,9 @@ dependencies = [
[[package]]
name = "aho-corasick"
version = "0.7.20"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc936419f96fa211c1b9166887b38e5e40b19958e5b895be7c1f93adec7071ac"
checksum = "67fc08ce920c31afb70f013dcce1bfc3a3195de6a228474e45e1f145b36f8d04"
dependencies = [
"memchr",
]
@ -408,9 +408,9 @@ checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a"
[[package]]
name = "base64ct"
version = "1.5.3"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b645a089122eccb6111b4f81cbc1a49f5900ac4666bb93ac027feaecf15607bf"
checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
[[package]]
name = "bech32"
@ -492,9 +492,9 @@ dependencies = [
[[package]]
name = "block-buffer"
version = "0.10.3"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cce20737498f97b993470a6e536b8523f0af7892a4f928cceb1ac5e52ebe7e"
checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
dependencies = [
"generic-array",
]
@ -555,9 +555,9 @@ dependencies = [
[[package]]
name = "bumpalo"
version = "3.12.0"
version = "3.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535"
checksum = "9b1ce199063694f33ffb7dd4e0ee620741495c32833cde5aa08f02a0bf96f0c8"
[[package]]
name = "byte-slice-cast"
@ -567,19 +567,20 @@ checksum = "c3ac9f8b63eca6fd385229b3675f6cc0dc5c8a5c8a54a59d4f52ffd670d87b0c"
[[package]]
name = "bytecheck"
version = "0.6.9"
version = "0.6.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d11cac2c12b5adc6570dad2ee1b87eff4955dac476fe12d81e5fdd352e52406f"
checksum = "13fe11640a23eb24562225322cd3e452b93a3d4091d62fab69c70542fcd17d1f"
dependencies = [
"bytecheck_derive",
"ptr_meta",
"simdutf8",
]
[[package]]
name = "bytecheck_derive"
version = "0.6.9"
version = "0.6.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13e576ebe98e605500b3c8041bb888e966653577172df6dd97398714eb30b9bf"
checksum = "e31225543cb46f81a7e224762764f4a6a0f097b1db0b175f69e8065efaa42de5"
dependencies = [
"proc-macro2",
"quote",
@ -817,21 +818,19 @@ dependencies = [
[[package]]
name = "coins-bip39"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad2a68a46b9d8cc90484f0689adc0e4c890eb215bf698ae52e5235bb88f40be7"
checksum = "84f4d04ee18e58356accd644896aeb2094ddeafb6a713e056cef0c0a8e468c15"
dependencies = [
"bitvec 0.17.4",
"coins-bip32",
"getrandom",
"hex",
"hmac",
"once_cell",
"pbkdf2 0.12.1",
"rand",
"sha2 0.10.6",
"thiserror",
"tracing",
]
[[package]]
@ -965,7 +964,7 @@ dependencies = [
"tonic",
"tracing",
"tracing-core",
"tracing-subscriber 0.3.16",
"tracing-subscriber 0.3.17",
]
[[package]]
@ -1019,9 +1018,9 @@ dependencies = [
[[package]]
name = "cpufeatures"
version = "0.2.5"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28d997bd5e24a5928dd43e46dc529867e207907fe0b239c3477d924f7f2ca320"
checksum = "3e4c1eaa2012c47becbbad2ab175484c2a84d1185b566fb2cc5b8707343dfe58"
dependencies = [
"libc",
]
@ -1087,9 +1086,9 @@ dependencies = [
[[package]]
name = "crossbeam-utils"
version = "0.8.14"
version = "0.8.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fb766fa798726286dbbb842f174001dab8abc7b627a1dd86e0b7222a95d929f"
checksum = "3c063cd8cc95f5c377ed0d4b49a4b21f632396ff690e8470c29b3359b346984b"
dependencies = [
"cfg-if",
]
@ -1270,9 +1269,9 @@ dependencies = [
[[package]]
name = "deadpool-redis"
version = "0.11.1"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b8bde44cbfdf17ae5baa45c9f43073b320f1a19955389315629304a23909ad2"
checksum = "5f1760f60ffc6653b4afd924c5792098d8c00d9a3deb6b3d989eac17949dc422"
dependencies = [
"deadpool",
"redis",
@ -1387,7 +1386,7 @@ version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f"
dependencies = [
"block-buffer 0.10.3",
"block-buffer 0.10.4",
"const-oid 0.9.2",
"crypto-common",
"subtle",
@ -3004,7 +3003,7 @@ dependencies = [
"petgraph",
"pico-args",
"regex",
"regex-syntax",
"regex-syntax 0.6.29",
"string_cache",
"term",
"tiny-keccak",
@ -3031,9 +3030,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.141"
version = "0.2.142"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3304a64d199bb964be99741b7a14d26972741915b3649639149b2479bb46f4b5"
checksum = "6a987beff54b60ffa6d51982e1aa1146bc42f19bd26be28b0586f252fccf5317"
[[package]]
name = "libm"
@ -3576,9 +3575,9 @@ dependencies = [
[[package]]
name = "ordered-float"
version = "3.6.0"
version = "3.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13a384337e997e6860ffbaa83708b2ef329fd8c54cb67a5f64d421e0f943254f"
checksum = "2fc2dbde8f8a79f2102cc474ceb0ad68e3b80b85289ea62389b60e66777e4213"
dependencies = [
"num-traits",
]
@ -3596,9 +3595,9 @@ dependencies = [
[[package]]
name = "os_str_bytes"
version = "6.4.1"
version = "6.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee"
checksum = "ceedf44fb00f2d1984b0bc98102627ce622e083e49a5bacdb3e514fa4238e267"
[[package]]
name = "ouroboros"
@ -4312,9 +4311,9 @@ dependencies = [
[[package]]
name = "redis"
version = "0.22.3"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa8455fa3621f6b41c514946de66ea0531f57ca017b2e6c7cc368035ea5b46df"
checksum = "3ea8c51b5dc1d8e5fd3350ec8167f464ec0995e79f2e90a075b63371500d557f"
dependencies = [
"async-trait",
"bytes",
@ -4370,13 +4369,13 @@ dependencies = [
[[package]]
name = "regex"
version = "1.7.3"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b1f693b24f6ac912f4893ef08244d70b6067480d2f1a46e950c9691e6749d1d"
checksum = "af83e617f331cc6ae2da5443c602dfa5af81e517212d9d611a5b3ba1777b5370"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
"regex-syntax 0.7.1",
]
[[package]]
@ -4385,7 +4384,7 @@ version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax",
"regex-syntax 0.6.29",
]
[[package]]
@ -4394,6 +4393,12 @@ version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5996294f19bd3aae0453a862ad728f60e6600695733dd5df01da90c54363a3c"
[[package]]
name = "rend"
version = "0.4.0"
@ -4683,9 +4688,9 @@ dependencies = [
[[package]]
name = "rustversion"
version = "1.0.11"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5583e89e108996506031660fe09baa5011b9dd0341b89029313006d1fb508d70"
checksum = "4f3208ce4d8448b3f3e7d168a73f5e0c43a61e32930de3bceeccedb388b6bf06"
[[package]]
name = "ryu"
@ -4795,9 +4800,9 @@ dependencies = [
[[package]]
name = "sea-orm"
version = "0.11.2"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5d875e2fcd965320e50066028ac0b4877ff07edbb734a6ddfeff48a87dbab38"
checksum = "fade86e8d41fd1a4721f84cb834f4ca2783f973cc30e6212b7fafc134f169214"
dependencies = [
"async-stream",
"async-trait",
@ -4823,9 +4828,9 @@ dependencies = [
[[package]]
name = "sea-orm-cli"
version = "0.11.2"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ead9f7dac975f10447f17d08edbb2046daa087b5e0b50bbf8211f303459078c"
checksum = "efbf34a2caf70c2e3be9bb1e674e9540f6dfd7c8f40f6f05daf3b9740e476005"
dependencies = [
"chrono",
"clap",
@ -4833,15 +4838,15 @@ dependencies = [
"regex",
"sea-schema",
"tracing",
"tracing-subscriber 0.3.16",
"tracing-subscriber 0.3.17",
"url",
]
[[package]]
name = "sea-orm-macros"
version = "0.11.2"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9b593e9c0cdbb18cafd4da7b92e67a9c2d9892934f3a2d8bbac73d5ba4a98a1"
checksum = "28936f26d62234ff0be16f80115dbdeb3237fe9c25cf18fbcd1e3b3592360f20"
dependencies = [
"bae",
"heck 0.3.3",
@ -4852,9 +4857,9 @@ dependencies = [
[[package]]
name = "sea-orm-migration"
version = "0.11.2"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edba7a6123c1035b0530deb713820688f0234431ab6c1893b14dce493ade76af"
checksum = "278d3adfd0832b6ffc17d3cfbc574d3695a5c1b38814e0bc8ac238d33f3d87cf"
dependencies = [
"async-trait",
"clap",
@ -4864,7 +4869,7 @@ dependencies = [
"sea-orm-cli",
"sea-schema",
"tracing",
"tracing-subscriber 0.3.16",
"tracing-subscriber 0.3.17",
]
[[package]]
@ -5339,6 +5344,12 @@ dependencies = [
"rand_core",
]
[[package]]
name = "simdutf8"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a"
[[package]]
name = "siphasher"
version = "0.3.10"
@ -5932,7 +5943,7 @@ dependencies = [
"toml 0.5.11",
"tonic",
"tracing",
"tracing-subscriber 0.3.16",
"tracing-subscriber 0.3.17",
"tui",
]
@ -6191,13 +6202,13 @@ dependencies = [
[[package]]
name = "tracing-attributes"
version = "0.1.23"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a"
checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
"syn 2.0.15",
]
[[package]]
@ -6275,9 +6286,9 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.3.16"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6176eae26dd70d0c919749377897b54a9276bd7061339665dd68777926b5a70"
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
dependencies = [
"matchers 0.1.0",
"nu-ansi-term",
@ -6978,9 +6989,9 @@ checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec"
[[package]]
name = "zeroize"
version = "1.5.7"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c394b5bd0c6f669e7275d9c20aa90ae064cb22e75a1cad54e1b34088034b149f"
checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9"
[[package]]
name = "zip"

@ -10,7 +10,7 @@ path = "src/mod.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
sea-orm = "0.11.2"
sea-orm = "0.11.3"
serde = "1.0.160"
uuid = "1.3.1"
ethers = "2.0.3"

@ -12,7 +12,7 @@ path = "src/lib.rs"
tokio = { version = "1.27.0", features = ["full", "tracing"] }
[dependencies.sea-orm-migration]
version = "0.11.2"
version = "0.11.3"
features = [
# Enable at least one `ASYNC_RUNTIME` and `DATABASE_DRIVER` feature if you want to run migration via CLI.
# View the list of supported features at https://www.sea-ql.org/SeaORM/docs/install-and-config/database-and-async-runtime.

@ -7,5 +7,5 @@ edition = "2021"
[dependencies]
anyhow = "1.0.70"
chrono = "0.4.24"
deadpool-redis = { version = "0.11.1", features = ["rt_tokio_1", "serde"] }
deadpool-redis = { version = "0.12.0", features = ["rt_tokio_1", "serde"] }
tokio = "1.27.0"

@ -58,13 +58,13 @@ moka = { version = "0.10.2", default-features = false, features = ["future"] }
num = "0.4.0"
num-traits = "0.2.15"
once_cell = { version = "1.17.1" }
ordered-float = "3.6.0"
ordered-float = "3.7.0"
pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async", "rustls", "sync"] }
parking_lot = { version = "0.12.1", features = ["arc_lock"] }
prettytable = "*"
proctitle = "0.1.1"
rdkafka = { version = "0.29.0" }
regex = "1.7.3"
regex = "1.8.1"
reqwest = { version = "0.11.16", default-features = false, features = ["json", "tokio-rustls"] }
rmp-serde = "1.1.1"
rustc-hash = "1.1.0"

@ -670,12 +670,13 @@ impl Web3ProxyApp {
// TODO: capacity from configs
// all these are the same size, so no need for a weigher
// TODO: ttl on this? or is max_capacity fine?
// TODO: this used to have a time_to_idle
let pending_transactions = Cache::builder()
.max_capacity(10_000)
// TODO: different chains might handle this differently
// TODO: what should we set? 5 minutes is arbitrary. the nodes themselves hold onto transactions for much longer
.time_to_idle(Duration::from_secs(300))
// TODO: this used to be time_to_update, but
.time_to_live(Duration::from_secs(300))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// responses can be very different in sizes, so this is a cache with a max capacity and a weigher
@ -695,7 +696,7 @@ impl Web3ProxyApp {
}
})
// TODO: what should we set? 10 minutes is arbitrary. the nodes themselves hold onto transactions for much longer
.time_to_idle(Duration::from_secs(600))
.time_to_live(Duration::from_secs(600))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// all the users are the same size, so no need for a weigher
@ -1208,14 +1209,101 @@ impl Web3ProxyApp {
}
}
/// try to send transactions to the best available rpcs with private mempools
/// if no private rpcs are configured, then some public rpcs are used instead
async fn try_send_protected(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
request: &JsonRpcRequest,
request_metadata: Arc<RequestMetadata>,
num_public_rpcs: Option<usize>,
head_block_num: Option<U64>,
) -> Web3ProxyResult<JsonRpcForwardedResponse> {
// TODO: error/wait if no head block?
// TODO: configurable lag
let min_block_needed = head_block_num
.or(self.balanced_rpcs.head_block_num())
.ok_or_else(|| Web3ProxyError::NoServersSynced)?
.saturating_sub(3.into());
if let Some(protected_rpcs) = self.private_rpcs.as_ref() {
if !protected_rpcs.is_empty() {
// send to protected and public rpcs at the same time
// TODO: send to tier 0 of private, wait a block, ..., tier N of private, wait a block, public
// TODO: allow premium users to choose specifically where they want transactions to go
let public_f = {
let authorization = authorization.clone();
let clone = self.clone();
// TODO: should request be in an arc? inside the request metadata?
let request = request.clone();
let request_metadata = Some(request_metadata.clone());
async move {
clone
.balanced_rpcs
.try_send_all_synced_connections(
&authorization,
&request,
request_metadata,
Some(&min_block_needed),
None,
Level::Trace,
num_public_rpcs,
true,
)
.await
}
};
let public_handle = tokio::spawn(public_f);
let protected_response = protected_rpcs
.try_send_all_synced_connections(
authorization,
request,
Some(request_metadata),
None,
None,
Level::Trace,
None,
true,
)
.await?;
// wait for sending to the public rpcs to finish
// TODO: let this run in the background instead?
public_handle.await??;
return Ok(protected_response);
}
}
// no private rpcs to send to. send to a few public rpcs
// try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here.
self.balanced_rpcs
.try_send_all_synced_connections(
authorization,
request,
Some(request_metadata),
Some(&min_block_needed),
None,
Level::Trace,
num_public_rpcs,
true,
)
.await
}
// TODO: more robust stats and kafka logic! if we use the try operator, they aren't saved!
// TODO: move this to another module
async fn proxy_cached_request(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
mut request: JsonRpcRequest,
head_block_num: Option<U64>,
) -> Web3ProxyResult<(JsonRpcForwardedResponse, Vec<Arc<Web3Rpc>>)> {
// trace!("Received request: {:?}", request);
// TODO: move this code to another module so that its easy to turn this trace logging on in dev
trace!("Received request: {:?}", request);
let request_metadata = Arc::new(RequestMetadata::new(request.num_bytes()));
@ -1287,7 +1375,7 @@ impl Web3ProxyApp {
// TODO: if eth_chainId or net_version, serve those without querying the backend
// TODO: don't clone?
let partial_response: serde_json::Value = match request_method.as_ref() {
let response: JsonRpcForwardedResponse = match request_method.as_ref() {
// lots of commands are blocked
method @ ("db_getHex"
| "db_getString"
@ -1357,14 +1445,11 @@ impl Web3ProxyApp {
| "shh_version") => {
// i don't think we will ever support these methods
// TODO: what error code?
return Ok((
JsonRpcForwardedResponse::from_string(
format!("method unsupported: {}", method),
None,
Some(request_id),
),
vec![],
));
JsonRpcForwardedResponse::from_string(
format!("method unsupported: {}", method),
None,
Some(request_id),
)
}
// TODO: implement these commands
method @ ("eth_getFilterChanges"
@ -1376,36 +1461,29 @@ impl Web3ProxyApp {
| "eth_uninstallFilter") => {
// TODO: unsupported command stat. use the count to prioritize new features
// TODO: what error code?
return Ok((
// TODO: what code?
JsonRpcForwardedResponse::from_string(
format!("not yet implemented: {}", method),
None,
Some(request_id),
),
vec![],
));
JsonRpcForwardedResponse::from_string(
format!("not yet implemented: {}", method),
None,
Some(request_id),
)
}
method @ ("debug_bundler_sendBundleNow"
| "debug_bundler_clearState"
| "debug_bundler_dumpMempool") => {
return Ok((
JsonRpcForwardedResponse::from_string(
// TODO: we should probably have some escaping on this. but maybe serde will protect us enough
format!("method unsupported: {}", method),
None,
Some(request_id),
),
vec![],
));
JsonRpcForwardedResponse::from_string(
// TODO: we should probably have some escaping on this. but maybe serde will protect us enough
format!("method unsupported: {}", method),
None,
Some(request_id),
)
}
method @ ("eth_sendUserOperation"
_method @ ("eth_sendUserOperation"
| "eth_estimateUserOperationGas"
| "eth_getUserOperationByHash"
| "eth_getUserOperationReceipt"
| "eth_supportedEntryPoints") => match self.bundler_4337_rpcs.as_ref() {
Some(bundler_4337_rpcs) => {
let response = bundler_4337_rpcs
bundler_4337_rpcs
.try_proxy_connection(
authorization,
request,
@ -1413,56 +1491,39 @@ impl Web3ProxyApp {
None,
None,
)
.await?;
// TODO: DRY
let rpcs = request_metadata.backend_requests.lock().clone();
if let Some(stat_sender) = self.stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
Some(method.to_string()),
authorization.clone(),
request_metadata,
response.num_bytes(),
);
stat_sender
.send_async(response_stat.into())
.await
.map_err(Web3ProxyError::SendAppStatError)?;
}
return Ok((response, rpcs));
.await?
}
None => {
// TODO: stats!
// TODO: not synced error?
return Err(anyhow::anyhow!("no bundler_4337_rpcs available").into());
}
},
// some commands can use local data or caches
"eth_accounts" => {
// no stats on this. its cheap
serde_json::Value::Array(vec![])
JsonRpcForwardedResponse::from_value(serde_json::Value::Array(vec![]), request_id)
}
"eth_blockNumber" => {
match head_block_num.or(self.balanced_rpcs.head_block_num()) {
Some(head_block_num) => {
json!(head_block_num)
JsonRpcForwardedResponse::from_value(json!(head_block_num), request_id)
}
None => {
// TODO: what does geth do if this happens?
return Err(Web3ProxyError::UnknownBlockNumber);
// TODO: standard not synced error
return Err(Web3ProxyError::NoServersSynced);
}
}
}
"eth_chainId" => json!(U64::from(self.config.chain_id)),
"eth_chainId" => JsonRpcForwardedResponse::from_value(
json!(U64::from(self.config.chain_id)),
request_id,
),
// TODO: eth_callBundle (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_callbundle)
// TODO: eth_cancelPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_cancelprivatetransaction, but maybe just reject)
// TODO: eth_sendPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_sendprivatetransaction)
"eth_coinbase" => {
// no need for serving coinbase
// no stats on this. its cheap
json!(Address::zero())
JsonRpcForwardedResponse::from_value(json!(Address::zero()), request_id)
}
"eth_estimateGas" => {
let mut response = self
@ -1501,22 +1562,17 @@ impl Web3ProxyApp {
gas_estimate += gas_increase;
json!(gas_estimate)
JsonRpcForwardedResponse::from_value(json!(gas_estimate), request_id)
}
// TODO: eth_gasPrice that does awesome magic to predict the future
"eth_hashrate" => {
// no stats on this. its cheap
json!(U64::zero())
}
"eth_hashrate" => JsonRpcForwardedResponse::from_value(json!(U64::zero()), request_id),
"eth_mining" => {
// no stats on this. its cheap
serde_json::Value::Bool(false)
JsonRpcForwardedResponse::from_value(serde_json::Value::Bool(false), request_id)
}
// TODO: eth_sendBundle (flashbots command)
// TODO: eth_sendBundle (flashbots/eden command)
// broadcast transactions to all private rpcs at once
"eth_sendRawTransaction" => {
// TODO: how should we handle private_mode here?
let default_num = match authorization.checks.proxy_mode {
let num_public_rpcs = match authorization.checks.proxy_mode {
// TODO: how many balanced rpcs should we send to? configurable? percentage of total?
ProxyMode::Best | ProxyMode::Debug => Some(4),
ProxyMode::Fastest(0) => None,
@ -1527,47 +1583,26 @@ impl Web3ProxyApp {
ProxyMode::Versus => None,
};
let (private_rpcs, num) = if let Some(private_rpcs) = self.private_rpcs.as_ref() {
if !private_rpcs.is_empty() && authorization.checks.private_txs {
// if we are sending the transaction privately, no matter the proxy_mode, we send to ALL private rpcs
(private_rpcs, None)
} else {
// TODO: send to balanced_rpcs AND private_rpcs
(&self.balanced_rpcs, default_num)
}
} else {
(&self.balanced_rpcs, default_num)
};
let head_block_num = head_block_num
.or(self.balanced_rpcs.head_block_num())
.ok_or_else(|| Web3ProxyError::NoServersSynced)?;
// TODO: error/wait if no head block!
// try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here.
// TODO: what lag should we allow?
let mut response = private_rpcs
.try_send_all_synced_connections(
let mut response = self
.try_send_protected(
authorization,
&request,
Some(request_metadata.clone()),
Some(&head_block_num.saturating_sub(2.into())),
None,
Level::Trace,
num,
true,
request_metadata.clone(),
num_public_rpcs,
head_block_num,
)
.await?;
// sometimes we get an error that the transaction is already known by our nodes,
// that's not really an error. Just return the hash like a successful response would.
// that's not really an error. Return the hash like a successful response would.
// TODO: move this to a helper function
if let Some(response_error) = response.error.as_ref() {
if response_error.code == -32000
&& (response_error.message == "ALREADY_EXISTS: already known"
|| response_error.message
== "INTERNAL_ERROR: existing tx with same hash")
{
// TODO: expect instead of web3_context?
let params = request
.params
.web3_context("there must be params if we got this far")?;
@ -1598,9 +1633,7 @@ impl Web3ProxyApp {
}
}
let rpcs = request_metadata.backend_requests.lock().clone();
// emit stats
// emit transaction count stats
if let Some(salt) = self.config.public_recent_ips_salt.as_ref() {
if let Some(tx_hash) = response.result.clone() {
let now = Utc::now().timestamp();
@ -1638,47 +1671,35 @@ impl Web3ProxyApp {
}
}
return Ok((response, rpcs));
response
}
"eth_syncing" => {
// no stats on this. its cheap
// TODO: return a real response if all backends are syncing or if no servers in sync
serde_json::Value::Bool(false)
}
"eth_subscribe" => {
return Ok((
JsonRpcForwardedResponse::from_str(
"notifications not supported. eth_subscribe is only available over a websocket",
Some(-32601),
Some(request_id),
),
vec![],
));
}
"eth_unsubscribe" => {
return Ok((
JsonRpcForwardedResponse::from_str(
"notifications not supported. eth_unsubscribe is only available over a websocket",
Some(-32601),
Some(request_id),
),
vec![],
));
JsonRpcForwardedResponse::from_value(serde_json::Value::Bool(false), request_id)
}
"eth_subscribe" => JsonRpcForwardedResponse::from_str(
"notifications not supported. eth_subscribe is only available over a websocket",
Some(-32601),
Some(request_id),
),
"eth_unsubscribe" => JsonRpcForwardedResponse::from_str(
"notifications not supported. eth_unsubscribe is only available over a websocket",
Some(-32601),
Some(request_id),
),
"net_listening" => {
// no stats on this. its cheap
// TODO: only if there are some backends on balanced_rpcs?
serde_json::Value::Bool(true)
}
"net_peerCount" => {
// no stats on this. its cheap
// TODO: do something with proxy_mode here?
json!(U64::from(self.balanced_rpcs.num_synced_rpcs()))
}
"web3_clientVersion" => {
// no stats on this. its cheap
serde_json::Value::String(APP_USER_AGENT.to_string())
JsonRpcForwardedResponse::from_value(serde_json::Value::Bool(true), request_id)
}
"net_peerCount" => JsonRpcForwardedResponse::from_value(
json!(U64::from(self.balanced_rpcs.num_synced_rpcs())),
request_id,
),
"web3_clientVersion" => JsonRpcForwardedResponse::from_value(
serde_json::Value::String(APP_USER_AGENT.to_string()),
request_id,
),
"web3_sha3" => {
// returns Keccak-256 (not the standardized SHA3-256) of the given data.
match &request.params {
@ -1713,32 +1734,23 @@ impl Web3ProxyApp {
let hash = H256::from(keccak256(param));
json!(hash)
JsonRpcForwardedResponse::from_value(json!(hash), request_id)
}
_ => {
// TODO: this needs the correct error code in the response
// TODO: emit stat?
return Ok((
JsonRpcForwardedResponse::from_str(
"invalid request",
Some(StatusCode::BAD_REQUEST.as_u16().into()),
Some(request_id),
),
vec![],
));
JsonRpcForwardedResponse::from_str(
"invalid request",
Some(StatusCode::BAD_REQUEST.as_u16().into()),
Some(request_id),
)
}
}
}
"test" => {
return Ok((
JsonRpcForwardedResponse::from_str(
"The method test does not exist/is not available.",
Some(-32601),
Some(request_id),
),
vec![],
));
}
"test" => JsonRpcForwardedResponse::from_str(
"The method test does not exist/is not available.",
Some(-32601),
Some(request_id),
),
// anything else gets sent to backend rpcs and cached
method => {
if method.starts_with("admin_") {
@ -1893,32 +1905,14 @@ impl Web3ProxyApp {
// replace the id with our request's id.
response.id = request_id;
// TODO: DRY!
let rpcs = request_metadata.backend_requests.lock().clone();
if let Some(stat_sender) = self.stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
Some(method.to_string()),
authorization.clone(),
request_metadata,
response.num_bytes(),
);
stat_sender
.send_async(response_stat.into())
.await
.map_err(Web3ProxyError::SendAppStatError)?;
}
return Ok((response, rpcs));
response
}
};
let response = JsonRpcForwardedResponse::from_value(partial_response, request_id);
// TODO: DRY
// save the rpcs so they can be included in a response header
let rpcs = request_metadata.backend_requests.lock().clone();
// send stats used for accounting and graphs
if let Some(stat_sender) = self.stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
Some(request_method),
@ -1933,6 +1927,7 @@ impl Web3ProxyApp {
.map_err(Web3ProxyError::SendAppStatError)?;
}
// send debug info as a kafka log
if let Some((kafka_topic, kafka_key, kafka_headers)) = kafka_stuff {
let kafka_producer = self
.kafka_producer

@ -131,10 +131,12 @@ struct JsonRpcChainIdResult {
}
async fn get_chain_id(rpc: &str, client: &reqwest::Client) -> anyhow::Result<u64> {
// empty params aren't required by the spec, but some rpc providers require them
let get_chain_id_request = json!({
"id": "1",
"jsonrpc": "2.0",
"method": "eth_chainId",
"params": [],
});
// TODO: loop until chain id is found?

@ -177,12 +177,13 @@ pub async fn block_needed(
"eth_getLogs" => {
// TODO: think about this more
// TODO: jsonrpc has a specific code for this
// TODO: this shouldn't be a 500. this should be a 400. 500 will make haproxy retry a bunch
let obj = params
.get_mut(0)
.ok_or_else(|| anyhow::anyhow!("invalid format. no params"))?
.ok_or_else(|| Web3ProxyError::BadRequest("invalid format. no params".to_string()))?
.as_object_mut()
.ok_or_else(|| Web3ProxyError::BadRequest("invalid format".to_string()))?;
.ok_or_else(|| {
Web3ProxyError::BadRequest("invalid format. params not object".to_string())
})?;
if obj.contains_key("blockHash") {
return Ok(BlockNeeded::CacheSuccessForever);

@ -157,19 +157,21 @@ impl Web3Rpcs {
// these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes
// TODO: how can we do the weigher better? need to know actual allocated size
// TODO: time_to_idle instead?
// TODO: limits from config
let blocks_by_hash: BlocksByHashCache = Cache::builder()
.max_capacity(1024 * 1024 * 1024)
.weigher(|_k, v: &Web3ProxyBlock| {
1 + v.block.transactions.len().try_into().unwrap_or(u32::MAX)
})
.time_to_idle(Duration::from_secs(600))
.time_to_live(Duration::from_secs(30 * 60))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// all block numbers are the same size, so no need for weigher
// TODO: limits from config
// TODO: time_to_idle instead?
let blocks_by_number = Cache::builder()
.time_to_idle(Duration::from_secs(600))
.time_to_live(Duration::from_secs(30 * 60))
.max_capacity(10_000)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
@ -1099,7 +1101,7 @@ impl Web3Rpcs {
/// be sure there is a timeout on this or it might loop forever
#[allow(clippy::too_many_arguments)]
pub async fn try_send_all_synced_connections(
&self,
self: &Arc<Self>,
authorization: &Arc<Authorization>,
request: &JsonRpcRequest,
request_metadata: Option<Arc<RequestMetadata>>,

@ -560,12 +560,12 @@ impl Web3Rpc {
.context(format!("waiting for request handle on {}", self))?
.request(
"eth_chainId",
&json!(Option::None::<()>),
&json!(Vec::<()>::new()),
Level::Trace.into(),
unlocked_provider.clone(),
)
.await;
// trace!("found_chain_id: {:?}", found_chain_id);
trace!("found_chain_id: {:#?}", found_chain_id);
match found_chain_id {
Ok(found_chain_id) => {

@ -219,13 +219,10 @@ impl OpenRequestHandle {
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
// // TODO: i think ethers already has trace logging (and does it much more fancy)
// trace!(
// "response from {} for {} {:?}: {:?}",
// self.rpc,
// method,
// params,
// response,
// );
debug!(
"response from {} for {} {:?}: {:?}",
self.rpc, method, params, response,
);
if let Err(err) = &response {
// only save reverts for some types of calls