From 68f73ec0b11ccbd146426c38de957fd12b39626c Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 8 Jun 2023 13:42:45 -0700 Subject: [PATCH 01/13] more things should be BadRequest instead of 500 --- web3_proxy/src/frontend/admin.rs | 26 +++++++--- web3_proxy/src/stats/influxdb_queries.rs | 64 +++++++++--------------- 2 files changed, 43 insertions(+), 47 deletions(-) diff --git a/web3_proxy/src/frontend/admin.rs b/web3_proxy/src/frontend/admin.rs index b2012448..d64241e1 100644 --- a/web3_proxy/src/frontend/admin.rs +++ b/web3_proxy/src/frontend/admin.rs @@ -354,23 +354,33 @@ pub async fn admin_login_post( // we can't trust that they didn't tamper with the message in some way. like some clients return it hex encoded // TODO: checking 0x seems fragile, but I think it will be fine. siwe message text shouldn't ever start with 0x let their_msg: Message = if payload.msg.starts_with("0x") { - let their_msg_bytes = - Bytes::from_str(&payload.msg).web3_context("parsing payload message")?; + let their_msg_bytes = Bytes::from_str(&payload.msg).map_err(|err| { + Web3ProxyError::BadRequest( + format!("error parsing payload message as Bytes: {}", err).into(), + ) + })?; // TODO: lossy or no? String::from_utf8_lossy(their_msg_bytes.as_ref()) .parse::() - .web3_context("parsing hex string message")? + .map_err(|err| { + Web3ProxyError::BadRequest( + format!("error parsing bytes as siwe message: {}", err).into(), + ) + })? } else { - payload - .msg - .parse::() - .web3_context("parsing string message")? + payload.msg.parse::().map_err(|err| { + Web3ProxyError::BadRequest( + format!("error parsing string as siwe message: {}", err).into(), + ) + })? }; // the only part of the message we will trust is their nonce // TODO: this is fragile. 
have a helper function/struct for redis keys - let login_nonce = UserBearerToken::from_str(&their_msg.nonce)?; + let login_nonce = UserBearerToken::from_str(&their_msg.nonce).map_err(|err| { + Web3ProxyError::BadRequest(format!("error parsing nonce: {}", err).into()) + })?; // fetch the message we gave them from our database let db_replica = app diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index fe3d966b..87527133 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -197,7 +197,10 @@ pub async fn query_user_stats<'a>( let raw_influx_responses: Vec = influxdb_client .query_raw(Some(query.clone())) .await - .context("failed parsing query result into a FluxRecord")?; + .context(format!( + "failed parsing query result into a FluxRecord. Query={:?}", + query + ))?; // Basically rename all items to be "total", // calculate number of "archive_needed" and "error_responses" through their boolean representations ... @@ -206,30 +209,28 @@ pub async fn query_user_stats<'a>( // TODO: I must be able to probably zip the balance query... 
let datapoints = raw_influx_responses .into_iter() - // .into_values() .map(|x| x.values) .map(|value_map| { // Unwrap all relevant numbers - // BTreeMap - let mut out: HashMap = HashMap::new(); + let mut out: HashMap<&str, serde_json::Value> = HashMap::new(); value_map.into_iter().for_each(|(key, value)| { if key == "_measurement" { match value { influxdb2_structmap::value::Value::String(inner) => { if inner == "opt_in_proxy" { out.insert( - "collection".to_owned(), + "collection", serde_json::Value::String("opt-in".to_owned()), ); } else if inner == "global_proxy" { out.insert( - "collection".to_owned(), + "collection", serde_json::Value::String("global".to_owned()), ); } else { warn!("Some datapoints are not part of any _measurement!"); out.insert( - "collection".to_owned(), + "collection", serde_json::Value::String("unknown".to_owned()), ); } @@ -241,10 +242,7 @@ pub async fn query_user_stats<'a>( } else if key == "_stop" { match value { influxdb2_structmap::value::Value::TimeRFC(inner) => { - out.insert( - "stop_time".to_owned(), - serde_json::Value::String(inner.to_string()), - ); + out.insert("stop_time", serde_json::Value::String(inner.to_string())); } _ => { error!("_stop should always be a TimeRFC!"); @@ -253,10 +251,7 @@ pub async fn query_user_stats<'a>( } else if key == "_time" { match value { influxdb2_structmap::value::Value::TimeRFC(inner) => { - out.insert( - "time".to_owned(), - serde_json::Value::String(inner.to_string()), - ); + out.insert("time", serde_json::Value::String(inner.to_string())); } _ => { error!("_stop should always be a TimeRFC!"); @@ -266,7 +261,7 @@ pub async fn query_user_stats<'a>( match value { influxdb2_structmap::value::Value::Long(inner) => { out.insert( - "total_backend_requests".to_owned(), + "total_backend_requests", serde_json::Value::Number(inner.into()), ); } @@ -277,7 +272,7 @@ pub async fn query_user_stats<'a>( } else if key == "balance" { match value { influxdb2_structmap::value::Value::Double(inner) => { - 
out.insert("balance".to_owned(), json!(f64::from(inner))); + out.insert("balance", json!(f64::from(inner))); } _ => { error!("balance should always be a Double!"); @@ -286,10 +281,7 @@ pub async fn query_user_stats<'a>( } else if key == "cache_hits" { match value { influxdb2_structmap::value::Value::Long(inner) => { - out.insert( - "total_cache_hits".to_owned(), - serde_json::Value::Number(inner.into()), - ); + out.insert("total_cache_hits", serde_json::Value::Number(inner.into())); } _ => { error!("cache_hits should always be a Long!"); @@ -299,7 +291,7 @@ pub async fn query_user_stats<'a>( match value { influxdb2_structmap::value::Value::Long(inner) => { out.insert( - "total_cache_misses".to_owned(), + "total_cache_misses", serde_json::Value::Number(inner.into()), ); } @@ -311,7 +303,7 @@ pub async fn query_user_stats<'a>( match value { influxdb2_structmap::value::Value::Long(inner) => { out.insert( - "total_frontend_requests".to_owned(), + "total_frontend_requests", serde_json::Value::Number(inner.into()), ); } @@ -322,10 +314,7 @@ pub async fn query_user_stats<'a>( } else if key == "no_servers" { match value { influxdb2_structmap::value::Value::Long(inner) => { - out.insert( - "no_servers".to_owned(), - serde_json::Value::Number(inner.into()), - ); + out.insert("no_servers", serde_json::Value::Number(inner.into())); } _ => { error!("no_servers should always be a Long!"); @@ -334,7 +323,7 @@ pub async fn query_user_stats<'a>( } else if key == "sum_credits_used" { match value { influxdb2_structmap::value::Value::Double(inner) => { - out.insert("total_credits_used".to_owned(), json!(f64::from(inner))); + out.insert("total_credits_used", json!(f64::from(inner))); } _ => { error!("sum_credits_used should always be a Double!"); @@ -344,7 +333,7 @@ pub async fn query_user_stats<'a>( match value { influxdb2_structmap::value::Value::Long(inner) => { out.insert( - "total_request_bytes".to_owned(), + "total_request_bytes", serde_json::Value::Number(inner.into()), ); } @@ 
-356,7 +345,7 @@ pub async fn query_user_stats<'a>( match value { influxdb2_structmap::value::Value::Long(inner) => { out.insert( - "total_response_bytes".to_owned(), + "total_response_bytes", serde_json::Value::Number(inner.into()), ); } @@ -368,7 +357,7 @@ pub async fn query_user_stats<'a>( match value { influxdb2_structmap::value::Value::String(inner) => { out.insert( - "rpc_key".to_owned(), + "rpc_key", serde_json::Value::String( rpc_key_id_to_key.get(&inner).unwrap().to_string(), ), @@ -382,7 +371,7 @@ pub async fn query_user_stats<'a>( match value { influxdb2_structmap::value::Value::Long(inner) => { out.insert( - "total_response_millis".to_owned(), + "total_response_millis", serde_json::Value::Number(inner.into()), ); } @@ -395,7 +384,7 @@ pub async fn query_user_stats<'a>( else if stat_response_type == StatType::Detailed && key == "method" { match value { influxdb2_structmap::value::Value::String(inner) => { - out.insert("method".to_owned(), serde_json::Value::String(inner)); + out.insert("method", serde_json::Value::String(inner)); } _ => { error!("method should always be a String!"); @@ -404,7 +393,7 @@ pub async fn query_user_stats<'a>( } else if key == "chain_id" { match value { influxdb2_structmap::value::Value::String(inner) => { - out.insert("chain_id".to_owned(), serde_json::Value::String(inner)); + out.insert("chain_id", serde_json::Value::String(inner)); } _ => { error!("chain_id should always be a String!"); @@ -414,7 +403,7 @@ pub async fn query_user_stats<'a>( match value { influxdb2_structmap::value::Value::String(inner) => { out.insert( - "archive_needed".to_owned(), + "archive_needed", if inner == "true" { serde_json::Value::Bool(true) } else if inner == "false" { @@ -432,7 +421,7 @@ pub async fn query_user_stats<'a>( match value { influxdb2_structmap::value::Value::String(inner) => { out.insert( - "error_response".to_owned(), + "error_response", if inner == "true" { serde_json::Value::Bool(true) } else if inner == "false" { @@ -484,9 +473,6 
@@ pub async fn query_user_stats<'a>( } let response = Json(json!(response_body)).into_response(); - // Add the requests back into out - - // TODO: Now impplement the proper response type Ok(response) } From d91713e09e217166fcb3a7b276604966da3559e0 Mon Sep 17 00:00:00 2001 From: David Date: Fri, 9 Jun 2023 00:57:53 +0200 Subject: [PATCH 02/13] this brings balance back to the stats request (#112) * removed bloom filter temporarily, added some fixes with decimals in payment.rs * balance is inside the influx query again * fixed aggregate --- scripts/generate-requests-and-stats.sh | 4 +- .../101-balance-referral-stats.sh | 8 +++- web3_proxy/src/stats/influxdb_queries.rs | 37 +++++++++++++++++-- 3 files changed, 44 insertions(+), 5 deletions(-) rename scripts/{ => manual-tests}/101-balance-referral-stats.sh (95%) diff --git a/scripts/generate-requests-and-stats.sh b/scripts/generate-requests-and-stats.sh index a05e55e7..77ae8f3a 100644 --- a/scripts/generate-requests-and-stats.sh +++ b/scripts/generate-requests-and-stats.sh @@ -5,4 +5,6 @@ # https://github.com/INFURA/versus # ./ethspam | ./versus --stop-after 100 "http://localhost:8544/" # Pipe into the endpoint ..., add a bearer token and all that -./ethspam http://127.0.0.1:8544/rpc/01H0ZZJDNNEW49FRFS4D9SPR8B | ./versus --concurrency=4 --stop-after 100 http://localhost:8544/rpc/01H0ZZJDNNEW49FRFS4D9SPR8B +./ethspam http://127.0.0.1:8544/rpc/01H2D5DN4D423VR2KFWBZE46TR | ./versus --concurrency=4 --stop-after 10000 http://localhost:8544/rpc/01H2D5DN4D423VR2KFWBZE46TR + +./ethspam http://127.0.0.1:8544/rpc/01H2D5CAP1KF2NKRS30SGATDSD | ./versus --concurrency=4 --stop-after 10000 http://localhost:8544/rpc/01H2D5CAP1KF2NKRS30SGATDSD diff --git a/scripts/101-balance-referral-stats.sh b/scripts/manual-tests/101-balance-referral-stats.sh similarity index 95% rename from scripts/101-balance-referral-stats.sh rename to scripts/manual-tests/101-balance-referral-stats.sh index 5ac82345..40b4fbba 100644 ---
a/scripts/101-balance-referral-stats.sh +++ b/scripts/manual-tests/101-balance-referral-stats.sh @@ -127,4 +127,10 @@ curl \ curl \ -H "Authorization: Bearer 01H2D5CAQJF7P80222P4ZAFQ26" \ --X GET "127.0.0.1:8544/user/referral/stats/shared-codes" \ No newline at end of file +-X GET "127.0.0.1:8544/user/referral/stats/shared-codes" + + +# Finally also get some stats +curl -X GET \ +-H "Authorization: Bearer 01H2D5DN564M4Q2T6PETEZY83Q" \ +"http://localhost:8544/user/stats/detailed?query_start=1686236378&query_window_seconds=3600" diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index 87527133..dd0dc37b 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -167,6 +167,23 @@ pub async fn query_user_stats<'a>( StatType::Detailed => "".to_string(), }; + let join_candidates = match stat_response_type { + StatType::Aggregated => f!( + r#"{:?}"#, + vec!["_time", "_measurement", "chain_id", "rpc_secret_key_id"] + ), + StatType::Detailed => f!( + r#"{:?}"#, + vec![ + "_time", + "_measurement", + "method", + "chain_id", + "rpc_secret_key_id" + ] + ), + }; + let query = f!(r#" base = from(bucket: "{bucket}") |> range(start: {query_start}, stop: {query_stop}) @@ -174,8 +191,8 @@ pub async fn query_user_stats<'a>( |> filter(fn: (r) => r["_measurement"] == "{measurement}") {filter_chain_id} {drop_method} - - base + + cumsum = base |> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false) |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") |> drop(columns: ["balance"]) @@ -186,7 +203,21 @@ pub async fn query_user_stats<'a>( |> sort(columns: ["frontend_requests"], desc: true) |> limit(n: 1) |> group() - |> sort(columns: ["_time", "_measurement", "archive_needed", "chain_id", "error_response", "method", "rpc_secret_key_id"], desc: true) + + balance = base + |> toFloat() + |> aggregateWindow(every: {query_window_seconds}s, fn: mean, createEmpty: 
false) + |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") + |> group(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"]) + |> mean(column: "balance") + |> group() + |> sort(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"], desc: true) + + join( + tables: {{cumsum, balance}}, + on: {join_candidates} + ) + |> sort(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"], desc: true) "#); debug!("Raw query to db is: {:#?}", query); From b6ff14210d8be07c3bb9e47a157ee0f0076479b5 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 8 Jun 2023 22:35:14 -0700 Subject: [PATCH 03/13] cargo upgrade --- Cargo.lock | 64 ++++++++++---------------------- deferred-rate-limiter/Cargo.toml | 2 +- entities/Cargo.toml | 2 +- latency/Cargo.toml | 2 +- web3_proxy/Cargo.toml | 4 +- 5 files changed, 25 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c8c817a7..445589f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2275,7 +2275,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0176e0459c2e4a1fe232f984bca6890e681076abb9934f6cea7c326f3fc47818" dependencies = [ "libc", - "windows-targets 0.48.0", + "windows-targets", ] [[package]] @@ -3057,9 +3057,9 @@ dependencies = [ [[package]] name = "lock_api" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df" +checksum = "c1cc9717a20b1bb222f333e6a92fd32f7d8a18ddc5a3191a11af45dcbf4dcd16" dependencies = [ "autocfg", "scopeguard", @@ -3217,9 +3217,9 @@ dependencies = [ [[package]] name = "moka" -version = "0.11.1" +version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36506f2f935238463605f3bb13b362f1949daafc3b347d05d60ae08836db2bd2" +checksum = "206bf83f415b0579fd885fe0804eb828e727636657dc1bf73d80d2f1218e14a1" dependencies = [ 
"async-io", "async-lock", @@ -3227,7 +3227,6 @@ dependencies = [ "crossbeam-epoch", "crossbeam-utils", "futures-util", - "num_cpus", "once_cell", "parking_lot 0.12.1", "quanta", @@ -3694,7 +3693,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core 0.9.7", + "parking_lot_core 0.9.8", ] [[package]] @@ -3713,18 +3712,18 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.7" +version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521" +checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" dependencies = [ "backtrace", "cfg-if", "libc", "petgraph", - "redox_syscall 0.2.16", + "redox_syscall 0.3.5", "smallvec", "thread-id", - "windows-sys 0.45.0", + "windows-targets", ] [[package]] @@ -5184,18 +5183,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.163" +version = "1.0.164" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2113ab51b87a539ae008b5c6c02dc020ffa39afd2d83cffcb3f4eb2722cebec2" +checksum = "9e8c8cf938e98f769bc164923b06dce91cea1751522f46f8466461af04c9027d" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.163" +version = "1.0.164" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c805777e3930c8883389c602315a24224bcc738b63905ef87cd1420353ea93e" +checksum = "d9735b638ccc51c28bf6914d90a2e9725b377144fc612c49a611fddd1b631d68" dependencies = [ "proc-macro2", "quote", @@ -5776,15 +5775,16 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "tempfile" -version = "3.5.0" +version = "3.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9fbec84f381d5795b08656e4912bec604d162bff9291d6189a78f4c8ab87998" 
+checksum = "31c0432476357e58790aaa47a8efb0c5138f137343f3b5f23bd36a27e3b0a6d6" dependencies = [ + "autocfg", "cfg-if", "fastrand", "redox_syscall 0.3.5", "rustix", - "windows-sys 0.45.0", + "windows-sys 0.48.0", ] [[package]] @@ -6852,7 +6852,7 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" dependencies = [ - "windows-targets 0.48.0", + "windows-targets", ] [[package]] @@ -6870,37 +6870,13 @@ dependencies = [ "windows_x86_64_msvc 0.42.2", ] -[[package]] -name = "windows-sys" -version = "0.45.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" -dependencies = [ - "windows-targets 0.42.2", -] - [[package]] name = "windows-sys" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets 0.48.0", -] - -[[package]] -name = "windows-targets" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" -dependencies = [ - "windows_aarch64_gnullvm 0.42.2", - "windows_aarch64_msvc 0.42.2", - "windows_i686_gnu 0.42.2", - "windows_i686_msvc 0.42.2", - "windows_x86_64_gnu 0.42.2", - "windows_x86_64_gnullvm 0.42.2", - "windows_x86_64_msvc 0.42.2", + "windows-targets", ] [[package]] diff --git a/deferred-rate-limiter/Cargo.toml b/deferred-rate-limiter/Cargo.toml index 52b17b61..38f103b0 100644 --- a/deferred-rate-limiter/Cargo.toml +++ b/deferred-rate-limiter/Cargo.toml @@ -10,5 +10,5 @@ redis-rate-limiter = { path = "../redis-rate-limiter" } anyhow = "1.0.71" hashbrown = "0.14.0" log = "0.4.18" -moka = { version = "0.11.1", features = ["future"] } +moka = { version = "0.11.2", features = ["future"] } tokio = "1.28.2" diff --git 
a/entities/Cargo.toml b/entities/Cargo.toml index 8ab86762..60a85ce8 100644 --- a/entities/Cargo.toml +++ b/entities/Cargo.toml @@ -11,7 +11,7 @@ path = "src/mod.rs" [dependencies] sea-orm = "0.11.3" -serde = "1.0.163" +serde = "1.0.164" uuid = "1.3.3" ethers = "2.0.7" ulid = "1.0.0" diff --git a/latency/Cargo.toml b/latency/Cargo.toml index 0f3fbf98..a9448273 100644 --- a/latency/Cargo.toml +++ b/latency/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" ewma = "0.1.1" flume = "0.10.14" log = "0.4.18" -serde = { version = "1.0.163", features = [] } +serde = { version = "1.0.164", features = [] } tokio = { version = "1.28.2", features = ["full"] } [dev-dependencies] diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 21058798..53f37001 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -67,7 +67,7 @@ itertools = "0.10.5" listenfd = "1.0.1" log = "0.4.18" mimalloc = { version = "0.1.37", optional = true} -moka = { version = "0.11.1", features = ["future"] } +moka = { version = "0.11.2", features = ["future"] } num = "0.4.0" num-traits = "0.2.15" once_cell = { version = "1.18.0" } @@ -82,7 +82,7 @@ reqwest = { version = "0.11.18", default-features = false, features = ["deflate" rmp-serde = "1.1.1" rust_decimal = { version = "1.29.1", features = ["maths"] } sentry = { version = "0.31.3", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] } -serde = { version = "1.0.163", features = [] } +serde = { version = "1.0.164", features = [] } serde_json = { version = "1.0.96", default-features = false, features = ["alloc", "raw_value"] } serde_prometheus = "0.2.2" siwe = "0.5.2" From 1f3040f6c75283309b280275d838d00e4d4d6962 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 9 Jun 2023 12:21:50 -0700 Subject: [PATCH 04/13] automated tiers --- Cargo.lock | 3 + docs/histograms.txt | 5 + web3_proxy/Cargo.toml | 2 + web3_proxy/src/bin/web3_proxy_cli/proxyd.rs | 2 - 
web3_proxy/src/config.rs | 7 - web3_proxy/src/errors.rs | 12 ++ web3_proxy/src/frontend/users/payment.rs | 2 - web3_proxy/src/rpcs/consensus.rs | 149 ++++++++++++++++---- web3_proxy/src/rpcs/many.rs | 38 ++--- web3_proxy/src/rpcs/one.rs | 37 +++-- 10 files changed, 184 insertions(+), 73 deletions(-) create mode 100644 docs/histograms.txt diff --git a/Cargo.lock b/Cargo.lock index 445589f6..4ef88d72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2440,6 +2440,7 @@ checksum = "7f19b9f54f7c7f55e31401bb647626ce0cf0f67b0004982ce815b3ee72a02aa8" dependencies = [ "base64 0.13.1", "byteorder", + "crossbeam-channel", "flate2", "nom", "num-traits", @@ -6719,6 +6720,7 @@ dependencies = [ "axum", "axum-client-ip", "axum-macros", + "base64 0.21.2", "chrono", "console-subscriber", "counter", @@ -6737,6 +6739,7 @@ dependencies = [ "glob", "handlebars", "hashbrown 0.14.0", + "hdrhistogram", "hostname", "http", "hyper", diff --git a/docs/histograms.txt b/docs/histograms.txt new file mode 100644 index 00000000..65f57edb --- /dev/null +++ b/docs/histograms.txt @@ -0,0 +1,5 @@ +[2023-06-09T18:45:50Z WARN web3_proxy::rpcs::consensus] TODO: find the troughs in the histogram: HISTFAAAADV4nC3GoQ0AIAxE0eNAkRAEwbELQxE2QGG7aEeoaL95fz0ZACq8HKbwb/U5bGXystMAZl8EMw== + +Paste the HIST data here: + +Save the "Decoded histogram data" to a file and upload it here: diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 53f37001..91b4275e 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -100,6 +100,8 @@ tracing-subscriber = "0.3" ulid = { version = "1.0.0", features = ["rand", "uuid", "serde"] } url = "2.4.0" uuid = { version = "1.3.3", default-features = false, features = ["fast-rng", "serde", "v4", "zerocopy"] } +hdrhistogram = "7.5.2" +base64 = "0.21.2" [dev-dependencies] tokio = { version = "1.28.2", features = ["full", "test-util"] } diff --git a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs index a8ca67a0..f377bee0 100644 --- 
a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs @@ -310,7 +310,6 @@ mod tests { Web3RpcConfig { http_url: Some(anvil.endpoint()), soft_limit: 100, - tier: 0, ..Default::default() }, ), @@ -319,7 +318,6 @@ mod tests { Web3RpcConfig { ws_url: Some(anvil.ws_endpoint()), soft_limit: 100, - tier: 0, ..Default::default() }, ), diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 7a0779a2..a6315fb1 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -256,9 +256,6 @@ pub struct Web3RpcConfig { /// only use this rpc if everything else is lagging too far. this allows us to ignore fast but very low limit rpcs #[serde(default)] pub backup: bool, - /// All else equal, a server with a lower tier receives all requests - #[serde(default = "default_tier")] - pub tier: u64, /// Subscribe to the firehose of pending transactions /// Don't do this with free rpcs #[serde(default)] @@ -268,10 +265,6 @@ pub struct Web3RpcConfig { pub extra: HashMap, } -fn default_tier() -> u64 { - 0 -} - impl Web3RpcConfig { /// Create a Web3Rpc from config /// TODO: move this into Web3Rpc? 
(just need to make things pub(crate)) diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index 7ac37da2..1a4678cb 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -61,6 +61,7 @@ pub enum Web3ProxyError { EthersWsClient(ethers::prelude::WsClientError), FlumeRecv(flume::RecvError), GasEstimateNotU256, + HdrRecord(hdrhistogram::errors::RecordError), Headers(headers::Error), HeaderToString(ToStrError), Hyper(hyper::Error), @@ -338,6 +339,17 @@ impl Web3ProxyError { }, ) } + Self::HdrRecord(err) => { + warn!("HdrRecord {:?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcErrorData { + message: format!("{}", err).into(), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, + ) + } Self::Headers(err) => { warn!("HeadersError {:?}", err); ( diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index 60bb7e2d..3e7e6d54 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -13,7 +13,6 @@ use ethbloom::Input as BloomInput; use ethers::abi::AbiEncode; use ethers::types::{Address, TransactionReceipt, ValueOrArray, H256}; use hashbrown::HashMap; -// use http::StatusCode; use http::StatusCode; use log::{debug, info, trace}; use migration::sea_orm::prelude::Decimal; @@ -21,7 +20,6 @@ use migration::sea_orm::{ self, ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, TransactionTrait, }; use migration::{Expr, OnConflict}; -use num_traits::Pow; use payment_contracts::ierc20::IERC20; use payment_contracts::payment_factory::{self, PaymentFactory}; use serde_json::json; diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 4d54613f..81ba2d9a 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -3,27 +3,31 @@ use super::many::Web3Rpcs; use super::one::Web3Rpc; use crate::errors::{Web3ProxyErrorContext, Web3ProxyResult}; use 
crate::frontend::authorization::Authorization; +use anyhow::Context; +use base64::engine::general_purpose; use derive_more::Constructor; use ethers::prelude::{H256, U64}; use hashbrown::{HashMap, HashSet}; +use hdrhistogram::serialization::{Serializer, V2DeflateSerializer}; +use hdrhistogram::Histogram; use itertools::{Itertools, MinMaxResult}; -use log::{trace, warn}; +use log::{debug, log_enabled, trace, warn, Level}; use moka::future::Cache; use serde::Serialize; use std::cmp::{Ordering, Reverse}; use std::collections::BTreeMap; use std::fmt; -use std::sync::Arc; +use std::sync::{atomic, Arc}; use tokio::time::Instant; #[derive(Clone, Serialize)] -struct RpcData { +struct ConsensusRpcData { head_block_num: U64, // TODO: this is too simple. erigon has 4 prune levels (hrct) oldest_block_num: U64, } -impl RpcData { +impl ConsensusRpcData { fn new(rpc: &Web3Rpc, head: &Web3ProxyBlock) -> Self { let head_block_num = *head.number(); @@ -45,13 +49,13 @@ impl RpcData { #[derive(Constructor, Clone, Copy, Debug, Default, Eq, PartialEq, Serialize)] pub struct RpcRanking { - tier: u64, + tier: u8, backup: bool, head_num: Option, } impl RpcRanking { - pub fn add_offset(&self, offset: u64) -> Self { + pub fn add_offset(&self, offset: u8) -> Self { Self { tier: self.tier + offset, backup: self.backup, @@ -66,9 +70,10 @@ impl RpcRanking { } } - fn sort_key(&self) -> (u64, bool, Reverse>) { + fn sort_key(&self) -> (bool, u8, Reverse>) { // TODO: add soft_limit here? add peak_ewma here? - (self.tier, !self.backup, Reverse(self.head_num)) + // TODO: should backup or tier be checked first? now that tiers are automated, backups + (!self.backup, self.tier, Reverse(self.head_num)) } } @@ -94,9 +99,10 @@ pub enum ShouldWaitForBlock { /// A collection of Web3Rpcs that are on the same block. 
/// Serialize is so we can print it on our /status endpoint +/// TODO: one data structure of head_rpcs and other_rpcs that is sorted best first #[derive(Clone, Serialize)] pub struct ConsensusWeb3Rpcs { - pub(crate) tier: u64, + pub(crate) tier: u8, pub(crate) backups_needed: bool, // TODO: this is already inside best_rpcs. Don't skip, instead make a shorter serialize @@ -112,7 +118,7 @@ pub struct ConsensusWeb3Rpcs { // TODO: make this work. the key needs to be a string. I think we need `serialize_with` #[serde(skip_serializing)] - rpc_data: HashMap, RpcData>, + rpc_data: HashMap, ConsensusRpcData>, } impl ConsensusWeb3Rpcs { @@ -332,26 +338,20 @@ type FirstSeenCache = Cache; /// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers pub struct ConsensusFinder { - /// backups for all tiers are only used if necessary - /// `tiers[0] = only tier 0` - /// `tiers[1] = tier 0 and tier 1` - /// `tiers[n] = tier 0..=n` rpc_heads: HashMap, Web3ProxyBlock>, /// never serve blocks that are too old max_block_age: Option, /// tier 0 will be prefered as long as the distance between it and the other tiers is <= max_tier_lag max_block_lag: Option, - /// used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups + /// Block Hash -> First Seen Instant. used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups first_seen: FirstSeenCache, } impl ConsensusFinder { pub fn new(max_block_age: Option, max_block_lag: Option) -> Self { // TODO: what's a good capacity for this? 
it shouldn't need to be very large - // TODO: if we change Web3ProxyBlock to store the instance, i think we could use the block_by_hash cache let first_seen = Cache::new(16); - // TODO: hard coding 0-9 isn't great, but its easier than refactoring this to be smart about config reloading let rpc_heads = HashMap::new(); Self { @@ -433,11 +433,100 @@ impl ConsensusFinder { Ok(changed) } + pub async fn update_tiers( + &mut self, + authorization: &Arc, + web3_rpcs: &Web3Rpcs, + ) -> Web3ProxyResult<()> { + match self.rpc_heads.len() { + 0 => {} + 1 => { + for rpc in self.rpc_heads.keys() { + rpc.tier.store(0, atomic::Ordering::Relaxed) + } + } + _ => { + // iterate first to find bounds + let mut min_latency = u64::MAX; + let mut max_latency = u64::MIN; + let mut weighted_latencies = HashMap::new(); + for rpc in self.rpc_heads.keys() { + let weighted_latency_seconds = rpc.weighted_peak_ewma_seconds(); + + let weighted_latency_ms = (weighted_latency_seconds * 1000.0).round() as i64; + + let weighted_latency_ms: u64 = weighted_latency_ms + .try_into() + .context("weighted_latency_ms does not fit in a u64")?; + + min_latency = min_latency.min(weighted_latency_ms); + max_latency = min_latency.max(weighted_latency_ms); + + weighted_latencies.insert(rpc, weighted_latency_ms); + } + + // // histogram requires high to be at least 2 x low + // // using min_latency for low does not work how we want it though + // max_latency = max_latency.max(2 * min_latency); + + // create the histogram + let mut hist = Histogram::::new_with_bounds(1, max_latency, 3).unwrap(); + + for weighted_latency_ms in weighted_latencies.values() { + hist.record(*weighted_latency_ms)?; + } + + // dev logging + if log_enabled!(Level::Trace) { + // print the histogram. 
see docs/histograms.txt for more info + let mut encoder = + base64::write::EncoderWriter::new(Vec::new(), &general_purpose::STANDARD); + + V2DeflateSerializer::new() + .serialize(&hist, &mut encoder) + .unwrap(); + + let encoded = encoder.finish().unwrap(); + + let encoded = String::from_utf8(encoded).unwrap(); + + trace!("weighted_latencies: {}", encoded); + } + + // TODO: get someone who is better at math to do something smarter + // this is not a very good use of stddev, but it works for now + let stddev = hist.stdev(); + + for (rpc, weighted_latency_ms) in weighted_latencies.into_iter() { + let tier = (weighted_latency_ms - min_latency) as f64 / stddev; + + let tier = tier.floor() as u64; + + let tier = tier.clamp(u8::MIN.into(), u8::MAX.into()) as u8; + + // TODO: this should be trace + trace!( + "{} - weighted_latency: {}ms, tier {}", + rpc, + weighted_latency_ms, + tier + ); + + rpc.tier.store(tier, atomic::Ordering::Relaxed); + } + } + } + + Ok(()) + } + pub async fn find_consensus_connections( &mut self, authorization: &Arc, web3_rpcs: &Web3Rpcs, ) -> Web3ProxyResult> { + self.update_tiers(authorization, web3_rpcs).await?; + let minmax_block = self.rpc_heads.values().minmax_by_key(|&x| x.number()); let (lowest_block, highest_block) = match minmax_block { @@ -477,13 +566,14 @@ impl ConsensusFinder { let mut backup_consensus = None; let mut rpc_heads_by_tier: Vec<_> = self.rpc_heads.iter().collect(); - rpc_heads_by_tier.sort_by_cached_key(|(rpc, _)| rpc.tier); + rpc_heads_by_tier.sort_by_cached_key(|(rpc, _)| rpc.tier.load(atomic::Ordering::Relaxed)); let current_tier = rpc_heads_by_tier .first() .expect("rpc_heads_by_tier should never be empty") .0 - .tier; + .tier + .load(atomic::Ordering::Relaxed); // trace!("first_tier: {}", current_tier); @@ -492,7 +582,9 @@ impl ConsensusFinder { // loop over all the rpc heads (grouped by tier) and their parents to find consensus // TODO: i'm sure theres a lot of shortcuts that could be taken, but this is simplest to 
implement for (rpc, rpc_head) in rpc_heads_by_tier.into_iter() { - if current_tier != rpc.tier { + let rpc_tier = rpc.tier.load(atomic::Ordering::Relaxed); + + if current_tier != rpc_tier { // we finished processing a tier. check for primary results if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) { trace!("found enough votes on tier {}", current_tier); @@ -600,7 +692,7 @@ impl ConsensusFinder { let tier = consensus_rpcs .iter() - .map(|x| x.tier) + .map(|x| x.tier.load(atomic::Ordering::Relaxed)) .max() .expect("there should always be a max"); @@ -615,7 +707,11 @@ impl ConsensusFinder { { let x_head_num = *x_head.number(); - let key: RpcRanking = RpcRanking::new(x.tier, x.backup, Some(x_head_num)); + let key: RpcRanking = RpcRanking::new( + x.tier.load(atomic::Ordering::Relaxed), + x.backup, + Some(x_head_num), + ); other_rpcs .entry(key) @@ -627,7 +723,7 @@ impl ConsensusFinder { let mut rpc_data = HashMap::with_capacity(self.rpc_heads.len()); for (x, x_head) in self.rpc_heads.iter() { - let y = RpcData::new(x, x_head); + let y = ConsensusRpcData::new(x, x_head); rpc_data.insert(x.clone(), y); } @@ -647,8 +743,11 @@ impl ConsensusFinder { None } - pub fn worst_tier(&self) -> Option { - self.rpc_heads.iter().map(|(x, _)| x.tier).max() + pub fn worst_tier(&self) -> Option { + self.rpc_heads + .iter() + .map(|(x, _)| x.tier.load(atomic::Ordering::Relaxed)) + .max() } } diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index d6e8c030..92552b48 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -31,7 +31,7 @@ use serde_json::value::RawValue; use std::borrow::Cow; use std::cmp::{min_by_key, Reverse}; use std::fmt::{self, Display}; -use std::sync::atomic::Ordering; +use std::sync::atomic::{self, Ordering}; use std::sync::Arc; use thread_fast_rng::rand::seq::SliceRandom; use tokio::select; @@ -440,7 +440,7 @@ impl Web3Rpcs { trace!("{} vs {}", rpc_a, rpc_b); // TODO: cached key to save a read lock // 
TODO: ties to the server with the smallest block_data_limit - let faster_rpc = min_by_key(rpc_a, rpc_b, |x| x.peak_ewma()); + let faster_rpc = min_by_key(rpc_a, rpc_b, |x| x.weighted_peak_ewma_seconds()); trace!("winner: {}", faster_rpc); // add to the skip list in case this one fails @@ -1288,20 +1288,21 @@ impl Serialize for Web3Rpcs { /// TODO: should this be moved into a `impl Web3Rpc`? /// TODO: i think we still have sorts scattered around the code that should use this /// TODO: take AsRef or something like that? We don't need an Arc here -fn rpc_sync_status_sort_key(x: &Arc) -> (Reverse, u64, bool, OrderedFloat) { +/// TODO: tests on this! +fn rpc_sync_status_sort_key(x: &Arc) -> (bool, Reverse, u8, OrderedFloat) { let head_block = x .head_block .as_ref() .and_then(|x| x.borrow().as_ref().map(|x| *x.number())) .unwrap_or_default(); - let tier = x.tier; + let tier = x.tier.load(atomic::Ordering::Relaxed); - let peak_ewma = x.peak_ewma(); + let peak_ewma = x.weighted_peak_ewma_seconds(); let backup = x.backup; - (Reverse(head_block), tier, backup, peak_ewma) + (!backup, Reverse(head_block), tier, peak_ewma) } mod tests { @@ -1361,42 +1362,42 @@ mod tests { let mut rpcs: Vec<_> = [ Web3Rpc { name: "a".to_string(), - tier: 0, + // tier: 0, head_block: Some(tx_a), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "b".to_string(), - tier: 0, + // tier: 0, head_block: Some(tx_b), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "c".to_string(), - tier: 0, + // tier: 0, head_block: Some(tx_c), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "d".to_string(), - tier: 1, + // tier: 1, head_block: Some(tx_d), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "e".to_string(), - tier: 1, + // tier: 1, head_block: Some(tx_e), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "f".to_string(), - tier: 1, + // tier: 1, 
head_block: Some(tx_f), peak_latency: Some(new_peak_latency()), ..Default::default() @@ -1410,6 +1411,7 @@ mod tests { let names_in_sort_order: Vec<_> = rpcs.iter().map(|x| x.name.as_str()).collect(); + // TODO: the tier refactor likely broke this assert_eq!(names_in_sort_order, ["c", "f", "b", "e", "a", "d"]); } @@ -1452,7 +1454,7 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: block_data_limit.into(), - tier: 0, + // tier: 0, head_block: Some(tx_synced), peak_latency: Some(new_peak_latency()), ..Default::default() @@ -1466,7 +1468,7 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: block_data_limit.into(), - tier: 0, + // tier: 0, head_block: Some(tx_lagged), peak_latency: Some(new_peak_latency()), ..Default::default() @@ -1736,7 +1738,7 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: 64.into(), - tier: 1, + // tier: 1, head_block: Some(tx_pruned), ..Default::default() }; @@ -1749,7 +1751,7 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: u64::MAX.into(), - tier: 2, + // tier: 2, head_block: Some(tx_archive), ..Default::default() }; @@ -1918,7 +1920,7 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: 64.into(), - tier: 0, + // tier: 0, head_block: Some(tx_mock_geth), peak_latency: Some(new_peak_latency()), ..Default::default() @@ -1930,7 +1932,7 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: u64::MAX.into(), - tier: 1, + // tier: 1, head_block: Some(tx_mock_erigon_archive), peak_latency: Some(new_peak_latency()), ..Default::default() diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index fcd8cdab..4a30b983 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -24,7 +24,7 @@ use serde::Serialize; use serde_json::json; use std::fmt; use std::hash::{Hash, Hasher}; -use std::sync::atomic::{self, AtomicU64, AtomicUsize}; +use std::sync::atomic::{self, 
AtomicU64, AtomicU8, AtomicUsize}; use std::{cmp::Ordering, sync::Arc}; use tokio::sync::watch; use tokio::time::{sleep, sleep_until, timeout, Duration, Instant}; @@ -54,8 +54,6 @@ pub struct Web3Rpc { pub backup: bool, /// TODO: have an enum for this so that "no limit" prints pretty? pub(super) block_data_limit: AtomicU64, - /// Lower tiers are higher priority when sending requests - pub(super) tier: u64, /// TODO: change this to a watch channel so that http providers can subscribe and take action on change. /// this is only inside an Option so that the "Default" derive works. it will always be set. pub(super) head_block: Option>>, @@ -64,6 +62,8 @@ pub struct Web3Rpc { /// Track peak request latency /// This is only inside an Option so that the "Default" derive works. it will always be set. pub(super) peak_latency: Option, + /// Automatically set priority + pub(super) tier: AtomicU8, /// Track total requests served /// TODO: maybe move this to graphana pub(super) total_requests: AtomicUsize, @@ -196,7 +196,6 @@ impl Web3Rpc { name, peak_latency: Some(peak_latency), soft_limit: config.soft_limit, - tier: config.tier, ws_provider, disconnect_watch: Some(disconnect_watch), ..Default::default() @@ -219,7 +218,7 @@ impl Web3Rpc { Ok((new_connection, handle)) } - pub fn peak_ewma(&self) -> OrderedFloat { + pub fn weighted_peak_ewma_seconds(&self) -> OrderedFloat { let peak_latency = if let Some(peak_latency) = self.peak_latency.as_ref() { peak_latency.latency().as_secs_f64() } else { @@ -538,7 +537,7 @@ impl Web3Rpc { // TODO: how often? different depending on the chain? // TODO: reset this timeout when a new block is seen? 
we need to keep request_latency updated though - let health_sleep_seconds = 10; + let health_sleep_seconds = 5; // health check loop let f = async move { @@ -550,7 +549,7 @@ impl Web3Rpc { while !rpc.should_disconnect() { new_total_requests = rpc.total_requests.load(atomic::Ordering::Relaxed); - if new_total_requests - old_total_requests < 10 { + if new_total_requests - old_total_requests < 5 { // TODO: if this fails too many times, reset the connection // TODO: move this into a function and the chaining should be easier if let Err(err) = rpc.healthcheck(error_handler).await { @@ -900,18 +899,20 @@ impl Web3Rpc { impl Hash for Web3Rpc { fn hash(&self, state: &mut H) { - self.name.hash(state); - self.display_name.hash(state); - self.http_provider.as_ref().map(|x| x.url()).hash(state); - // TODO: figure out how to get the url for the provider - // TODO: url does NOT include the authorization data. i think created_at should protect us if auth changes without anything else - // self.ws_provider.map(|x| x.url()).hash(state); - self.automatic_block_limit.hash(state); + // do not include automatic block limit because it can change + // do not include tier because it can change self.backup.hash(state); + self.created_at.hash(state); + self.display_name.hash(state); + self.name.hash(state); + + // TODO: url does NOT include the authorization data. 
i think created_at should protect us if auth changes without anything else + self.http_provider.as_ref().map(|x| x.url()).hash(state); + // TODO: figure out how to get the url for the ws provider + // self.ws_provider.map(|x| x.url()).hash(state); + // TODO: don't include soft_limit if we change them to be dynamic self.soft_limit.hash(state); - self.tier.hash(state); - self.created_at.hash(state); } } @@ -988,7 +989,7 @@ impl Serialize for Web3Rpc { &self.peak_latency.as_ref().unwrap().latency().as_millis(), )?; - state.serialize_field("peak_ewma_s", self.peak_ewma().as_ref())?; + state.serialize_field("peak_ewma_s", self.weighted_peak_ewma_seconds().as_ref())?; state.end() } @@ -1047,7 +1048,6 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: block_data_limit.into(), - tier: 0, head_block: Some(tx), ..Default::default() }; @@ -1082,7 +1082,6 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: block_data_limit.into(), - tier: 0, head_block: Some(tx), ..Default::default() }; From bd87fcb13c0d8cea09e950298a65e199b16c5f52 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 9 Jun 2023 13:09:58 -0700 Subject: [PATCH 05/13] move sort and shuffle for loadbalancing into proper functions --- web3_proxy/src/rpcs/consensus.rs | 12 ++--- web3_proxy/src/rpcs/many.rs | 84 +++++++++++++------------------- web3_proxy/src/rpcs/one.rs | 56 +++++++++++++++++++++ 3 files changed, 96 insertions(+), 56 deletions(-) diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 81ba2d9a..483a76df 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -11,7 +11,7 @@ use hashbrown::{HashMap, HashSet}; use hdrhistogram::serialization::{Serializer, V2DeflateSerializer}; use hdrhistogram::Histogram; use itertools::{Itertools, MinMaxResult}; -use log::{debug, log_enabled, trace, warn, Level}; +use log::{log_enabled, trace, warn, Level}; use moka::future::Cache; use serde::Serialize; 
use std::cmp::{Ordering, Reverse}; @@ -73,6 +73,8 @@ impl RpcRanking { fn sort_key(&self) -> (bool, u8, Reverse>) { // TODO: add soft_limit here? add peak_ewma here? // TODO: should backup or tier be checked first? now that tiers are automated, backups + // TODO: should we include a random number in here? + // TODO: should we include peak_ewma_latency or weighted_peak_ewma_latency? (!self.backup, self.tier, Reverse(self.head_num)) } } @@ -433,11 +435,7 @@ impl ConsensusFinder { Ok(changed) } - pub async fn update_tiers( - &mut self, - authorization: &Arc, - web3_rpcs: &Web3Rpcs, - ) -> Web3ProxyResult<()> { + pub async fn update_tiers(&mut self) -> Web3ProxyResult<()> { match self.rpc_heads.len() { 0 => {} 1 => { @@ -525,7 +523,7 @@ impl ConsensusFinder { authorization: &Arc, web3_rpcs: &Web3Rpcs, ) -> Web3ProxyResult> { - self.update_tiers(authorization, web3_rpcs).await?; + self.update_tiers().await?; let minmax_block = self.rpc_heads.values().minmax_by_key(|&x| x.number()); diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 92552b48..4644bac4 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -23,17 +23,15 @@ use itertools::Itertools; use log::{debug, error, info, trace, warn}; use migration::sea_orm::DatabaseConnection; use moka::future::{Cache, CacheBuilder}; -use ordered_float::OrderedFloat; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; use serde_json::value::RawValue; use std::borrow::Cow; -use std::cmp::{min_by_key, Reverse}; +use std::cmp::min_by_key; use std::fmt::{self, Display}; -use std::sync::atomic::{self, Ordering}; +use std::sync::atomic::Ordering; use std::sync::Arc; -use thread_fast_rng::rand::seq::SliceRandom; use tokio::select; use tokio::sync::{broadcast, watch}; use tokio::time::{sleep, sleep_until, Duration, Instant}; @@ -523,8 +521,8 @@ impl Web3Rpcs { .cloned() .collect(); - // TODO: include tiers in this? 
- potential_rpcs.shuffle(&mut thread_fast_rng::thread_fast_rng()); + potential_rpcs + .sort_by_cached_key(|x| x.shuffle_for_load_balancing_on(max_block_needed.copied())); match self ._best_available_rpc(&authorization, error_handler, &potential_rpcs, skip_rpcs) @@ -572,10 +570,11 @@ impl Web3Rpcs { .cloned(), ); - potential_rpcs.shuffle(&mut thread_fast_rng::thread_fast_rng()); - if potential_rpcs.len() >= self.min_head_rpcs { // we have enough potential rpcs. try to load balance + potential_rpcs.sort_by_cached_key(|x| { + x.shuffle_for_load_balancing_on(max_block_needed.copied()) + }); match self ._best_available_rpc( @@ -604,8 +603,7 @@ impl Web3Rpcs { } for next_rpcs in consensus_rpcs.other_rpcs.values() { - // we have to collect in order to shuffle - let mut more_rpcs: Vec<_> = next_rpcs + let more_rpcs = next_rpcs .iter() .filter(|rpc| { consensus_rpcs.rpc_will_work_now( @@ -615,16 +613,16 @@ impl Web3Rpcs { rpc, ) }) - .cloned() - .collect(); + .cloned(); - // shuffle only the new entries. that way the highest tier still gets preference - more_rpcs.shuffle(&mut thread_fast_rng::thread_fast_rng()); - - potential_rpcs.extend(more_rpcs.into_iter()); + potential_rpcs.extend(more_rpcs); if potential_rpcs.len() >= self.min_head_rpcs { // we have enough potential rpcs. try to load balance + potential_rpcs.sort_by_cached_key(|x| { + x.shuffle_for_load_balancing_on(max_block_needed.copied()) + }); + match self ._best_available_rpc( &authorization, @@ -654,6 +652,10 @@ impl Web3Rpcs { if !potential_rpcs.is_empty() { // even after scanning all the tiers, there are not enough rpcs that can serve this request. try anyways + potential_rpcs.sort_by_cached_key(|x| { + x.shuffle_for_load_balancing_on(max_block_needed.copied()) + }); + match self ._best_available_rpc( &authorization, @@ -760,14 +762,14 @@ impl Web3Rpcs { }; // synced connections are all on the same block. 
sort them by tier with higher soft limits first - synced_rpcs.sort_by_cached_key(rpc_sync_status_sort_key); + synced_rpcs.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed.copied())); trace!("synced_rpcs: {:#?}", synced_rpcs); // if there aren't enough synced connections, include more connections // TODO: only do this sorting if the synced_rpcs isn't enough let mut all_rpcs: Vec<_> = self.by_name.load().values().cloned().collect(); - all_rpcs.sort_by_cached_key(rpc_sync_status_sort_key); + all_rpcs.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed.copied())); trace!("all_rpcs: {:#?}", all_rpcs); @@ -1284,27 +1286,6 @@ impl Serialize for Web3Rpcs { } } -/// sort by block number (descending) and tier (ascending) -/// TODO: should this be moved into a `impl Web3Rpc`? -/// TODO: i think we still have sorts scattered around the code that should use this -/// TODO: take AsRef or something like that? We don't need an Arc here -/// TODO: tests on this! -fn rpc_sync_status_sort_key(x: &Arc) -> (bool, Reverse, u8, OrderedFloat) { - let head_block = x - .head_block - .as_ref() - .and_then(|x| x.borrow().as_ref().map(|x| *x.number())) - .unwrap_or_default(); - - let tier = x.tier.load(atomic::Ordering::Relaxed); - - let peak_ewma = x.weighted_peak_ewma_seconds(); - - let backup = x.backup; - - (!backup, Reverse(head_block), tier, peak_ewma) -} - mod tests { #![allow(unused_imports)] @@ -1327,8 +1308,14 @@ mod tests { PeakEwmaLatency::spawn(Duration::from_secs(1), 4, Duration::from_secs(1)) } - #[tokio::test] + #[tokio::test(start_paused = true)] async fn test_sort_connections_by_sync_status() { + let _ = env_logger::builder() + .filter_level(LevelFilter::Error) + .filter_module("web3_proxy", LevelFilter::Trace) + .is_test(true) + .try_init(); + let block_0 = Block { number: Some(0.into()), hash: Some(H256::random()), @@ -1362,42 +1349,42 @@ mod tests { let mut rpcs: Vec<_> = [ Web3Rpc { name: "a".to_string(), - // tier: 0, + tier: 
0.into(), head_block: Some(tx_a), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "b".to_string(), - // tier: 0, + tier: 0.into(), head_block: Some(tx_b), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "c".to_string(), - // tier: 0, + tier: 0.into(), head_block: Some(tx_c), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "d".to_string(), - // tier: 1, + tier: 1.into(), head_block: Some(tx_d), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "e".to_string(), - // tier: 1, + tier: 1.into(), head_block: Some(tx_e), peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "f".to_string(), - // tier: 1, + tier: 1.into(), head_block: Some(tx_f), peak_latency: Some(new_peak_latency()), ..Default::default() @@ -1407,12 +1394,11 @@ mod tests { .map(Arc::new) .collect(); - rpcs.sort_by_cached_key(rpc_sync_status_sort_key); + rpcs.sort_by_cached_key(|x| x.sort_for_load_balancing_on(None)); let names_in_sort_order: Vec<_> = rpcs.iter().map(|x| x.name.as_str()).collect(); - // TODO: the tier refactor likely broke this - assert_eq!(names_in_sort_order, ["c", "f", "b", "e", "a", "d"]); + assert_eq!(names_in_sort_order, ["c", "b", "a", "f", "e", "d"]); } #[tokio::test] diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 4a30b983..dc07a732 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -22,10 +22,12 @@ use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter}; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; +use std::cmp::Reverse; use std::fmt; use std::hash::{Hash, Hasher}; use std::sync::atomic::{self, AtomicU64, AtomicU8, AtomicUsize}; use std::{cmp::Ordering, sync::Arc}; +use thread_fast_rng::rand::Rng; use tokio::sync::watch; use tokio::time::{sleep, sleep_until, timeout, Duration, Instant}; use url::Url; @@ -218,6 
+220,60 @@ impl Web3Rpc { Ok((new_connection, handle)) } + /// sort by... + /// - backups last + /// - tier (ascending) + /// - block number (descending) + /// TODO: tests on this! + /// TODO: should tier or block number take priority? + /// TODO: should this return a struct that implements sorting traits? + fn sort_on(&self, max_block: Option) -> (bool, u8, Reverse) { + let mut head_block = self + .head_block + .as_ref() + .and_then(|x| x.borrow().as_ref().map(|x| *x.number())) + .unwrap_or_default(); + + if let Some(max_block) = max_block { + head_block = head_block.min(max_block); + } + + let tier = self.tier.load(atomic::Ordering::Relaxed); + + let backup = self.backup; + + (!backup, tier, Reverse(head_block)) + } + + pub fn sort_for_load_balancing_on( + &self, + max_block: Option, + ) -> ((bool, u8, Reverse), OrderedFloat) { + let sort_on = self.sort_on(max_block); + + let weighted_peak_ewma_seconds = self.weighted_peak_ewma_seconds(); + + let x = (sort_on, weighted_peak_ewma_seconds); + + trace!("sort_for_load_balancing {}: {:?}", self, x); + + x + } + + /// like sort_for_load_balancing, but shuffles tiers randomly instead of sorting by weighted_peak_ewma_seconds + pub fn shuffle_for_load_balancing_on( + &self, + max_block: Option, + ) -> ((bool, u8, Reverse), u32) { + let sort_on = self.sort_on(max_block); + + let mut rng = thread_fast_rng::thread_fast_rng(); + + let r = rng.gen::(); + + (sort_on, r) + } + pub fn weighted_peak_ewma_seconds(&self) -> OrderedFloat { let peak_latency = if let Some(peak_latency) = self.peak_latency.as_ref() { peak_latency.latency().as_secs_f64() From 8bf0ee473daf77ff53677f8a36cb5970b10f2979 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 9 Jun 2023 13:30:00 -0700 Subject: [PATCH 06/13] loud log on rollback --- web3_proxy/src/rpcs/blockchain.rs | 10 +++++++--- web3_proxy/src/rpcs/consensus.rs | 9 ++++++--- web3_proxy/src/rpcs/many.rs | 20 ++++++++++---------- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git 
a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index f1fe2e80..f8fda474 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -403,9 +403,11 @@ impl Web3Rpcs { let num_active_rpcs = consensus_finder.len(); let total_rpcs = self.by_name.load().len(); + let new_consensus_rpcs = Arc::new(new_consensus_rpcs); + let old_consensus_head_connections = self .watch_consensus_rpcs_sender - .send_replace(Some(Arc::new(new_consensus_rpcs))); + .send_replace(Some(new_consensus_rpcs.clone())); let backups_voted_str = if backups_needed { "B " } else { "" }; @@ -494,9 +496,9 @@ impl Web3Rpcs { } Ordering::Less => { // this is unlikely but possible - // TODO: better log + // TODO: better log that includes all the votes warn!( - "chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}", + "chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}. {:#?} -> {:#?}", consensus_tier, total_tiers, backups_voted_str, @@ -507,6 +509,8 @@ impl Web3Rpcs { old_head_block, rpc, rpc_head_str, + old_consensus_connections, + new_consensus_rpcs, ); if backups_needed { diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 483a76df..1987b23c 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -470,6 +470,9 @@ impl ConsensusFinder { // create the histogram let mut hist = Histogram::::new_with_bounds(1, max_latency, 3).unwrap(); + // TODO: resize shouldn't be necessary, but i've seen it error + hist.auto(true); + for weighted_latency_ms in weighted_latencies.values() { hist.record(*weighted_latency_ms)?; } @@ -550,7 +553,7 @@ impl ConsensusFinder { let num_known = self.rpc_heads.len(); - if num_known < web3_rpcs.min_head_rpcs { + if num_known < web3_rpcs.min_synced_rpcs { // this keeps us from serving requests when the proxy first starts trace!("not enough servers known"); return Ok(None); @@ -671,7 +674,7 @@ impl ConsensusFinder { continue; } // TODO: different mins for 
backup vs primary - if rpc_names.len() < web3_rpcs.min_head_rpcs { + if rpc_names.len() < web3_rpcs.min_synced_rpcs { continue; } @@ -683,7 +686,7 @@ impl ConsensusFinder { .filter_map(|x| web3_rpcs.get(x)) .collect(); - if consensus_rpcs.len() < web3_rpcs.min_head_rpcs { + if consensus_rpcs.len() < web3_rpcs.min_synced_rpcs { continue; } // consensus found! diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 4644bac4..5d0f2694 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -61,7 +61,7 @@ pub struct Web3Rpcs { /// blocks on the heaviest chain pub(super) blocks_by_number: BlocksByNumberCache, /// the number of rpcs required to agree on consensus for the head block (thundering herd protection) - pub(super) min_head_rpcs: usize, + pub(super) min_synced_rpcs: usize, /// the soft limit required to agree on consensus for the head block. (thundering herd protection) pub(super) min_sum_soft_limit: u32, /// how far behind the highest known block height we can be before we stop serving requests @@ -121,7 +121,7 @@ impl Web3Rpcs { by_name, max_block_age, max_block_lag, - min_head_rpcs, + min_synced_rpcs: min_head_rpcs, min_sum_soft_limit, name, pending_transaction_cache, @@ -254,10 +254,10 @@ impl Web3Rpcs { let num_rpcs = self.by_name.load().len(); - if num_rpcs < self.min_head_rpcs { + if num_rpcs < self.min_synced_rpcs { return Err(Web3ProxyError::NotEnoughRpcs { num_known: num_rpcs, - min_head_rpcs: self.min_head_rpcs, + min_head_rpcs: self.min_synced_rpcs, }); } @@ -277,7 +277,7 @@ impl Web3Rpcs { } pub fn min_head_rpcs(&self) -> usize { - self.min_head_rpcs + self.min_synced_rpcs } /// subscribe to blocks and transactions from all the backend rpcs. @@ -570,7 +570,7 @@ impl Web3Rpcs { .cloned(), ); - if potential_rpcs.len() >= self.min_head_rpcs { + if potential_rpcs.len() >= self.min_synced_rpcs { // we have enough potential rpcs. 
try to load balance potential_rpcs.sort_by_cached_key(|x| { x.shuffle_for_load_balancing_on(max_block_needed.copied()) @@ -617,7 +617,7 @@ impl Web3Rpcs { potential_rpcs.extend(more_rpcs); - if potential_rpcs.len() >= self.min_head_rpcs { + if potential_rpcs.len() >= self.min_synced_rpcs { // we have enough potential rpcs. try to load balance potential_rpcs.sort_by_cached_key(|x| { x.shuffle_for_load_balancing_on(max_block_needed.copied()) @@ -1501,7 +1501,7 @@ mod tests { max_block_age: None, // TODO: test max_block_lag? max_block_lag: None, - min_head_rpcs: 1, + min_synced_rpcs: 1, min_sum_soft_limit: 1, }; @@ -1777,7 +1777,7 @@ mod tests { blocks_by_number: CacheBuilder::new(100) .time_to_live(Duration::from_secs(120)) .build(), - min_head_rpcs: 1, + min_synced_rpcs: 1, min_sum_soft_limit: 4_000, max_block_age: None, max_block_lag: None, @@ -1957,7 +1957,7 @@ mod tests { pending_tx_id_sender, blocks_by_hash: Cache::new(10_000), blocks_by_number: Cache::new(10_000), - min_head_rpcs: 1, + min_synced_rpcs: 1, min_sum_soft_limit: 1_000, max_block_age: None, max_block_lag: None, From bb900c61a5d493813dac30f8801fd4c899d4e44f Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 9 Jun 2023 13:31:46 -0700 Subject: [PATCH 07/13] todo --- web3_proxy/src/rpcs/consensus.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 1987b23c..ff9d2073 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -549,6 +549,7 @@ impl ConsensusFinder { let lowest_block_number = lowest_block.number().max(&max_lag_block_number); + // TODO: should lowest block number be set such that the rpc won't ever go backwards? 
trace!("safe lowest_block_number: {}", lowest_block_number); let num_known = self.rpc_heads.len(); From bcf4f335430d0b966f262c6b360bfdc6854eea2d Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 9 Jun 2023 14:02:29 -0700 Subject: [PATCH 08/13] add payment_factory_address to /status --- web3_proxy/src/frontend/status.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index f189301a..7d553061 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -145,6 +145,7 @@ async fn _status(app: Arc) -> (StatusCode, &'static str, Bytes) { "private_rpcs": app.private_rpcs, "rpc_secret_key_cache": MokaCacheSerializer(&app.rpc_secret_key_cache), "user_balance_cache": MokaCacheSerializer(&app.user_balance_cache), + "payment_factory_address": app.config.deposit_factory_contract, "version": APP_USER_AGENT, }); From 69dd8f00461cd1c424229ba2b268db3be88fa0f4 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 9 Jun 2023 15:39:14 -0700 Subject: [PATCH 09/13] checksum address and no need for topic --- config/development_polygon.toml | 2 +- config/example.toml | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/config/development_polygon.toml b/config/development_polygon.toml index f6eb5743..65a34f21 100644 --- a/config/development_polygon.toml +++ b/config/development_polygon.toml @@ -8,7 +8,7 @@ db_max_connections = 20 # production runs inside docker and so uses "mysql://root:web3_proxy@db:3306/web3_proxy" for db_url db_url = "mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy" -deposit_factory_contract = "0x4e3bc2054788de923a04936c6addb99a05b0ea36" +deposit_factory_contract = "0x4e3BC2054788De923A04936C6ADdB99A05B0Ea36" deposit_topic = "0x45fdc265dc29885b9a485766b03e70978440d38c7c328ee0a14fa40c76c6af54" # a timeseries database is optional. 
it is used for making pretty graphs diff --git a/config/example.toml b/config/example.toml index ad6ac303..962dc9c4 100644 --- a/config/example.toml +++ b/config/example.toml @@ -11,8 +11,7 @@ db_url = "mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy" # read-only replica useful when running the proxy in multiple regions db_replica_url = "mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy" -deposit_factory_contract = "0x4e3bc2054788de923a04936c6addb99a05b0ea36" -deposit_topic = "0x45fdc265dc29885b9a485766b03e70978440d38c7c328ee0a14fa40c76c6af54" +deposit_factory_contract = "0x4e3BC2054788De923A04936C6ADdB99A05B0Ea36" kafka_urls = "127.0.0.1:19092" kafka_protocol = "plaintext" From 41950c886c88e2671235910ea2759a229b6879ea Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 9 Jun 2023 16:35:14 -0700 Subject: [PATCH 10/13] improvements for web3-this-then-that --- docs/http routes.txt | 7 +- web3_proxy/src/frontend/mod.rs | 2 +- web3_proxy/src/frontend/rpc_proxy_ws.rs | 167 ++++++++++++----------- web3_proxy/src/frontend/users/payment.rs | 19 +-- web3_proxy/src/rpcs/blockchain.rs | 4 +- 5 files changed, 104 insertions(+), 95 deletions(-) diff --git a/docs/http routes.txt b/docs/http routes.txt index e6ab9a25..05d96490 100644 --- a/docs/http routes.txt +++ b/docs/http routes.txt @@ -111,11 +111,12 @@ GET /user/balance If valid, displays data about the user's balance and payments as JSON. POST /user/balance/:txid - Not yet implemented. Rate limited by IP. + Rate limited by IP. Checks the ":txid" for a transaction that updates a user's balance. The backend will be watching for these transactions, so this should not be needed in the common case. However, log susbcriptions are not perfect and so it might sometimes be needed. + Any authorized user can call this endpoint for any other user's transaction. GET /user/keys Checks the "AUTHORIZATION" header for a valid bearer token. 
@@ -141,10 +142,6 @@ GET /subuser/rpc_keys GET /user/deposits Retrieves the user's deposit history. -GET /user/balance/:tx_hash - Accepts a tx_hash and updates the user's balance according to this transaction. - Any authorized user can call this endpoint for any other user's transaction. - GET /user/referral Fetches a user's referral link. diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 9528a189..d5c3e9b5 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -172,7 +172,7 @@ pub async fn serve( .route("/user/deposits", get(users::payment::user_deposits_get)) .route( "/user/balance/:tx_hash", - get(users::payment::user_balance_post), + post(users::payment::user_balance_post), ) .route("/user/keys", get(users::rpc_keys::rpc_keys_get)) .route("/user/keys", post(users::rpc_keys::rpc_keys_management)) diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index e597a9ab..dee85ee5 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -347,84 +347,97 @@ async fn handle_socket_payload( let response_id = json_request.id.clone(); // TODO: move this to a seperate function so we can use the try operator - let response: Web3ProxyResult = - match &json_request.method[..] { - "eth_subscribe" => { - // TODO: how can we subscribe with proxy_mode? 
- match app - .eth_subscribe( - authorization.clone(), - json_request, - subscription_count, - response_sender.clone(), - ) - .await - { - Ok((handle, response)) => { - { - let mut x = subscriptions.write().await; - - let result: &serde_json::value::RawValue = response - .result - .as_ref() - .context("there should be a result here")?; - - // TODO: there must be a better way to turn a RawValue - let k: U64 = serde_json::from_str(result.get()) - .context("subscription ids must be U64s")?; - - x.insert(k, handle); - }; - - Ok(response.into()) - } - Err(err) => Err(err), - } - } - "eth_unsubscribe" => { - let request_metadata = - RequestMetadata::new(&app, authorization.clone(), &json_request, None) - .await; - - #[derive(serde::Deserialize)] - struct EthUnsubscribeParams([U64; 1]); - - match serde_json::from_value(json_request.params) { - Ok::(params) => { - let subscription_id = ¶ms.0[0]; - - // TODO: is this the right response? - let partial_response = { - let mut x = subscriptions.write().await; - match x.remove(subscription_id) { - None => false, - Some(handle) => { - handle.abort(); - true - } - } - }; - - // TODO: don't create the response here. use a JsonRpcResponseData instead - let response = JsonRpcForwardedResponse::from_value( - json!(partial_response), - response_id.clone(), - ); - - request_metadata.add_response(&response); - - Ok(response.into()) - } - Err(err) => Err(Web3ProxyError::BadRequest( - f!("incorrect params given for eth_unsubscribe. {err:?}").into(), - )), - } - } - _ => app - .proxy_web3_rpc(authorization.clone(), json_request.into()) + let response: Web3ProxyResult = match &json_request.method + [..] + { + "eth_subscribe" => { + // TODO: how can we subscribe with proxy_mode? 
+ match app + .eth_subscribe( + authorization.clone(), + json_request, + subscription_count, + response_sender.clone(), + ) .await - .map(|(_, response, _)| response), - }; + { + Ok((handle, response)) => { + { + let mut x = subscriptions.write().await; + + let result: &serde_json::value::RawValue = response + .result + .as_ref() + .context("there should be a result here")?; + + // TODO: there must be a better way to turn a RawValue + let k: U64 = serde_json::from_str(result.get()) + .context("subscription ids must be U64s")?; + + x.insert(k, handle); + }; + + Ok(response.into()) + } + Err(err) => Err(err), + } + } + "eth_unsubscribe" => { + let request_metadata = + RequestMetadata::new(&app, authorization.clone(), &json_request, None) + .await; + + let subscription_id: U64 = if json_request.params.is_array() { + if let Some(params) = json_request.params.get(0) { + serde_json::from_value(params.clone()).map_err(|err| { + Web3ProxyError::BadRequest( + format!("invalid params for eth_unsubscribe: {}", err).into(), + ) + })? + } else { + return Err(Web3ProxyError::BadRequest( + f!("no params for eth_unsubscribe").into(), + )); + } + } else if json_request.params.is_string() { + serde_json::from_value(json_request.params).map_err(|err| { + Web3ProxyError::BadRequest( + format!("invalid params for eth_unsubscribe: {}", err).into(), + ) + })? + } else { + return Err(Web3ProxyError::BadRequest( + "unexpected params given for eth_unsubscribe".into(), + )); + }; + + // TODO: is this the right response? + let partial_response = { + let mut x = subscriptions.write().await; + match x.remove(&subscription_id) { + None => false, + Some(handle) => { + handle.abort(); + true + } + } + }; + + // TODO: don't create the response here. 
use a JsonRpcResponseData instead + let response = JsonRpcForwardedResponse::from_value( + json!(partial_response), + response_id.clone(), + ); + + request_metadata.add_response(&response); + + Ok(response.into()) + } + _ => app + .proxy_web3_rpc(authorization.clone(), json_request.into()) + .await + .map(|(_, response, _)| response), + }; (response_id, response) } diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index 3e7e6d54..52f12879 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -1,5 +1,6 @@ use crate::app::Web3ProxyApp; use crate::errors::{Web3ProxyError, Web3ProxyResponse}; +use crate::frontend::authorization::login_is_authorized; use anyhow::Context; use axum::{ extract::Path, @@ -7,6 +8,7 @@ use axum::{ response::IntoResponse, Extension, Json, TypedHeader, }; +use axum_client_ip::InsecureClientIp; use axum_macros::debug_handler; use entities::{balance, increase_on_chain_balance_receipt, rpc_key, user}; use ethbloom::Input as BloomInput; @@ -105,13 +107,14 @@ pub async fn user_deposits_get( #[debug_handler] pub async fn user_balance_post( Extension(app): Extension>, - TypedHeader(Authorization(bearer)): TypedHeader>, + InsecureClientIp(ip): InsecureClientIp, Path(mut params): Path>, ) -> Web3ProxyResponse { // I suppose this is ok / good, so people don't spam this endpoint as it is not "cheap" - // Check that the user is logged-in and authorized - // The semaphore keeps a user from submitting tons of transactions in parallel which would DOS our backends - let (_, _semaphore) = app.bearer_is_authorized(bearer).await?; + // we rate limit by ip instead of bearer token so transactions are easy to submit from scripts + // TODO: if ip is a 10. or a 172., allow unlimited + // TODO: why is login_is_authorized giving me a 403?! + // login_is_authorized(&app, ip).await?; // Get the transaction hash, and the amount that the user wants to top up by. 
// Let's say that for now, 1 credit is equivalent to 1 dollar (assuming any stablecoin has a 1:1 peg) @@ -173,6 +176,7 @@ pub async fn user_balance_post( // check bloom filter to be sure this transaction contains any relevant logs // TODO: This does not work properly right now, get back this eventually + // TODO: compare to code in llamanodes/web3-this-then-that // if let Some(ValueOrArray::Value(Some(x))) = payment_factory_contract // .payment_received_filter() // .filter @@ -239,11 +243,8 @@ pub async fn user_balance_post( payment_token_amount.set_scale(payment_token_decimals)?; info!( - "Found deposit transaction for: {:?} {:?} {:?} {:?}", - &recipient_account.to_fixed_bytes(), - recipient_account, - payment_token_address, - payment_token_amount + "Found deposit transaction for: {:?} {:?} {:?}", + recipient_account, payment_token_address, payment_token_amount ); let recipient = match user::Entity::find() diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index f8fda474..d8fd4255 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -498,7 +498,7 @@ impl Web3Rpcs { // this is unlikely but possible // TODO: better log that includes all the votes warn!( - "chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}. 
{:#?} -> {:#?}", + "chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}", consensus_tier, total_tiers, backups_voted_str, @@ -509,8 +509,6 @@ impl Web3Rpcs { old_head_block, rpc, rpc_head_str, - old_consensus_connections, - new_consensus_rpcs, ); if backups_needed { From 71d3d63524ce72e5835e5b8d333fd9a8803862ca Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 9 Jun 2023 16:35:44 -0700 Subject: [PATCH 11/13] put ip rate limiting back --- web3_proxy/src/frontend/users/payment.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index 52f12879..f471d28d 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -113,8 +113,7 @@ pub async fn user_balance_post( // I suppose this is ok / good, so people don't spam this endpoint as it is not "cheap" // we rate limit by ip instead of bearer token so transactions are easy to submit from scripts // TODO: if ip is a 10. or a 172., allow unlimited - // TODO: why is login_is_authorized giving me a 403?! - // login_is_authorized(&app, ip).await?; + login_is_authorized(&app, ip).await?; // Get the transaction hash, and the amount that the user wants to top up by. 
// Let's say that for now, 1 credit is equivalent to 1 dollar (assuming any stablecoin has a 1:1 peg) From 4a9c1a0ce64655b3f10204be4e271d36ad868bbd Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 9 Jun 2023 18:31:47 -0700 Subject: [PATCH 12/13] move user registration into a dedicated function --- Cargo.lock | 12 +- web3_proxy/Cargo.toml | 2 +- web3_proxy/src/app/mod.rs | 13 +- web3_proxy/src/errors.rs | 154 +++++++++--------- web3_proxy/src/frontend/rpc_proxy_ws.rs | 6 +- .../src/frontend/users/authentication.rs | 102 ++++++------ web3_proxy/src/frontend/users/payment.rs | 12 +- web3_proxy/src/jsonrpc.rs | 13 +- web3_proxy/src/response_cache.rs | 8 +- web3_proxy/src/rpcs/many.rs | 3 +- web3_proxy/src/stats/mod.rs | 3 +- 11 files changed, 164 insertions(+), 164 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4ef88d72..def87e87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4107,9 +4107,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.59" +version = "1.0.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6aeca18b86b413c660b781aa319e4e2648a3e6f9eadc9b47e9038e6fe9f3451b" +checksum = "dec2b086b7a862cf4de201096214fa870344cf922b2b30c167badb3af3195406" dependencies = [ "unicode-ident", ] @@ -4304,9 +4304,9 @@ dependencies = [ [[package]] name = "rdkafka" -version = "0.31.0" +version = "0.32.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88383df3a85a38adfa2aa447d3ab6eb9cedcb49613adcf18e7e7ebb3b62e9b03" +checksum = "f8733bc5dc0b192d1a4b28073f9bff1326ad9e4fecd4d9b025d6fc358d1c3e79" dependencies = [ "futures-channel", "futures-util", @@ -4322,9 +4322,9 @@ dependencies = [ [[package]] name = "rdkafka-sys" -version = "4.4.0+1.9.2" +version = "4.5.0+1.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87ac9d87c3aba1748e3112318459f2ac8bff80bfff7359e338e0463549590249" +checksum = 
"1bb0676c2112342ac7165decdedbc4e7086c0af384479ccce534546b10687a5d" dependencies = [ "cmake", "libc", diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 91b4275e..723bedf0 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -76,7 +76,7 @@ pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async parking_lot = { version = "0.12.1", features = ["arc_lock", "nightly", "serde"] } prettytable = "0.10.0" proctitle = "0.1.1" -rdkafka = { version = "0.31.0" } +rdkafka = { version = "0.32.2" } regex = "1.8.4" reqwest = { version = "0.11.18", default-features = false, features = ["deflate", "gzip", "json", "tokio-rustls"] } rmp-serde = "1.1.1" diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 664c0c26..b8388567 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -49,7 +49,6 @@ use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRa use serde::Serialize; use serde_json::json; use serde_json::value::RawValue; -use std::borrow::Cow; use std::fmt; use std::net::IpAddr; use std::num::NonZeroU64; @@ -1512,15 +1511,13 @@ impl Web3ProxyApp { JsonRpcResponseEnum::from(serde_json::Value::Bool(false)) } "eth_subscribe" => JsonRpcErrorData { - message: Cow::Borrowed( - "notifications not supported. eth_subscribe is only available over a websocket", - ), + message: "notifications not supported. eth_subscribe is only available over a websocket".into(), code: -32601, data: None, } .into(), "eth_unsubscribe" => JsonRpcErrorData { - message: Cow::Borrowed("notifications not supported. eth_unsubscribe is only available over a websocket"), + message: "notifications not supported. eth_unsubscribe is only available over a websocket".into(), code: -32601, data: None, }.into(), @@ -1547,7 +1544,7 @@ impl Web3ProxyApp { // TODO: what error code? 
// TODO: use Web3ProxyError::BadRequest JsonRpcErrorData { - message: Cow::Borrowed("Invalid request"), + message: "Invalid request".into(), code: -32600, data: None }.into() @@ -1575,7 +1572,7 @@ impl Web3ProxyApp { // TODO: this needs the correct error code in the response // TODO: Web3ProxyError::BadRequest instead? JsonRpcErrorData { - message: Cow::Borrowed("invalid request"), + message: "invalid request".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }.into() @@ -1583,7 +1580,7 @@ impl Web3ProxyApp { } } "test" => JsonRpcErrorData { - message: Cow::Borrowed("The method test does not exist/is not available."), + message: "The method test does not exist/is not available.".into(), code: -32601, data: None, }.into(), diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index 1a4678cb..ecc0f66b 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -49,7 +49,7 @@ pub enum Web3ProxyError { BadRequest(Cow<'static, str>), #[error(ignore)] #[from(ignore)] - BadResponse(String), + BadResponse(Cow<'static, str>), BadRouting, Contract(ContractError), Database(DbErr), @@ -134,7 +134,7 @@ pub enum Web3ProxyError { SerdeJson(serde_json::Error), /// simple way to return an error message to the user and an anyhow to our logs #[display(fmt = "{}, {}, {:?}", _0, _1, _2)] - StatusCode(StatusCode, String, Option), + StatusCode(StatusCode, Cow<'static, str>, Option), /// TODO: what should be attached to the timout? 
#[display(fmt = "{:?}", _0)] #[error(ignore)] @@ -153,7 +153,7 @@ pub enum Web3ProxyError { WebsocketOnly, #[display(fmt = "{:?}, {}", _0, _1)] #[error(ignore)] - WithContext(Option>, String), + WithContext(Option>, Cow<'static, str>), } impl Web3ProxyError { @@ -165,7 +165,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Owned(err.to_string()), + message: err.to_string().into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -177,7 +177,7 @@ impl Web3ProxyError { ( StatusCode::FORBIDDEN, JsonRpcErrorData { - message: Cow::Borrowed("FORBIDDEN"), + message: "FORBIDDEN".into(), code: StatusCode::FORBIDDEN.as_u16().into(), data: None, }, @@ -189,7 +189,7 @@ impl Web3ProxyError { StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { // TODO: is it safe to expose all of our anyhow strings? - message: Cow::Owned(err.to_string()), + message: err.to_string().into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -204,7 +204,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Owned(format!("bad request: {}", err)), + message: format!("bad request: {}", err).into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -216,7 +216,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Owned(format!("bad response: {}", err)), + message: format!("bad response: {}", err).into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -227,7 +227,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("bad routing"), + message: "bad routing".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -238,7 +238,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("database error!"), + message: "database error!".into(), code: 
StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -249,7 +249,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Owned(format!("contract error: {}", err)), + message: format!("contract error: {}", err).into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -260,7 +260,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Owned(format!("decimal error: {}", err)), + message: format!("decimal error: {}", err).into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -274,10 +274,11 @@ impl Web3ProxyError { ( StatusCode::UNAUTHORIZED, JsonRpcErrorData { - message: Cow::Owned(format!( + message: format!( "both the primary and eip191 verification failed: {:#?}; {:#?}", err_1, err_191 - )), + ) + .into(), code: StatusCode::UNAUTHORIZED.as_u16().into(), data: None, }, @@ -288,7 +289,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("ether http client error"), + message: "ether http client error".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -299,7 +300,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("ether provider error"), + message: "ether provider error".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -310,7 +311,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("ether ws client error"), + message: "ether ws client error".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -321,7 +322,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("flume recv error!"), + message: "flume recv error!".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -333,7 +334,7 @@ impl 
Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("gas estimate result is not an U256"), + message: "gas estimate result is not an U256".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -344,7 +345,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: format!("{}", err).into(), + message: err.to_string().into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -355,7 +356,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Owned(format!("{}", err)), + message: err.to_string().into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -367,7 +368,7 @@ impl Web3ProxyError { StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { // TODO: is it safe to expose these error strings? - message: Cow::Owned(err.to_string()), + message: err.to_string().into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -379,7 +380,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("influxdb2 error!"), + message: "influxdb2 error!".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -390,10 +391,11 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Owned(format!( + message: format!( "Invalid blocks bounds requested. 
min ({}) > max ({})", min, max - )), + ) + .into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -404,7 +406,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Owned(err.to_string()), + message: err.to_string().into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -415,7 +417,7 @@ impl Web3ProxyError { ( StatusCode::FORBIDDEN, JsonRpcErrorData { - message: Cow::Owned(format!("IP ({}) is not allowed!", ip)), + message: format!("IP ({}) is not allowed!", ip).into(), code: StatusCode::FORBIDDEN.as_u16().into(), data: None, }, @@ -426,7 +428,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Owned(format!("{}", err)), + message: err.to_string().into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -437,7 +439,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("invalid message eip given"), + message: "invalid message eip given".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -448,7 +450,7 @@ impl Web3ProxyError { ( StatusCode::UNAUTHORIZED, JsonRpcErrorData { - message: Cow::Borrowed("invalid invite code"), + message: "invalid invite code".into(), code: StatusCode::UNAUTHORIZED.as_u16().into(), data: None, }, @@ -460,7 +462,7 @@ impl Web3ProxyError { StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { // TODO: is it safe to expose our io error strings? 
- message: Cow::Owned(err.to_string()), + message: err.to_string().into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -471,7 +473,7 @@ impl Web3ProxyError { ( StatusCode::UNAUTHORIZED, JsonRpcErrorData { - message: Cow::Borrowed("invalid referral code"), + message: "invalid referral code".into(), code: StatusCode::UNAUTHORIZED.as_u16().into(), data: None, }, @@ -482,7 +484,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("invalid referer!"), + message: "invalid referer!".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -493,7 +495,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("invalid signature length"), + message: "invalid signature length".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -504,7 +506,7 @@ impl Web3ProxyError { ( StatusCode::FORBIDDEN, JsonRpcErrorData { - message: Cow::Borrowed("invalid user agent!"), + message: "invalid user agent!".into(), code: StatusCode::FORBIDDEN.as_u16().into(), data: None, }, @@ -515,7 +517,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("UserKey was not a ULID or UUID"), + message: "UserKey was not a ULID or UUID".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -526,7 +528,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("UserTier is not valid!"), + message: "UserTier is not valid!".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -545,7 +547,7 @@ impl Web3ProxyError { code, JsonRpcErrorData { // TODO: different messages of cancelled or not? 
- message: Cow::Borrowed("Unable to complete request"), + message: "Unable to complete request".into(), code: code.as_u16().into(), data: None, }, @@ -560,7 +562,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Owned(format!("msgpack encode error: {}", err)), + message: format!("msgpack encode error: {}", err).into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -571,7 +573,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("Blocks here must have a number or hash"), + message: "Blocks here must have a number or hash".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -582,7 +584,7 @@ impl Web3ProxyError { ( StatusCode::BAD_GATEWAY, JsonRpcErrorData { - message: Cow::Borrowed("no blocks known"), + message: "no blocks known".into(), code: StatusCode::BAD_GATEWAY.as_u16().into(), data: None, }, @@ -593,7 +595,7 @@ impl Web3ProxyError { ( StatusCode::BAD_GATEWAY, JsonRpcErrorData { - message: Cow::Borrowed("no consensus head block"), + message: "no consensus head block".into(), code: StatusCode::BAD_GATEWAY.as_u16().into(), data: None, }, @@ -604,7 +606,7 @@ impl Web3ProxyError { ( StatusCode::BAD_GATEWAY, JsonRpcErrorData { - message: Cow::Borrowed("unable to retry for request handle"), + message: "unable to retry for request handle".into(), code: StatusCode::BAD_GATEWAY.as_u16().into(), data: None, }, @@ -615,7 +617,7 @@ impl Web3ProxyError { ( StatusCode::BAD_GATEWAY, JsonRpcErrorData { - message: Cow::Borrowed("no servers synced"), + message: "no servers synced".into(), code: StatusCode::BAD_GATEWAY.as_u16().into(), data: None, }, @@ -629,10 +631,11 @@ impl Web3ProxyError { ( StatusCode::BAD_GATEWAY, JsonRpcErrorData { - message: Cow::Owned(format!( + message: format!( "not enough rpcs connected {}/{}", num_known, min_head_rpcs - )), + ) + .into(), code: StatusCode::BAD_GATEWAY.as_u16().into(), 
data: None, }, @@ -643,10 +646,11 @@ impl Web3ProxyError { ( StatusCode::BAD_GATEWAY, JsonRpcErrorData { - message: Cow::Owned(format!( + message: format!( "not enough soft limit available {}/{}", available, needed - )), + ) + .into(), code: StatusCode::BAD_GATEWAY.as_u16().into(), data: None, }, @@ -658,7 +662,7 @@ impl Web3ProxyError { ( StatusCode::NOT_FOUND, JsonRpcErrorData { - message: Cow::Borrowed("not found!"), + message: "not found!".into(), code: StatusCode::NOT_FOUND.as_u16().into(), data: None, }, @@ -669,7 +673,7 @@ impl Web3ProxyError { ( StatusCode::NOT_IMPLEMENTED, JsonRpcErrorData { - message: Cow::Borrowed("work in progress"), + message: "work in progress".into(), code: StatusCode::NOT_IMPLEMENTED.as_u16().into(), data: None, }, @@ -680,7 +684,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("Origin required"), + message: "Origin required".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -691,7 +695,7 @@ impl Web3ProxyError { ( StatusCode::FORBIDDEN, JsonRpcErrorData { - message: Cow::Owned(format!("Origin ({}) is not allowed!", origin)), + message: format!("Origin ({}) is not allowed!", origin).into(), code: StatusCode::FORBIDDEN.as_u16().into(), data: None, }, @@ -702,7 +706,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("parse bytes error!"), + message: "parse bytes error!".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -713,7 +717,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("parse message error!"), + message: "parse message error!".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -724,7 +728,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("unable to parse address"), + message: "unable to parse address".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, 
}, @@ -735,7 +739,7 @@ impl Web3ProxyError { ( StatusCode::PAYMENT_REQUIRED, JsonRpcErrorData { - message: Cow::Borrowed("Payment is required and user is not premium"), + message: "Payment is required and user is not premium".into(), code: StatusCode::PAYMENT_REQUIRED.as_u16().into(), data: None, }, @@ -767,7 +771,7 @@ impl Web3ProxyError { ( StatusCode::TOO_MANY_REQUESTS, JsonRpcErrorData { - message: Cow::Owned(msg), + message: msg.into(), code: StatusCode::TOO_MANY_REQUESTS.as_u16().into(), data: None, }, @@ -778,7 +782,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("redis error!"), + message: "redis error!".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -789,7 +793,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("Referer required"), + message: "Referer required".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -800,7 +804,7 @@ impl Web3ProxyError { ( StatusCode::FORBIDDEN, JsonRpcErrorData { - message: Cow::Owned(format!("Referer ({:?}) is not allowed", referer)), + message: format!("Referer ({:?}) is not allowed", referer).into(), code: StatusCode::FORBIDDEN.as_u16().into(), data: None, }, @@ -812,7 +816,7 @@ impl Web3ProxyError { StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { // TODO: is it safe to expose all of our anyhow strings? 
- message: Cow::Borrowed("semaphore acquire error"), + message: "semaphore acquire error".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -823,7 +827,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("error stat_sender sending response_stat"), + message: "error stat_sender sending response_stat".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -834,7 +838,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Owned(format!("de/serialization error! {}", err)), + message: format!("de/serialization error! {}", err).into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -852,7 +856,7 @@ impl Web3ProxyError { ( *status_code, JsonRpcErrorData { - message: err_msg.to_owned().into(), + message: err_msg.clone(), code: code.into(), data: None, }, @@ -904,7 +908,7 @@ impl Web3ProxyError { Self::UnknownKey => ( StatusCode::UNAUTHORIZED, JsonRpcErrorData { - message: Cow::Borrowed("unknown api key!"), + message: "unknown api key!".into(), code: StatusCode::UNAUTHORIZED.as_u16().into(), data: None, }, @@ -914,7 +918,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("User agent required"), + message: "User agent required".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -925,7 +929,7 @@ impl Web3ProxyError { ( StatusCode::FORBIDDEN, JsonRpcErrorData { - message: Cow::Owned(format!("User agent ({}) is not allowed!", ua)), + message: format!("User agent ({}) is not allowed!", ua).into(), code: StatusCode::FORBIDDEN.as_u16().into(), data: None, }, @@ -937,7 +941,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("user ids should always be non-zero"), + message: "user ids should always be non-zero".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -948,7 +952,7 @@ impl 
Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed("verification error!"), + message: "verification error!".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -959,7 +963,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("watch recv error!"), + message: "watch recv error!".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -970,7 +974,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Borrowed("watch send error!"), + message: "watch send error!".into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -997,7 +1001,7 @@ impl Web3ProxyError { ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: msg.to_owned().into(), + message: msg.clone(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -1038,11 +1042,11 @@ impl IntoResponse for Web3ProxyError { } pub trait Web3ProxyErrorContext { - fn web3_context>(self, msg: S) -> Result; + fn web3_context>>(self, msg: S) -> Result; } impl Web3ProxyErrorContext for Option { - fn web3_context>(self, msg: S) -> Result { + fn web3_context>>(self, msg: S) -> Result { self.ok_or(Web3ProxyError::WithContext(None, msg.into())) } } @@ -1051,7 +1055,7 @@ impl Web3ProxyErrorContext for Result where E: Into, { - fn web3_context>(self, msg: S) -> Result { + fn web3_context>>(self, msg: S) -> Result { self.map_err(|err| Web3ProxyError::WithContext(Some(Box::new(err.into())), msg.into())) } } diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index dee85ee5..cc8a3225 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -264,7 +264,7 @@ async fn _websocket_handler_with_key( ) { (None, None, _) => Err(Web3ProxyError::StatusCode( StatusCode::BAD_REQUEST, - "this page is for rpcs".to_string(), + 
"this page is for rpcs".into(), None, )), (Some(redirect_public_url), _, None) => { @@ -277,7 +277,7 @@ async fn _websocket_handler_with_key( // i don't think this is possible Err(Web3ProxyError::StatusCode( StatusCode::UNAUTHORIZED, - "AUTHORIZATION header required".to_string(), + "AUTHORIZATION header required".into(), None, )) } else { @@ -295,7 +295,7 @@ async fn _websocket_handler_with_key( // any other combinations get a simple error _ => Err(Web3ProxyError::StatusCode( StatusCode::BAD_REQUEST, - "this page is for rpcs".to_string(), + "this page is for rpcs".into(), None, )), } diff --git a/web3_proxy/src/frontend/users/authentication.rs b/web3_proxy/src/frontend/users/authentication.rs index 44cfde21..c283d979 100644 --- a/web3_proxy/src/frontend/users/authentication.rs +++ b/web3_proxy/src/frontend/users/authentication.rs @@ -18,11 +18,11 @@ use entities::{balance, login, pending_login, referee, referrer, rpc_key, user}; use ethers::{prelude::Address, types::Bytes}; use hashbrown::HashMap; use http::StatusCode; -use log::{debug, warn, trace}; +use log::{debug, trace, warn}; use migration::sea_orm::prelude::{Decimal, Uuid}; use migration::sea_orm::{ - self, ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, - TransactionTrait, + self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel, + QueryFilter, TransactionTrait, }; use serde_json::json; use siwe::{Message, VerificationOpts}; @@ -143,6 +143,54 @@ pub async fn user_login_get( Ok(message.into_response()) } +pub async fn register_new_user( + db_conn: &DatabaseConnection, + address: Address, +) -> anyhow::Result<(user::Model, rpc_key::Model, balance::Model)> { + // all or nothing + let txn = db_conn.begin().await?; + + // the only thing we need from them is an address + // everything else is optional + // TODO: different invite codes should allow different levels + // TODO: maybe decrement a count on the invite code? 
+ // TODO: There will be two different transactions. The first one inserts the user, the second one marks the user as being referred + let new_user = user::ActiveModel { + address: sea_orm::Set(address.to_fixed_bytes().into()), + ..Default::default() + }; + + let new_user = new_user.insert(&txn).await?; + + // create the user's first api key + let rpc_secret_key = RpcSecretKey::new(); + + let user_rpc_key = rpc_key::ActiveModel { + user_id: sea_orm::Set(new_user.id), + secret_key: sea_orm::Set(rpc_secret_key.into()), + description: sea_orm::Set(None), + ..Default::default() + }; + + let user_rpc_key = user_rpc_key + .insert(&txn) + .await + .web3_context("Failed saving new user key")?; + + // create an empty balance entry + let user_balance = balance::ActiveModel { + user_id: sea_orm::Set(new_user.id), + ..Default::default() + }; + + let user_balance = user_balance.insert(&txn).await?; + + // save the user and key and balance to the database + txn.commit().await?; + + Ok((new_user, user_rpc_key, user_balance)) +} + /// `POST /user/login` - Register or login by posting a signed "siwe" message. /// It is recommended to save the returned bearer token in a cookie. /// The bearer token can be used to authenticate other requests, such as getting the user's stats or modifying the user's profile. @@ -264,50 +312,8 @@ pub async fn user_login_post( } } - let txn = db_conn.begin().await?; - - // First add a user - - // the only thing we need from them is an address - // everything else is optional - // TODO: different invite codes should allow different levels - // TODO: maybe decrement a count on the invite code? - // TODO: There will be two different transactions. 
The first one inserts the user, the second one marks the user as being referred - let caller = user::ActiveModel { - address: sea_orm::Set(our_msg.address.into()), - ..Default::default() - }; - - let caller = caller.insert(&txn).await?; - - // create the user's first api key - let rpc_secret_key = RpcSecretKey::new(); - - let user_rpc_key = rpc_key::ActiveModel { - user_id: sea_orm::Set(caller.id), - secret_key: sea_orm::Set(rpc_secret_key.into()), - description: sea_orm::Set(None), - ..Default::default() - }; - - let user_rpc_key = user_rpc_key - .insert(&txn) - .await - .web3_context("Failed saving new user key")?; - - // We should also create the balance entry ... - let user_balance = balance::ActiveModel { - user_id: sea_orm::Set(caller.id), - ..Default::default() - }; - user_balance.insert(&txn).await?; - - let user_rpc_keys = vec![user_rpc_key]; - - // Also add a part for the invite code, i.e. who invited this guy - - // save the user and key to the database - txn.commit().await?; + let (caller, caller_key, _) = + register_new_user(&db_conn, our_msg.address.into()).await?; let txn = db_conn.begin().await?; // First, optionally catch a referral code from the parameters if there is any @@ -336,7 +342,7 @@ pub async fn user_login_post( } txn.commit().await?; - (caller, user_rpc_keys, StatusCode::CREATED) + (caller, vec![caller_key], StatusCode::CREATED) } Some(caller) => { // Let's say that a user that exists can actually also redeem a key in retrospect... 
diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index f471d28d..cd5a9a33 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -1,6 +1,7 @@ use crate::app::Web3ProxyApp; use crate::errors::{Web3ProxyError, Web3ProxyResponse}; use crate::frontend::authorization::login_is_authorized; +use crate::frontend::users::authentication::register_new_user; use anyhow::Context; use axum::{ extract::Path, @@ -169,7 +170,7 @@ pub async fn user_balance_post( PaymentFactory::new(payment_factory_address, app.internal_provider().clone()); debug!( - "Payment Factor Filter is: {:?}", + "Payment Factory Filter: {:?}", payment_factory_contract.payment_received_filter() ); @@ -251,12 +252,13 @@ pub async fn user_balance_post( .one(&db_conn) .await? { - Some(x) => Ok(x), + Some(x) => x, None => { - // todo!("make their account"); - Err(Web3ProxyError::AccessDenied) + let (user, _, _) = register_new_user(&db_conn, recipient_account).await?; + + user } - }?; + }; // For now we only accept stablecoins // And we hardcode the peg (later we would have to depeg this, for example diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index 9b67c508..42fcc2ae 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -211,7 +211,7 @@ impl From<&'static str> for JsonRpcErrorData { fn from(value: &'static str) -> Self { Self { code: -32000, - message: Cow::Borrowed(value), + message: value.into(), data: None, } } @@ -221,7 +221,7 @@ impl From<String> for JsonRpcErrorData { fn from(value: String) -> Self { Self { code: -32000, - message: Cow::Owned(value), + message: value.into(), data: None, } } @@ -273,7 +273,7 @@ impl JsonRpcForwardedResponse { result: None, error: Some(JsonRpcErrorData { code: code.unwrap_or(-32099), - message: Cow::Owned(message), + message: message.into(), // TODO: accept data as an argument data: None, }), @@ -319,10 +319,7 @@ impl JsonRpcForwardedResponse {
data = err.data.clone(); } else if let Some(err) = err.as_serde_error() { // this is not an rpc error. keep it as an error - return Err(Web3ProxyError::BadResponse(format!( - "bad response: {}", - err - ))); + return Err(Web3ProxyError::BadResponse(err.to_string().into())); } else { return Err(anyhow::anyhow!("unexpected ethers error! {:?}", err).into()); } @@ -336,7 +333,7 @@ impl JsonRpcForwardedResponse { result: None, error: Some(JsonRpcErrorData { code, - message: Cow::Owned(message), + message: message.into(), data, }), }) diff --git a/web3_proxy/src/response_cache.rs b/web3_proxy/src/response_cache.rs index f9541672..3e68267c 100644 --- a/web3_proxy/src/response_cache.rs +++ b/web3_proxy/src/response_cache.rs @@ -5,7 +5,6 @@ use hashbrown::hash_map::DefaultHashBuilder; use moka::future::Cache; use serde_json::value::RawValue; use std::{ - borrow::Cow, hash::{BuildHasher, Hash, Hasher}, sync::Arc, }; @@ -211,10 +210,7 @@ impl TryFrom for JsonRpcErrorData { data = err.data.clone(); } else if let Some(err) = err.as_serde_error() { // this is not an rpc error. keep it as an error - return Err(Web3ProxyError::BadResponse(format!( - "bad response: {}", - err - ))); + return Err(Web3ProxyError::BadResponse(err.to_string().into())); } else { return Err(anyhow::anyhow!("unexpected ethers error! {:?}", err).into()); } @@ -224,7 +220,7 @@ impl TryFrom for JsonRpcErrorData { Ok(JsonRpcErrorData { code, - message: Cow::Owned(message), + message: message.into(), data, }) } diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 5d0f2694..f47da3a7 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -27,7 +27,6 @@ use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; use serde_json::value::RawValue; -use std::borrow::Cow; use std::cmp::min_by_key; use std::fmt::{self, Display}; use std::sync::atomic::Ordering; @@ -1079,7 +1078,7 @@ impl Web3Rpcs { // TODO: what error code? 
// cloudflare gives {"jsonrpc":"2.0","error":{"code":-32043,"message":"Requested data cannot be older than 128 blocks."},"id":1} Err(JsonRpcErrorData { - message: Cow::Borrowed("Requested data is not available"), + message: "Requested data is not available".into(), code: -32043, data: None, } diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 6409aead..8b1740f2 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -605,8 +605,7 @@ impl BufferedRpcQueryStats { // ================== let sender_latest_balance = match NonZeroU64::try_from(sender_rpc_entity.user_id) { Err(_) => Err(Web3ProxyError::BadResponse( - "Balance is not positive, although it was previously checked to be as such!" - .to_string(), + "Balance is not positive, although it was previously checked to be as such!".into(), )), // We don't do an get_or_insert, because technically we don't have the most up to date balance // Also let's keep things simple in terms of writing and getting. A single place writes it, multiple places can remove / poll it From 47bece58656e54c4394a17894689b9ab0bffca0a Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 9 Jun 2023 18:56:13 -0700 Subject: [PATCH 13/13] newer nightly --- rust-toolchain.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 004f03e8..75af82c2 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "nightly-2023-05-25" +channel = "nightly-2023-06-09"