revert error saving and extract blockHash from requests

This commit is contained in:
Bryan Stitt 2022-09-21 04:48:21 +00:00
parent 6054c3f340
commit 37a1aa554b
10 changed files with 128 additions and 78 deletions

6
Cargo.lock generated

@ -3830,9 +3830,9 @@ dependencies = [
[[package]]
name = "reqwest"
version = "0.11.11"
version = "0.11.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b75aa69a3f06bbcc66ede33af2af253c6f7a86b1ca0033f60c580a27074fbf92"
checksum = "431949c384f4e2ae07605ccaa56d1d9d2ecdb5cadd4f9577ccfab29f2e5149fc"
dependencies = [
"base64 0.13.0",
"bytes",
@ -3846,9 +3846,9 @@ dependencies = [
"hyper-rustls",
"ipnet",
"js-sys",
"lazy_static",
"log",
"mime",
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls",

@ -8,7 +8,7 @@ edition = "2021"
redis-rate-limiter = { path = "../redis-rate-limiter" }
anyhow = "1.0.65"
hashbrown = "*"
hashbrown = "0.12.3"
moka = { version = "0.9.4", default-features = false, features = ["future"] }
tokio = "1.21.1"
tracing = "0.1.36"

@ -1,45 +0,0 @@
CREATE TABLE users (
id SERIAL PRIMARY KEY,
primary_chain INT,
primary_address VARCHAR(42),
description VARCHAR(255),
email VARCHAR(320),
)
-- TODO: foreign keys
-- TODO: how should we store addresses?
-- TODO: creation time?
-- TODO: permissions. likely similar to infura
CREATE TABLE secondary_users (
id SERIAL PRIMARY KEY,
users_id BIGINT,
secondary_address VARCHAR(42),
secondary_chain INT,
description VARCHAR,
email VARCHAR(320),
)
-- TODO: creation time?
CREATE TABLE blocklist (
id SERIAL PRIMARY KEY,
blocked_address VARCHAR,
chain INT,
reason TEXT,
)
-- TODO: foreign keys
-- TODO: index on api_key
-- TODO: what size for api_key
-- TODO: track active with a timestamp?
-- TODO: creation time?
-- TODO: requests_per_second INT,
-- TODO: requests_per_day INT,
-- TODO: more security features. likely similar to infura
CREATE TABLE user_keys (
id SERIAL PRIMARY KEY,
users_id BIGINT,
api_key VARCHAR,
description VARCHAR,
private_txs BOOLEAN,
active BOOLEAN,
)

@ -46,7 +46,7 @@ proctitle = "0.1.1"
rand = "0.8.5"
# TODO: regex has several "perf" features that we might want to use
regex = "1.6.0"
reqwest = { version = "0.11.11", default-features = false, features = ["json", "tokio-rustls"] }
reqwest = { version = "0.11.12", default-features = false, features = ["json", "tokio-rustls"] }
handlebars = "4.3.4"
rustc-hash = "1.1.0"
siwe = "0.4.2"

@ -1,5 +1,8 @@
//! Helper functions for turning ether's BlockNumber into numbers and updating incoming queries to match.
use ethers::prelude::{BlockNumber, U64};
use ethers::{
prelude::{BlockNumber, U64},
types::H256,
};
use tracing::warn;
pub fn block_num_to_u64(block_num: BlockNumber, latest_block: U64) -> (bool, U64) {
@ -50,7 +53,18 @@ pub fn clean_block_number(
}
Some(x) => {
// convert the json value to a BlockNumber
let block_num: BlockNumber = serde_json::from_value(x.clone())?;
// TODO: this is wrong, it might be a Map like `{"blockHash": String("0xa5626dc20d3a0a209b1de85521717a3e859698de8ce98bca1b16822b7501f74b")}`
let block_num = if let Some(obj) = x.as_object_mut() {
if let Some(block_hash) = obj.remove("blockHash") {
let block_hash: H256 = serde_json::from_value(block_hash)?;
todo!("look up the block_hash from our cache");
} else {
unimplemented!();
}
} else {
serde_json::from_value::<BlockNumber>(x.take())?
};
let (modified, block_num) = block_num_to_u64(block_num, latest_block);
@ -170,7 +184,6 @@ pub fn block_needed(
Ok(block) => Some(block),
Err(err) => {
// TODO: seems unlikely that we will get here
// if this is incorrect, it should retry on an archive server
warn!(?err, "could not get block from params");
None
}

@ -15,7 +15,7 @@ use serde_json::json;
use std::{cmp::Ordering, fmt::Display, sync::Arc};
use tokio::sync::{broadcast, watch};
use tokio::time::Duration;
use tracing::{debug, trace, warn};
use tracing::{debug, trace, warn, Level};
// TODO: type for Hydrated Blocks with their full transactions?
pub type ArcBlock = Arc<Block<TxHash>>;
@ -106,7 +106,7 @@ impl Web3Connections {
Some(rpc) => {
rpc.wait_for_request_handle(Duration::from_secs(30))
.await?
.request("eth_getBlockByHash", get_block_params, false)
.request("eth_getBlockByHash", &get_block_params, Level::ERROR.into())
.await?
}
None => {

@ -21,7 +21,7 @@ use std::{cmp::Ordering, sync::Arc};
use tokio::sync::broadcast;
use tokio::sync::RwLock as AsyncRwLock;
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
use tracing::{debug, error, info, instrument, trace, warn};
use tracing::{debug, error, info, instrument, trace, warn, Level};
/// An active connection to a Web3 RPC server like geth or erigon.
pub struct Web3Connection {
@ -116,7 +116,7 @@ impl Web3Connection {
let found_chain_id: Result<U64, _> = new_connection
.wait_for_request_handle(Duration::from_secs(30))
.await?
.request("eth_chainId", Option::None::<()>, false)
.request("eth_chainId", &Option::None::<()>, Level::ERROR.into())
.await;
match found_chain_id {
@ -205,11 +205,12 @@ impl Web3Connection {
.await?
.request(
"eth_getCode",
(
&(
"0xdead00000000000000000000000000000000beef",
maybe_archive_block,
),
true,
// error here are expected, so keep the level low
tracing::Level::DEBUG.into(),
)
.await;
@ -537,7 +538,11 @@ impl Web3Connection {
match self.wait_for_request_handle(Duration::from_secs(30)).await {
Ok(active_request_handle) => {
let block: Result<Block<TxHash>, _> = active_request_handle
.request("eth_getBlockByNumber", ("latest", false), false)
.request(
"eth_getBlockByNumber",
&("latest", false),
tracing::Level::ERROR.into(),
)
.await;
match block {
@ -608,7 +613,11 @@ impl Web3Connection {
let block: Result<Option<ArcBlock>, _> = self
.wait_for_request_handle(Duration::from_secs(30))
.await?
.request("eth_getBlockByNumber", ("latest", false), false)
.request(
"eth_getBlockByNumber",
&("latest", false),
tracing::Level::ERROR.into(),
)
.await
.map(|x| Some(Arc::new(x)));

@ -1,7 +1,9 @@
///! Load balanced communication with a group of web3 providers
use super::blockchain::{ArcBlock, BlockHashesCache};
use super::connection::Web3Connection;
use super::request::{OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult};
use super::request::{
OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult, RequestErrorHandler,
};
use super::synced_connections::SyncedConnections;
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::{BlockAndRpc, TxHashAndRpc, Web3ConnectionConfig};
@ -313,8 +315,9 @@ impl Web3Connections {
let responses = active_request_handles
.into_iter()
.map(|active_request_handle| async move {
let result: Result<Box<RawValue>, _> =
active_request_handle.request(method, params, false).await;
let result: Result<Box<RawValue>, _> = active_request_handle
.request(method, &params, tracing::Level::ERROR.into())
.await;
result
})
.collect::<FuturesUnordered<_>>()
@ -508,8 +511,13 @@ impl Web3Connections {
// save the rpc in case we get an error and want to retry on another server
skip_rpcs.push(active_request_handle.clone_connection());
// TODO: get the log percent from the user data?
let response_result = active_request_handle
.request(&request.method, &request.params, false)
.request(
&request.method,
&request.params,
RequestErrorHandler::SaveReverts(100.0),
)
.await;
match JsonRpcForwardedResponse::try_from_response_result(

@ -1,7 +1,7 @@
use super::connection::Web3Connection;
use super::provider::Web3Provider;
use crate::metered::{JsonRpcErrorCount, ProviderErrorCount};
use ethers::providers::ProviderError;
use ethers::providers::{HttpClientError, ProviderError, WsClientError};
use metered::metered;
use metered::HitCount;
use metered::ResponseTime;
@ -11,8 +11,8 @@ use std::fmt;
use std::sync::atomic;
use std::sync::Arc;
use tokio::time::{sleep, Duration, Instant};
use tracing::warn;
use tracing::{instrument, trace};
use tracing::Level;
use tracing::{debug, error, instrument, trace, warn};
#[derive(Debug)]
pub enum OpenRequestResult {
@ -31,6 +31,24 @@ pub struct OpenRequestHandle {
metrics: Arc<OpenRequestHandleMetrics>,
}
pub enum RequestErrorHandler {
SaveReverts(f32),
DebugLevel,
ErrorLevel,
WarnLevel,
}
impl From<Level> for RequestErrorHandler {
fn from(level: Level) -> Self {
match level {
Level::DEBUG => RequestErrorHandler::DebugLevel,
Level::ERROR => RequestErrorHandler::ErrorLevel,
Level::WARN => RequestErrorHandler::WarnLevel,
_ => unimplemented!(),
}
}
}
#[metered(registry = OpenRequestHandleMetrics, visibility = pub)]
impl OpenRequestHandle {
pub fn new(conn: Arc<Web3Connection>) -> Self {
@ -69,9 +87,8 @@ impl OpenRequestHandle {
pub async fn request<T, R>(
&self,
method: &str,
params: T,
// TODO: change this to error_log_level?
silent_errors: bool,
params: &T,
error_handler: RequestErrorHandler,
) -> Result<R, ProviderError>
where
T: fmt::Debug + serde::Serialize + Send + Sync,
@ -95,27 +112,71 @@ impl OpenRequestHandle {
None => {
warn!(rpc=%conn, "no provider!");
// TODO: how should this work? a reconnect should be in progress. but maybe force one now?
// TODO: maybe use a watch handle?
// TODO: sleep how long? subscribe to something instead?
// TODO: this is going to be very verbose!
sleep(Duration::from_millis(100)).await
}
Some(found_provider) => provider = Some(found_provider.clone()),
}
}
let response = match &*provider.expect("provider was checked already") {
let provider = &*provider.expect("provider was checked already");
let response = match provider {
Web3Provider::Http(provider) => provider.request(method, params).await,
Web3Provider::Ws(provider) => provider.request(method, params).await,
};
conn.active_requests.fetch_sub(1, atomic::Ordering::AcqRel);
// TODO: i think ethers already has trace logging (and does it much more fancy)
if let Err(err) = &response {
if !silent_errors {
// TODO: this isn't always bad. missing trie node while we are checking initial
warn!(?err, %method, rpc=%conn, "bad response!");
match error_handler {
RequestErrorHandler::ErrorLevel => {
error!(?err, %method, rpc=%conn, "bad response!");
}
RequestErrorHandler::DebugLevel => {
debug!(?err, %method, rpc=%conn, "bad response!");
}
RequestErrorHandler::WarnLevel => {
warn!(?err, %method, rpc=%conn, "bad response!");
}
RequestErrorHandler::SaveReverts(chance) => {
// TODO: only set SaveReverts if this is an eth_call or eth_estimateGas?
// TODO: only set SaveReverts for
// TODO: logging every one is going to flood the database
// TODO: have a percent chance to do this. or maybe a "logged reverts per second"
if let ProviderError::JsonRpcClientError(err) = err {
match provider {
Web3Provider::Http(_) => {
if let Some(HttpClientError::JsonRpcError(err)) =
err.downcast_ref::<HttpClientError>()
{
if err.message == "execution reverted" {
debug!(%method, ?params, "TODO: save the request");
} else {
debug!(?err, %method, rpc=%conn, "bad response!");
}
}
}
Web3Provider::Ws(_) => {
if let Some(WsClientError::JsonRpcError(err)) =
err.downcast_ref::<WsClientError>()
{
if err.message == "execution reverted" {
debug!(%method, ?params, "TODO: save the request");
} else {
debug!(?err, %method, rpc=%conn, "bad response!");
}
}
}
}
}
}
}
} else {
// TODO: i think ethers already has trace logging (and does it much more fancy)
// TODO: opt-in response inspection to log reverts with their request. put into redis or what?
// trace!(rpc=%self.0, %method, ?response);
trace!(%method, rpc=%conn, "response");

@ -5,7 +5,7 @@ use super::request::OpenRequestResult;
use ethers::prelude::{ProviderError, Transaction, TxHash};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{debug, trace};
use tracing::{debug, trace, Level};
// TODO: think more about TxState
#[derive(Clone)]
@ -27,7 +27,11 @@ impl Web3Connections {
let tx: Transaction = match rpc.try_request_handle().await {
Ok(OpenRequestResult::Handle(handle)) => {
handle
.request("eth_getTransactionByHash", (pending_tx_id,), false)
.request(
"eth_getTransactionByHash",
&(pending_tx_id,),
Level::ERROR.into(),
)
.await?
}
Ok(_) => {