From 37a1aa554b99324f59f1dd329b50ed7d45dba6ed Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 21 Sep 2022 04:48:21 +0000 Subject: [PATCH] revert error saving and extract blockHash from requests --- Cargo.lock | 6 +-- deferred-rate-limiter/Cargo.toml | 2 +- migration/first_draft.sql | 45 ---------------- web3_proxy/Cargo.toml | 2 +- web3_proxy/src/block_number.rs | 19 +++++-- web3_proxy/src/rpcs/blockchain.rs | 4 +- web3_proxy/src/rpcs/connection.rs | 21 +++++--- web3_proxy/src/rpcs/connections.rs | 16 ++++-- web3_proxy/src/rpcs/request.rs | 83 +++++++++++++++++++++++++---- web3_proxy/src/rpcs/transactions.rs | 8 ++- 10 files changed, 128 insertions(+), 78 deletions(-) delete mode 100644 migration/first_draft.sql diff --git a/Cargo.lock b/Cargo.lock index 2a186c7c..c2b64ae6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3830,9 +3830,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.11" +version = "0.11.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b75aa69a3f06bbcc66ede33af2af253c6f7a86b1ca0033f60c580a27074fbf92" +checksum = "431949c384f4e2ae07605ccaa56d1d9d2ecdb5cadd4f9577ccfab29f2e5149fc" dependencies = [ "base64 0.13.0", "bytes", @@ -3846,9 +3846,9 @@ dependencies = [ "hyper-rustls", "ipnet", "js-sys", - "lazy_static", "log", "mime", + "once_cell", "percent-encoding", "pin-project-lite", "rustls", diff --git a/deferred-rate-limiter/Cargo.toml b/deferred-rate-limiter/Cargo.toml index f61b582a..9acdac5f 100644 --- a/deferred-rate-limiter/Cargo.toml +++ b/deferred-rate-limiter/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" redis-rate-limiter = { path = "../redis-rate-limiter" } anyhow = "1.0.65" -hashbrown = "*" +hashbrown = "0.12.3" moka = { version = "0.9.4", default-features = false, features = ["future"] } tokio = "1.21.1" tracing = "0.1.36" diff --git a/migration/first_draft.sql b/migration/first_draft.sql deleted file mode 100644 index f8cec822..00000000 --- a/migration/first_draft.sql +++ /dev/null @@ -1,45 +0,0 @@ -CREATE TABLE users ( - id SERIAL PRIMARY KEY, - primary_chain INT, - primary_address VARCHAR(42), - description VARCHAR(255), - email VARCHAR(320), -) - --- TODO: foreign keys --- TODO: how should we store addresses? --- TODO: creation time? --- TODO: permissions. likely similar to infura -CREATE TABLE secondary_users ( - id SERIAL PRIMARY KEY, - users_id BIGINT, - secondary_address VARCHAR(42), - secondary_chain INT, - description VARCHAR, - email VARCHAR(320), -) - --- TODO: creation time? -CREATE TABLE blocklist ( - id SERIAL PRIMARY KEY, - blocked_address VARCHAR, - chain INT, - reason TEXT, -) - --- TODO: foreign keys --- TODO: index on api_key --- TODO: what size for api_key --- TODO: track active with a timestamp? --- TODO: creation time? --- TODO: requests_per_second INT, --- TODO: requests_per_day INT, --- TODO: more security features. likely similar to infura -CREATE TABLE user_keys ( - id SERIAL PRIMARY KEY, - users_id BIGINT, - api_key VARCHAR, - description VARCHAR, - private_txs BOOLEAN, - active BOOLEAN, -) diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 310505c4..4b8fcbdb 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -46,7 +46,7 @@ proctitle = "0.1.1" rand = "0.8.5" # TODO: regex has several "perf" features that we might want to use regex = "1.6.0" -reqwest = { version = "0.11.11", default-features = false, features = ["json", "tokio-rustls"] } +reqwest = { version = "0.11.12", default-features = false, features = ["json", "tokio-rustls"] } handlebars = "4.3.4" rustc-hash = "1.1.0" siwe = "0.4.2" diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index 7b833990..dc734172 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -1,5 +1,8 @@ //! Helper functions for turning ether's BlockNumber into numbers and updating incoming queries to match. -use ethers::prelude::{BlockNumber, U64}; +use ethers::{ + prelude::{BlockNumber, U64}, + types::H256, +}; use tracing::warn; pub fn block_num_to_u64(block_num: BlockNumber, latest_block: U64) -> (bool, U64) { @@ -50,7 +53,18 @@ pub fn clean_block_number( } Some(x) => { // convert the json value to a BlockNumber - let block_num: BlockNumber = serde_json::from_value(x.clone())?; + // TODO: this is wrong, it might be a Map like `{"blockHash": String("0xa5626dc20d3a0a209b1de85521717a3e859698de8ce98bca1b16822b7501f74b")}` + let block_num = if let Some(obj) = x.as_object_mut() { + if let Some(block_hash) = obj.remove("blockHash") { + let block_hash: H256 = serde_json::from_value(block_hash)?; + + todo!("look up the block_hash from our cache"); + } else { + unimplemented!(); + } + } else { + serde_json::from_value::(x.take())? + }; let (modified, block_num) = block_num_to_u64(block_num, latest_block); @@ -170,7 +184,6 @@ pub fn block_needed( Ok(block) => Some(block), Err(err) => { // TODO: seems unlikely that we will get here - // if this is incorrect, it should retry on an archive server warn!(?err, "could not get block from params"); None } diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index a847f2e2..a7a59dfc 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -15,7 +15,7 @@ use serde_json::json; use std::{cmp::Ordering, fmt::Display, sync::Arc}; use tokio::sync::{broadcast, watch}; use tokio::time::Duration; -use tracing::{debug, trace, warn}; +use tracing::{debug, trace, warn, Level}; // TODO: type for Hydrated Blocks with their full transactions? pub type ArcBlock = Arc>; @@ -106,7 +106,7 @@ impl Web3Connections { Some(rpc) => { rpc.wait_for_request_handle(Duration::from_secs(30)) .await? - .request("eth_getBlockByHash", get_block_params, false) + .request("eth_getBlockByHash", &get_block_params, Level::ERROR.into()) .await? } None => { diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index de3e7228..cc6bbd17 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -21,7 +21,7 @@ use std::{cmp::Ordering, sync::Arc}; use tokio::sync::broadcast; use tokio::sync::RwLock as AsyncRwLock; use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; -use tracing::{debug, error, info, instrument, trace, warn}; +use tracing::{debug, error, info, instrument, trace, warn, Level}; /// An active connection to a Web3 RPC server like geth or erigon. pub struct Web3Connection { @@ -116,7 +116,7 @@ impl Web3Connection { let found_chain_id: Result = new_connection .wait_for_request_handle(Duration::from_secs(30)) .await? - .request("eth_chainId", Option::None::<()>, false) + .request("eth_chainId", &Option::None::<()>, Level::ERROR.into()) .await; match found_chain_id { @@ -205,11 +205,12 @@ impl Web3Connection { .await? .request( "eth_getCode", - ( + &( "0xdead00000000000000000000000000000000beef", maybe_archive_block, ), - true, + // error here are expected, so keep the level low + tracing::Level::DEBUG.into(), ) .await; @@ -537,7 +538,11 @@ impl Web3Connection { match self.wait_for_request_handle(Duration::from_secs(30)).await { Ok(active_request_handle) => { let block: Result, _> = active_request_handle - .request("eth_getBlockByNumber", ("latest", false), false) + .request( + "eth_getBlockByNumber", + &("latest", false), + tracing::Level::ERROR.into(), + ) .await; match block { @@ -608,7 +613,11 @@ impl Web3Connection { let block: Result, _> = self .wait_for_request_handle(Duration::from_secs(30)) .await? - .request("eth_getBlockByNumber", ("latest", false), false) + .request( + "eth_getBlockByNumber", + &("latest", false), + tracing::Level::ERROR.into(), + ) .await .map(|x| Some(Arc::new(x))); diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 3c8ba2f0..75f5a00d 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -1,7 +1,9 @@ ///! Load balanced communication with a group of web3 providers use super::blockchain::{ArcBlock, BlockHashesCache}; use super::connection::Web3Connection; -use super::request::{OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult}; +use super::request::{ + OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult, RequestErrorHandler, +}; use super::synced_connections::SyncedConnections; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::{BlockAndRpc, TxHashAndRpc, Web3ConnectionConfig}; @@ -313,8 +315,9 @@ impl Web3Connections { let responses = active_request_handles .into_iter() .map(|active_request_handle| async move { - let result: Result, _> = - active_request_handle.request(method, params, false).await; + let result: Result, _> = active_request_handle + .request(method, ¶ms, tracing::Level::ERROR.into()) + .await; result }) .collect::>() @@ -508,8 +511,13 @@ impl Web3Connections { // save the rpc in case we get an error and want to retry on another server skip_rpcs.push(active_request_handle.clone_connection()); + // TODO: get the log percent from the user data? let response_result = active_request_handle - .request(&request.method, &request.params, false) + .request( + &request.method, + &request.params, + RequestErrorHandler::SaveReverts(100.0), + ) .await; match JsonRpcForwardedResponse::try_from_response_result( diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index d447bb27..3f7f5d7e 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -1,7 +1,7 @@ use super::connection::Web3Connection; use super::provider::Web3Provider; use crate::metered::{JsonRpcErrorCount, ProviderErrorCount}; -use ethers::providers::ProviderError; +use ethers::providers::{HttpClientError, ProviderError, WsClientError}; use metered::metered; use metered::HitCount; use metered::ResponseTime; @@ -11,8 +11,8 @@ use std::fmt; use std::sync::atomic; use std::sync::Arc; use tokio::time::{sleep, Duration, Instant}; -use tracing::warn; -use tracing::{instrument, trace}; +use tracing::Level; +use tracing::{debug, error, instrument, trace, warn}; #[derive(Debug)] pub enum OpenRequestResult { @@ -31,6 +31,24 @@ pub struct OpenRequestHandle { metrics: Arc, } +pub enum RequestErrorHandler { + SaveReverts(f32), + DebugLevel, + ErrorLevel, + WarnLevel, +} + +impl From for RequestErrorHandler { + fn from(level: Level) -> Self { + match level { + Level::DEBUG => RequestErrorHandler::DebugLevel, + Level::ERROR => RequestErrorHandler::ErrorLevel, + Level::WARN => RequestErrorHandler::WarnLevel, + _ => unimplemented!(), + } + } +} + #[metered(registry = OpenRequestHandleMetrics, visibility = pub)] impl OpenRequestHandle { pub fn new(conn: Arc) -> Self { @@ -69,9 +87,8 @@ impl OpenRequestHandle { pub async fn request( &self, method: &str, - params: T, - // TODO: change this to error_log_level? - silent_errors: bool, + params: &T, + error_handler: RequestErrorHandler, ) -> Result where T: fmt::Debug + serde::Serialize + Send + Sync, @@ -95,27 +112,71 @@ impl OpenRequestHandle { None => { warn!(rpc=%conn, "no provider!"); // TODO: how should this work? a reconnect should be in progress. but maybe force one now? + // TODO: maybe use a watch handle? // TODO: sleep how long? subscribe to something instead? + // TODO: this is going to be very verbose! sleep(Duration::from_millis(100)).await } Some(found_provider) => provider = Some(found_provider.clone()), } } - let response = match &*provider.expect("provider was checked already") { + let provider = &*provider.expect("provider was checked already"); + + let response = match provider { Web3Provider::Http(provider) => provider.request(method, params).await, Web3Provider::Ws(provider) => provider.request(method, params).await, }; conn.active_requests.fetch_sub(1, atomic::Ordering::AcqRel); - // TODO: i think ethers already has trace logging (and does it much more fancy) if let Err(err) = &response { - if !silent_errors { - // TODO: this isn't always bad. missing trie node while we are checking initial - warn!(?err, %method, rpc=%conn, "bad response!"); + match error_handler { + RequestErrorHandler::ErrorLevel => { + error!(?err, %method, rpc=%conn, "bad response!"); + } + RequestErrorHandler::DebugLevel => { + debug!(?err, %method, rpc=%conn, "bad response!"); + } + RequestErrorHandler::WarnLevel => { + warn!(?err, %method, rpc=%conn, "bad response!"); + } + RequestErrorHandler::SaveReverts(chance) => { + // TODO: only set SaveReverts if this is an eth_call or eth_estimateGas? + + // TODO: only set SaveReverts for + // TODO: logging every one is going to flood the database + // TODO: have a percent chance to do this. or maybe a "logged reverts per second" + if let ProviderError::JsonRpcClientError(err) = err { + match provider { + Web3Provider::Http(_) => { + if let Some(HttpClientError::JsonRpcError(err)) = + err.downcast_ref::() + { + if err.message == "execution reverted" { + debug!(%method, ?params, "TODO: save the request"); + } else { + debug!(?err, %method, rpc=%conn, "bad response!"); + } + } + } + Web3Provider::Ws(_) => { + if let Some(WsClientError::JsonRpcError(err)) = + err.downcast_ref::() + { + if err.message == "execution reverted" { + debug!(%method, ?params, "TODO: save the request"); + } else { + debug!(?err, %method, rpc=%conn, "bad response!"); + } + } + } + } + } + } } } else { + // TODO: i think ethers already has trace logging (and does it much more fancy) // TODO: opt-in response inspection to log reverts with their request. put into redis or what? // trace!(rpc=%self.0, %method, ?response); trace!(%method, rpc=%conn, "response"); diff --git a/web3_proxy/src/rpcs/transactions.rs b/web3_proxy/src/rpcs/transactions.rs index 2988bc48..32a22108 100644 --- a/web3_proxy/src/rpcs/transactions.rs +++ b/web3_proxy/src/rpcs/transactions.rs @@ -5,7 +5,7 @@ use super::request::OpenRequestResult; use ethers::prelude::{ProviderError, Transaction, TxHash}; use std::sync::Arc; use tokio::sync::broadcast; -use tracing::{debug, trace}; +use tracing::{debug, trace, Level}; // TODO: think more about TxState #[derive(Clone)] @@ -27,7 +27,11 @@ impl Web3Connections { let tx: Transaction = match rpc.try_request_handle().await { Ok(OpenRequestResult::Handle(handle)) => { handle - .request("eth_getTransactionByHash", (pending_tx_id,), false) + .request( + "eth_getTransactionByHash", + &(pending_tx_id,), + Level::ERROR.into(), + ) .await? } Ok(_) => {