From 8481f6d44c71cf8482eb49517e251c531589af78 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 21 Sep 2022 23:50:55 +0000 Subject: [PATCH] wip --- Cargo.lock | 31 ++++- README.md | 6 + TODO.md | 23 ++-- migration/Cargo.toml | 4 +- migration/src/lib.rs | 4 +- .../src/m20220101_000001_create_table.rs | 2 +- migration/src/m20220921_181610_log_reverts.rs | 118 ++++++++++++++++++ web3_proxy/Cargo.toml | 1 + web3_proxy/src/app.rs | 32 ++--- web3_proxy/src/block_number.rs | 54 +++++--- web3_proxy/src/config.rs | 1 - web3_proxy/src/frontend/errors.rs | 1 - web3_proxy/src/frontend/http.rs | 3 - web3_proxy/src/frontend/rpc_proxy_http.rs | 29 ++--- web3_proxy/src/metrics_frontend.rs | 2 - web3_proxy/src/rpcs/connection.rs | 6 - web3_proxy/src/rpcs/connections.rs | 16 ++- web3_proxy/src/rpcs/request.rs | 31 ++++- 18 files changed, 269 insertions(+), 95 deletions(-) create mode 100644 migration/src/m20220921_181610_log_reverts.rs diff --git a/Cargo.lock b/Cargo.lock index c2b64ae6..3c840fe5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1049,6 +1049,17 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fb4a24b1aaf0fd0ce8b45161144d6f42cd91677fd5940fd431183eb023b3a2b8" +[[package]] +name = "cookie" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94d4706de1b0fa5b132270cddffa8585166037822e260a944fe161acd137ca05" +dependencies = [ + "percent-encoding", + "time 0.3.14", + "version_check", +] + [[package]] name = "core-foundation-sys" version = "0.8.3" @@ -2782,7 +2793,7 @@ dependencies = [ [[package]] name = "migration" -version = "0.1.0" +version = "0.2.0" dependencies = [ "async-std", "sea-orm-migration", @@ -5083,6 +5094,23 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-cookies" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19833e336396f3953e5ab1513d72b5e5ea51d5ad39b78d306766a05740b48b97" +dependencies = [ + "async-trait", + "axum-core", + "cookie", + "futures-util", + "http", + "parking_lot 0.12.1", + "pin-project-lite", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-http" version = "0.3.4" @@ -5575,6 +5603,7 @@ dependencies = [ "tokio-stream", "toml", "tower", + "tower-cookies", "tower-http", "tower-request-id", "tracing", diff --git a/README.md b/README.md index 28015e0f..9513ee5d 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,12 @@ cargo run --release -- --config ./config/example.toml ## Common commands +Create a user: + +``` +cargo run --bin web3_proxy_cli -- --db-url "$YOUR_DB_URL" create_user --address "$USER_ADDRESS_0x" +``` + Check that the proxy is working: ``` diff --git a/TODO.md b/TODO.md index b81646a7..fde4bd48 100644 --- a/TODO.md +++ b/TODO.md @@ -121,8 +121,7 @@ These are roughly in order of completition - [x] "chain is forked" message is wrong. it includes nodes just being on different heights of the same chain. need a smarter check - i think there is also a bug because i've seen "server not synced" a couple times - [x] bug around eth_getBlockByHash sometimes causes tokio to lock up - - i keep a mapping of blocks so that i can go from hash -> block. it has some consistent hashing it does to split them up across multiple maps each with their own lock. so a lot of the time reads dont block writes because they are in different internal maps. this was fine. - - but after changing my fork detection logic to use the same rules as erigon, i discovered that when you get blocks from a websocket subscription in erigon and geth, theres a missing field (https://github.com/ledgerwatch/erigon/issues/5190). so i added a query to get the block that includes the missing field. + - i keep a mapping of blocks so that i can go from hash -> block. it has some consistent hashing it does to split them up across multiple maps each with their own lock. so a lot of the time reads dont block writes because they are in different internal maps. this was fine. but after changing my fork detection logic to use the same rules as erigon, i discovered that when you get blocks from a websocket subscription in erigon and geth, theres a missing field (https://github.com/ledgerwatch/erigon/issues/5190). so i added a query to get the block that includes the missing field. - but i did this in a way where i was holding the write lock open while doing the query. the "new" block that has the missing field ends up in the same bucket and it also wants a write lock. oops. entry api has very sharp edges. don't ever await inside a match on DashMap::entry - [x] requests for "Get transactions receipts" are routed to the private_rpcs and not the balanced_rpcs. do this better. - [x] quick fix, send to balanced_rpcs for now. we will just live with errors on new transactions. @@ -156,18 +155,24 @@ These are roughly in order of completition - erigon gives `method=eth_call reqid=986147 t=1.151551ms err="execution reverted"` - [x] database migration to change user_keys.requests_per_minute to bigunsigned (max of 18446744073709551615) - [x] change user creation script to have a "unlimited requests per minute" flag that sets it to u64::MAX (18446744073709551615) -- [ ] opt-in debug mode that inspects responses for reverts and saves the request to the database for the user. let them choose a % to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly +- [x] in /status, block hashes has a lower count than block numbers. how is that possible? + - we weren't calling sync. now we are +- [-] opt-in debug mode that inspects responses for reverts and saves the request to the database for the user. + - [-] let them choose a % to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly - this must be opt-in or spawned since it will slow things down and will make their calls less private -- [-] add configurable size limits to all the Caches -- [ ] Api keys need option to lock to IP, cors header, referer, etc +- [-] Api keys need option to lock to IP, cors header, referer, etc - [ ] active requests per second per api key - [ ] distribution of methods per api key (eth_call, eth_getLogs, etc.) -- [ ] web3 on rpc1 exited without errors. maybe promote some shutdown messages from debug to info? +- [-] add configurable size limits to all the Caches - [ ] Ulid instead of Uuid for user keys - - since users are actively using our service, we will need to support both - [ ] Ulid instead of Uuid for database ids - might have to use Uuid in sea-orm and then convert to Ulid on display +- [ ] bearer tokens should expire +- [-] signed cookie jar +- [ ] user login should return both the bearer token and a jwt (jsonwebtoken rust crate should make it easy) +- [ ] /user/logout to clear bearer token and jwt ## V1 @@ -366,4 +371,8 @@ in another repo: event subscriber - i wish i had more logs. its possible that 15479605 came immediatly after - [ ] ip blocking logs a warn. we don't need that. a stat at most - [ ] keep it working without redis and a database -- [ ] in /status, block hashes has a lower count than block numbers. how is that possible? +- [ ] web3 on rpc1 exited without errors. maybe promote some shutdown messages from debug to info? +- [ ] better handling for offline http servers + - if we get a connection refused, we should remove the server's block info so it is taken out of rotation +- [ ] web3_proxy_cli command should read database settings from config +- [ ] how should we handle reverting transactions? they won't confirm for a while after we send them diff --git a/migration/Cargo.toml b/migration/Cargo.toml index a8f5ede0..ee937f96 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "migration" -version = "0.1.0" +version = "0.2.0" edition = "2021" publish = false @@ -16,6 +16,6 @@ version = "0.9.2" features = [ # Enable at least one `ASYNC_RUNTIME` and `DATABASE_DRIVER` feature if you want to run migration via CLI. # View the list of supported features at https://www.sea-ql.org/SeaORM/docs/install-and-config/database-and-async-runtime. - "runtime-tokio-rustls", # `ASYNC_RUNTIME` featrure + "runtime-tokio-rustls", # `ASYNC_RUNTIME` feature "sqlx-mysql", # `DATABASE_DRIVER` feature ] diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 6e016ca9..6780b8e9 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -1,7 +1,7 @@ pub use sea_orm_migration::prelude::*; -pub mod m20220101_000001_create_table; -pub mod m20220921_181610_log_reverts; +mod m20220101_000001_create_table; +mod m20220921_181610_log_reverts; pub struct Migrator; diff --git a/migration/src/m20220101_000001_create_table.rs b/migration/src/m20220101_000001_create_table.rs index 89cbee8d..364446a1 100644 --- a/migration/src/m20220101_000001_create_table.rs +++ b/migration/src/m20220101_000001_create_table.rs @@ -174,7 +174,7 @@ enum SecondaryUser { -- TODO: more security features. likely similar to infura */ #[derive(Iden)] -pub enum UserKeys { +enum UserKeys { Table, Id, UserId, diff --git a/migration/src/m20220921_181610_log_reverts.rs b/migration/src/m20220921_181610_log_reverts.rs new file mode 100644 index 00000000..606ccb53 --- /dev/null +++ b/migration/src/m20220921_181610_log_reverts.rs @@ -0,0 +1,118 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // add some fields to the UserKeys table + manager + .alter_table( + sea_query::Table::alter() + .table(UserKeys::Table) + .to_owned() + // change requests per minute to a big_unsigned + .modify_column( + ColumnDef::new(UserKeys::RequestsPerMinute) + .big_unsigned() + .not_null(), + ) + // add a column for logging reverts in the RevertLogs table + .add_column( + ColumnDef::new(UserKeys::LogReverts) + .boolean() + .not_null() + .default(false), + ) + .to_owned(), + ) + .await?; + + // create a table for logging reverting eth_call and eth_estimateGas + manager + .create_table( + Table::create() + .table(RevertLogs::Table) + .col( + ColumnDef::new(RevertLogs::Id) + .big_unsigned() + .not_null() + .auto_increment() + .primary_key(), + ) + .col( + ColumnDef::new(RevertLogs::UserKeyId) + .big_unsigned() + .not_null(), + ) + .col(ColumnDef::new(RevertLogs::Timestamp).timestamp().not_null()) + .col( + ColumnDef::new(RevertLogs::Method) + .enumeration( + "method", + ["eth_call", "eth_estimateGas", "eth_sendRawTransaction"], + ) + .not_null(), + ) + .col(ColumnDef::new(RevertLogs::To).binary_len(20).not_null()) + .col(ColumnDef::new(RevertLogs::CallData).text().not_null()) + .index(sea_query::Index::create().col(RevertLogs::To)) + .foreign_key( + sea_query::ForeignKey::create() + .from(RevertLogs::Table, RevertLogs::UserKeyId) + .to(UserKeys::Table, UserKeys::Id), + ) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // drop the new table + manager + .drop_table(Table::drop().table(RevertLogs::Table).to_owned()) + .await?; + + // put the UserKeys back to how it was before our migrations + manager + .alter_table( + sea_query::Table::alter() + .table(UserKeys::Table) + .to_owned() + .modify_column( + ColumnDef::new(UserKeys::RequestsPerMinute) + .unsigned() + .not_null(), + ) + .drop_column(UserKeys::LogReverts) + .to_owned(), + ) + .await + } +} + +// copied from create_table.rs, but added +#[derive(Iden)] +pub enum UserKeys { + Table, + Id, + // UserId, + // ApiKey, + // Description, + // PrivateTxs, + // Active, + RequestsPerMinute, + LogReverts, +} + +#[derive(Iden)] +enum RevertLogs { + Table, + Id, + UserKeyId, + Method, + CallData, + To, + Timestamp, +} diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 4b8fcbdb..d246b498 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -59,6 +59,7 @@ time = "0.3.14" tokio = { version = "1.21.1", features = ["full", "tracing"] } # TODO: make sure this uuid version matches sea-orm. PR to put this in their prelude tokio-stream = { version = "0.1.10", features = ["sync"] } +tower-cookies = "0.7" toml = "0.5.9" tower = "0.4.13" tower-request-id = "0.2.0" diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 1381e48a..a2200bdf 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -119,7 +119,6 @@ pub async fn flatten_handles( } /// Connect to the database and run migrations -#[instrument(skip_all)] pub async fn get_migrated_db( db_url: String, min_connections: u32, @@ -316,6 +315,8 @@ impl Web3ProxyApp { Some(private_rpcs) }; + // TODO: setup a channel here for receiving influxdb stats + let mut frontend_ip_rate_limiter = None; let mut frontend_key_rate_limiter = None; if let Some(redis_pool) = redis_pool.as_ref() { @@ -347,6 +348,7 @@ impl Web3ProxyApp { // all the users are the same size, so no need for a weigher // TODO: max_capacity from config + // TODO: ttl from config let user_cache = Cache::builder() .max_capacity(10_000) .time_to_live(Duration::from_secs(60)) @@ -396,7 +398,6 @@ impl Web3ProxyApp { .expect("prometheus metrics should always serialize") } - #[instrument(skip_all)] #[measure([ErrorCount, HitCount, ResponseTime, Throughput])] pub async fn eth_subscribe<'a>( self: &'a Arc, @@ -591,7 +592,6 @@ impl Web3ProxyApp { } /// send the request or batch of requests to the approriate RPCs - #[instrument(skip_all)] pub async fn proxy_web3_rpc( self: &Arc, request: JsonRpcRequestEnum, @@ -621,7 +621,6 @@ impl Web3ProxyApp { /// cut up the request and send to potentually different servers /// TODO: make sure this isn't a problem - #[instrument(skip_all)] async fn proxy_web3_rpc_requests( self: &Arc, requests: Vec, @@ -645,7 +644,6 @@ impl Web3ProxyApp { Ok(collected) } - #[instrument(skip_all)] pub async fn redis_conn(&self) -> anyhow::Result { match self.redis_pool.as_ref() { None => Err(anyhow::anyhow!("no redis server configured")), @@ -658,7 +656,6 @@ impl Web3ProxyApp { } #[measure([ErrorCount, HitCount, ResponseTime, Throughput])] - #[instrument(skip_all)] async fn proxy_web3_rpc_request( self: &Arc, mut request: JsonRpcRequest, @@ -670,14 +667,7 @@ impl Web3ProxyApp { let request_id = request.id.clone(); // TODO: if eth_chainId or net_version, serve those without querying the backend - - // TODO: how much should we retry? probably with a timeout and not with a count like this - // TODO: think more about this loop. - // TODO: add things to this span - let span = info_span!("rpc_request"); - // let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon) - - // TODO: don't clone + // TODO: don't clone? let partial_response: serde_json::Value = match request.method.clone().as_ref() { // lots of commands are blocked method @ ("admin_addPeer" @@ -800,10 +790,7 @@ impl Web3ProxyApp { // emit stats let rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs); - return rpcs - .try_send_all_upstream_servers(request, None) - .instrument(span) - .await; + return rpcs.try_send_all_upstream_servers(request, None).await; } "eth_syncing" => { // no stats on this. its cheap @@ -862,8 +849,13 @@ impl Web3ProxyApp { // we do this check before checking caches because it might modify the request params // TODO: add a stat for archive vs full since they should probably cost different - let request_block_id = if let Some(request_block_needed) = - block_needed(method, request.params.as_mut(), head_block_id.num) + let request_block_id = if let Some(request_block_needed) = block_needed( + method, + request.params.as_mut(), + head_block_id.num, + &self.balanced_rpcs, + ) + .await { // TODO: maybe this should be on the app and not on balanced_rpcs let request_block_hash = diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index dc734172..1ee2e9e5 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -1,10 +1,13 @@ //! Helper functions for turning ether's BlockNumber into numbers and updating incoming queries to match. +use anyhow::Context; use ethers::{ prelude::{BlockNumber, U64}, types::H256, }; use tracing::warn; +use crate::rpcs::connections::Web3Connections; + pub fn block_num_to_u64(block_num: BlockNumber, latest_block: U64) -> (bool, U64) { match block_num { BlockNumber::Earliest => { @@ -29,10 +32,11 @@ pub fn block_num_to_u64(block_num: BlockNumber, latest_block: U64) -> (bool, U64 } /// modify params to always have a block number and not "latest" -pub fn clean_block_number( +pub async fn clean_block_number( params: &mut serde_json::Value, block_param_id: usize, latest_block: U64, + rpcs: &Web3Connections, ) -> anyhow::Result { match params.as_array_mut() { None => { @@ -53,20 +57,33 @@ pub fn clean_block_number( } Some(x) => { // convert the json value to a BlockNumber - // TODO: this is wrong, it might be a Map like `{"blockHash": String("0xa5626dc20d3a0a209b1de85521717a3e859698de8ce98bca1b16822b7501f74b")}` - let block_num = if let Some(obj) = x.as_object_mut() { + let (modified, block_num) = if let Some(obj) = x.as_object_mut() { + // it might be a Map like `{"blockHash": String("0xa5626dc20d3a0a209b1de85521717a3e859698de8ce98bca1b16822b7501f74b")}` if let Some(block_hash) = obj.remove("blockHash") { - let block_hash: H256 = serde_json::from_value(block_hash)?; + let block_hash: H256 = + serde_json::from_value(block_hash).context("decoding blockHash")?; - todo!("look up the block_hash from our cache"); + let block = rpcs.block(&block_hash, None).await?; + + let block_num = block + .number + .expect("blocks here should always have numbers"); + + // always set modfied to true because we used "take" above + (true, block_num) } else { - unimplemented!(); + return Err(anyhow::anyhow!("blockHash missing")); } } else { - serde_json::from_value::(x.take())? - }; + // it might be a string like "latest" or a block number + // TODO: "BlockNumber" needs a better name + let block_number = serde_json::from_value::(x.take())?; - let (modified, block_num) = block_num_to_u64(block_num, latest_block); + let (_, block_num) = block_num_to_u64(block_number, latest_block); + + // always set modfied to true because we used "take" above + (true, block_num) + }; // if we changed "latest" to a number, update the params to match if modified { @@ -79,11 +96,12 @@ pub fn clean_block_number( } } -// TODO: change this to also return the hash needed -pub fn block_needed( +// TODO: change this to also return the hash needed? +pub async fn block_needed( method: &str, params: Option<&mut serde_json::Value>, head_block_num: U64, + rpcs: &Web3Connections, ) -> Option { // if no params, no block is needed let params = params?; @@ -121,6 +139,8 @@ pub fn block_needed( *x = serde_json::to_value(block_num).unwrap(); } + // TODO: maybe don't return. instead check toBlock too? + // TODO: if there is a very wide fromBlock and toBlock, we need to check that our rpcs have both! return Some(block_num); } @@ -136,13 +156,11 @@ pub fn block_needed( return Some(block_num); } - if let Some(x) = obj.get("blockHash") { - // TODO: check a Cache of recent hashes - // TODO: error if fromBlock or toBlock were set - todo!("handle blockHash {}", x); + if obj.contains_key("blockHash") { + 1 + } else { + return None; } - - return None; } "eth_getStorageAt" => 2, "eth_getTransactionByHash" => { @@ -180,7 +198,7 @@ pub fn block_needed( } }; - match clean_block_number(params, block_param_id, head_block_num) { + match clean_block_number(params, block_param_id, head_block_num, rpcs).await { Ok(block) => Some(block), Err(err) => { // TODO: seems unlikely that we will get here diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 5ae23238..3faa3e41 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -113,7 +113,6 @@ impl Web3ConnectionConfig { /// Create a Web3Connection from config /// TODO: move this into Web3Connection (just need to make things pub(crate)) #[allow(clippy::too_many_arguments)] - #[instrument(skip_all)] pub async fn spawn( self, name: String, diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index c0194f9d..fb59767b 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -133,7 +133,6 @@ impl IntoResponse for FrontendErrorResponse { } } -#[instrument(skip_all)] pub async fn handler_404() -> Response { FrontendErrorResponse::NotFound.into_response() } diff --git a/web3_proxy/src/frontend/http.rs b/web3_proxy/src/frontend/http.rs index 3a1714a9..98b7fc21 100644 --- a/web3_proxy/src/frontend/http.rs +++ b/web3_proxy/src/frontend/http.rs @@ -6,7 +6,6 @@ use std::sync::Arc; use tracing::instrument; /// Health check page for load balancers to use -#[instrument(skip_all)] pub async fn health(Extension(app): Extension>) -> impl IntoResponse { // TODO: also check that the head block is not too old if app.balanced_rpcs.synced() { @@ -18,14 +17,12 @@ pub async fn health(Extension(app): Extension>) -> impl IntoRe /// Prometheus metrics /// TODO: when done debugging, remove this and only allow access on a different port -#[instrument(skip_all)] pub async fn prometheus(Extension(app): Extension>) -> impl IntoResponse { app.prometheus_metrics() } /// Very basic status page /// TODO: replace this with proper stats and monitoring -#[instrument(skip_all)] pub async fn status(Extension(app): Extension>) -> impl IntoResponse { app.pending_transactions.sync(); app.user_cache.sync(); diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index 3ff513e9..e1a064b3 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -12,26 +12,18 @@ use uuid::Uuid; pub async fn public_proxy_web3_rpc( Extension(app): Extension>, - Host(host): Host, ClientIp(ip): ClientIp, Json(payload): Json, referer: Option>, user_agent: Option>, ) -> FrontendResult { - let request_span = debug_span!("request", host, ?referer, ?user_agent); + let request_span = error_span!("request", %ip, ?referer, ?user_agent); let ip = rate_limit_by_ip(&app, ip) .instrument(request_span.clone()) .await?; - let user_span = error_span!("ip", %ip); - - let f = tokio::spawn(async move { - app.proxy_web3_rpc(payload) - .instrument(request_span) - .instrument(user_span) - .await - }); + let f = tokio::spawn(async move { app.proxy_web3_rpc(payload).instrument(request_span).await }); let response = f.await.unwrap()?; @@ -40,26 +32,23 @@ pub async fn public_proxy_web3_rpc( pub async fn user_proxy_web3_rpc( Extension(app): Extension>, - Host(host): Host, + ClientIp(ip): ClientIp, Json(payload): Json, + referer: Option>, user_agent: Option>, Path(user_key): Path, - referer: Option>, ) -> FrontendResult { - let request_span = debug_span!("request", host, ?referer, ?user_agent); + let request_span = + error_span!("request", %ip, ?referer, ?user_agent, user_id = tracing::field::Empty); + // TODO: this should probably return the user_key_id instead? or maybe both? let user_id = rate_limit_by_key(&app, user_key) .instrument(request_span.clone()) .await?; - let user_span = error_span!("user", user_id); + request_span.record("user_id", user_id); - let f = tokio::spawn(async move { - app.proxy_web3_rpc(payload) - .instrument(request_span) - .instrument(user_span) - .await - }); + let f = tokio::spawn(async move { app.proxy_web3_rpc(payload).instrument(request_span).await }); let response = f.await.unwrap()?; diff --git a/web3_proxy/src/metrics_frontend.rs b/web3_proxy/src/metrics_frontend.rs index fbf01786..9056c651 100644 --- a/web3_proxy/src/metrics_frontend.rs +++ b/web3_proxy/src/metrics_frontend.rs @@ -9,7 +9,6 @@ use tracing::{info, instrument}; use crate::app::Web3ProxyApp; /// Run a prometheus metrics server on the given port. -#[instrument(skip_all)] pub async fn serve(app: Arc, port: u16) -> anyhow::Result<()> { // build our application with a route // order most to least common @@ -42,7 +41,6 @@ pub async fn serve(app: Arc, port: u16) -> anyhow::Result<()> { .map_err(Into::into) } -#[instrument(skip_all)] async fn root(Extension(app): Extension>) -> Response { let serialized = app.prometheus_metrics(); diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index cc6bbd17..a2507388 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -174,7 +174,6 @@ impl Web3Connection { Ok((new_connection, handle)) } - #[instrument(skip_all)] async fn check_block_data_limit(self: &Arc) -> anyhow::Result> { let mut limit = None; @@ -301,7 +300,6 @@ impl Web3Connection { } /// reconnect a websocket provider - #[instrument(skip_all)] pub async fn reconnect( self: &Arc, // websocket doesn't need the http client @@ -363,7 +361,6 @@ impl Web3Connection { self.provider.read().await.is_some() } - #[instrument(skip_all)] async fn send_head_block_result( self: &Arc, new_head_block: Result, ProviderError>, @@ -426,7 +423,6 @@ impl Web3Connection { } /// subscribe to blocks and transactions with automatic reconnects - #[instrument(skip_all)] async fn subscribe( self: Arc, http_interval_sender: Option>>, @@ -513,7 +509,6 @@ impl Web3Connection { } /// Subscribe to new blocks. If `reconnect` is true, this runs forever. - #[instrument(skip_all)] async fn subscribe_new_heads( self: Arc, http_interval_receiver: Option>, @@ -663,7 +658,6 @@ impl Web3Connection { Ok(()) } - #[instrument(skip_all)] async fn subscribe_pending_transactions( self: Arc, tx_id_sender: flume::Sender<(TxHash, Arc)>, diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 75f5a00d..eccbbebc 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -17,7 +17,7 @@ use futures::future::{join_all, try_join_all}; use futures::stream::FuturesUnordered; use futures::StreamExt; use hashbrown::HashMap; -use moka::future::Cache; +use moka::future::{Cache, ConcurrentCacheExt}; use petgraph::graphmap::DiGraphMap; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; @@ -302,7 +302,6 @@ impl Web3Connections { } /// Send the same request to all the handles. Returning the most common success or most common error. - #[instrument(skip_all)] pub async fn try_send_parallel_requests( &self, active_request_handles: Vec, @@ -361,7 +360,6 @@ impl Web3Connections { } /// get the best available rpc server - #[instrument(skip_all)] pub async fn next_upstream_server( &self, skip: &[Arc], @@ -591,6 +589,7 @@ impl Web3Connections { } /// be sure there is a timeout on this or it might loop forever + #[instrument] pub async fn try_send_all_upstream_servers( &self, request: JsonRpcRequest, @@ -660,11 +659,16 @@ impl Serialize for Web3Connections { where S: Serializer, { - let conns: Vec<&Web3Connection> = self.conns.values().map(|x| x.as_ref()).collect(); - let mut state = serializer.serialize_struct("Web3Connections", 6)?; + + let conns: Vec<&Web3Connection> = self.conns.values().map(|x| x.as_ref()).collect(); state.serialize_field("conns", &conns)?; - state.serialize_field("synced_connections", &**self.synced_connections.load())?; + + let synced_connections = &**self.synced_connections.load(); + state.serialize_field("synced_connections", synced_connections)?; + + self.block_hashes.sync(); + self.block_numbers.sync(); state.serialize_field("block_hashes_count", &self.block_hashes.entry_count())?; state.serialize_field("block_hashes_size", &self.block_hashes.weighted_size())?; state.serialize_field("block_numbers_count", &self.block_numbers.entry_count())?; diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 3f7f5d7e..c85b5754 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -12,7 +12,7 @@ use std::sync::atomic; use std::sync::Arc; use tokio::time::{sleep, Duration, Instant}; use tracing::Level; -use tracing::{debug, error, instrument, trace, warn}; +use tracing::{debug, error, trace, warn, Event}; #[derive(Debug)] pub enum OpenRequestResult { @@ -82,7 +82,6 @@ impl OpenRequestHandle { /// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented /// TODO: we no longer take self because metered doesn't like that /// TODO: ErrorCount includes too many types of errors, such as transaction reverts - #[instrument(skip_all)] #[measure([JsonRpcErrorCount, HitCount, ProviderErrorCount, ResponseTime, Throughput])] pub async fn request( &self, @@ -142,7 +141,27 @@ impl OpenRequestHandle { warn!(?err, %method, rpc=%conn, "bad response!"); } RequestErrorHandler::SaveReverts(chance) => { - // TODO: only set SaveReverts if this is an eth_call or eth_estimateGas? + // TODO: only set SaveReverts if this is an eth_call or eth_estimateGas? we'll need eth_sendRawTransaction somewhere else + + if let Some(metadata) = tracing::Span::current().metadata() { + let fields = metadata.fields(); + + if let Some(user_id) = fields.field("user_id") { + let values = [(&user_id, None)]; + + let valueset = fields.value_set(&values); + + let visitor = todo!(); + + valueset.record(visitor); + + // TODO: now how we do we get the current value out of it? we might need this index + } else { + warn!("no user id"); + } + } + + // TODO: check the span for user_key_id // TODO: only set SaveReverts for // TODO: logging every one is going to flood the database @@ -153,8 +172,9 @@ impl OpenRequestHandle { if let Some(HttpClientError::JsonRpcError(err)) = err.downcast_ref::() { - if err.message == "execution reverted" { + if err.message.starts_with("execution reverted") { debug!(%method, ?params, "TODO: save the request"); + // TODO: don't do this on the hot path. spawn it } else { debug!(?err, %method, rpc=%conn, "bad response!"); } @@ -164,8 +184,9 @@ impl OpenRequestHandle { if let Some(WsClientError::JsonRpcError(err)) = err.downcast_ref::() { - if err.message == "execution reverted" { + if err.message.starts_with("execution reverted") { debug!(%method, ?params, "TODO: save the request"); + // TODO: don't do this on the hot path. spawn it } else { debug!(?err, %method, rpc=%conn, "bad response!"); }