This commit is contained in:
Bryan Stitt 2022-09-21 23:50:55 +00:00
parent 339bd41f50
commit 8481f6d44c
18 changed files with 269 additions and 95 deletions

31
Cargo.lock generated
View File

@ -1049,6 +1049,17 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb4a24b1aaf0fd0ce8b45161144d6f42cd91677fd5940fd431183eb023b3a2b8"
[[package]]
name = "cookie"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94d4706de1b0fa5b132270cddffa8585166037822e260a944fe161acd137ca05"
dependencies = [
"percent-encoding",
"time 0.3.14",
"version_check",
]
[[package]]
name = "core-foundation-sys"
version = "0.8.3"
@ -2782,7 +2793,7 @@ dependencies = [
[[package]]
name = "migration"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"async-std",
"sea-orm-migration",
@ -5083,6 +5094,23 @@ dependencies = [
"tracing",
]
[[package]]
name = "tower-cookies"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19833e336396f3953e5ab1513d72b5e5ea51d5ad39b78d306766a05740b48b97"
dependencies = [
"async-trait",
"axum-core",
"cookie",
"futures-util",
"http",
"parking_lot 0.12.1",
"pin-project-lite",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower-http"
version = "0.3.4"
@ -5575,6 +5603,7 @@ dependencies = [
"tokio-stream",
"toml",
"tower",
"tower-cookies",
"tower-http",
"tower-request-id",
"tracing",

View File

@ -37,6 +37,12 @@ cargo run --release -- --config ./config/example.toml
## Common commands
Create a user:
```
cargo run --bin web3_proxy_cli -- --db-url "$YOUR_DB_URL" create_user --address "$USER_ADDRESS_0x"
```
Check that the proxy is working:
```

23
TODO.md
View File

@ -121,8 +121,7 @@ These are roughly in order of completition
- [x] "chain is forked" message is wrong. it includes nodes just being on different heights of the same chain. need a smarter check
- i think there is also a bug because i've seen "server not synced" a couple times
- [x] bug around eth_getBlockByHash sometimes causes tokio to lock up
- i keep a mapping of blocks so that i can go from hash -> block. it has some consistent hashing it does to split them up across multiple maps each with their own lock. so a lot of the time reads dont block writes because they are in different internal maps. this was fine.
- but after changing my fork detection logic to use the same rules as erigon, i discovered that when you get blocks from a websocket subscription in erigon and geth, theres a missing field (https://github.com/ledgerwatch/erigon/issues/5190). so i added a query to get the block that includes the missing field.
- i keep a mapping of blocks so that i can go from hash -> block. it has some consistent hashing it does to split them up across multiple maps each with their own lock. so a lot of the time reads dont block writes because they are in different internal maps. this was fine. but after changing my fork detection logic to use the same rules as erigon, i discovered that when you get blocks from a websocket subscription in erigon and geth, theres a missing field (https://github.com/ledgerwatch/erigon/issues/5190). so i added a query to get the block that includes the missing field.
- but i did this in a way where i was holding the write lock open while doing the query. the "new" block that has the missing field ends up in the same bucket and it also wants a write lock. oops. entry api has very sharp edges. don't ever await inside a match on DashMap::entry
- [x] requests for "Get transactions receipts" are routed to the private_rpcs and not the balanced_rpcs. do this better.
- [x] quick fix, send to balanced_rpcs for now. we will just live with errors on new transactions.
@ -156,18 +155,24 @@ These are roughly in order of completition
- erigon gives `method=eth_call reqid=986147 t=1.151551ms err="execution reverted"`
- [x] database migration to change user_keys.requests_per_minute to bigunsigned (max of 18446744073709551615)
- [x] change user creation script to have a "unlimited requests per minute" flag that sets it to u64::MAX (18446744073709551615)
- [ ] opt-in debug mode that inspects responses for reverts and saves the request to the database for the user. let them choose a % to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly
- [x] in /status, block hashes has a lower count than block numbers. how is that possible?
- we weren't calling sync. now we are
- [-] opt-in debug mode that inspects responses for reverts and saves the request to the database for the user.
- [-] let them choose a % to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly
- this must be opt-in or spawned since it will slow things down and will make their calls less private
- [-] add configurable size limits to all the Caches
- [ ] Api keys need option to lock to IP, cors header, referer, etc
- [-] Api keys need option to lock to IP, cors header, referer, etc
- [ ] active requests per second per api key
- [ ] distribution of methods per api key (eth_call, eth_getLogs, etc.)
- [ ] web3 on rpc1 exited without errors. maybe promote some shutdown messages from debug to info?
- [-] add configurable size limits to all the Caches
- [ ] Ulid instead of Uuid for user keys
- <https://discord.com/channels/873880840487206962/900758376164757555/1012942974608474142>
- since users are actively using our service, we will need to support both
- [ ] Ulid instead of Uuid for database ids
- might have to use Uuid in sea-orm and then convert to Ulid on display
- [ ] bearer tokens should expire
- [-] signed cookie jar
- [ ] user login should return both the bearer token and a jwt (jsonwebtoken rust crate should make it easy)
- [ ] /user/logout to clear bearer token and jwt
## V1
@ -366,4 +371,8 @@ in another repo: event subscriber
- i wish i had more logs. its possible that 15479605 came immediatly after
- [ ] ip blocking logs a warn. we don't need that. a stat at most
- [ ] keep it working without redis and a database
- [ ] in /status, block hashes has a lower count than block numbers. how is that possible?
- [ ] web3 on rpc1 exited without errors. maybe promote some shutdown messages from debug to info?
- [ ] better handling for offline http servers
- if we get a connection refused, we should remove the server's block info so it is taken out of rotation
- [ ] web3_proxy_cli command should read database settings from config
- [ ] how should we handle reverting transactions? they won't confirm for a while after we send them

View File

@ -1,6 +1,6 @@
[package]
name = "migration"
version = "0.1.0"
version = "0.2.0"
edition = "2021"
publish = false
@ -16,6 +16,6 @@ version = "0.9.2"
features = [
# Enable at least one `ASYNC_RUNTIME` and `DATABASE_DRIVER` feature if you want to run migration via CLI.
# View the list of supported features at https://www.sea-ql.org/SeaORM/docs/install-and-config/database-and-async-runtime.
"runtime-tokio-rustls", # `ASYNC_RUNTIME` featrure
"runtime-tokio-rustls", # `ASYNC_RUNTIME` feature
"sqlx-mysql", # `DATABASE_DRIVER` feature
]

View File

@ -1,7 +1,7 @@
pub use sea_orm_migration::prelude::*;
pub mod m20220101_000001_create_table;
pub mod m20220921_181610_log_reverts;
mod m20220101_000001_create_table;
mod m20220921_181610_log_reverts;
pub struct Migrator;

View File

@ -174,7 +174,7 @@ enum SecondaryUser {
-- TODO: more security features. likely similar to infura
*/
#[derive(Iden)]
pub enum UserKeys {
enum UserKeys {
Table,
Id,
UserId,

View File

@ -0,0 +1,118 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// add some fields to the UserKeys table
manager
.alter_table(
sea_query::Table::alter()
.table(UserKeys::Table)
.to_owned()
// change requests per minute to a big_unsigned
.modify_column(
ColumnDef::new(UserKeys::RequestsPerMinute)
.big_unsigned()
.not_null(),
)
// add a column for logging reverts in the RevertLogs table
.add_column(
ColumnDef::new(UserKeys::LogReverts)
.boolean()
.not_null()
.default(false),
)
.to_owned(),
)
.await?;
// create a table for logging reverting eth_call and eth_estimateGas
manager
.create_table(
Table::create()
.table(RevertLogs::Table)
.col(
ColumnDef::new(RevertLogs::Id)
.big_unsigned()
.not_null()
.auto_increment()
.primary_key(),
)
.col(
ColumnDef::new(RevertLogs::UserKeyId)
.big_unsigned()
.not_null(),
)
.col(ColumnDef::new(RevertLogs::Timestamp).timestamp().not_null())
.col(
ColumnDef::new(RevertLogs::Method)
.enumeration(
"method",
["eth_call", "eth_estimateGas", "eth_sendRawTransaction"],
)
.not_null(),
)
.col(ColumnDef::new(RevertLogs::To).binary_len(20).not_null())
.col(ColumnDef::new(RevertLogs::CallData).text().not_null())
.index(sea_query::Index::create().col(RevertLogs::To))
.foreign_key(
sea_query::ForeignKey::create()
.from(RevertLogs::Table, RevertLogs::UserKeyId)
.to(UserKeys::Table, UserKeys::Id),
)
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// drop the new table
manager
.drop_table(Table::drop().table(RevertLogs::Table).to_owned())
.await?;
// put the UserKeys back to how it was before our migrations
manager
.alter_table(
sea_query::Table::alter()
.table(UserKeys::Table)
.to_owned()
.modify_column(
ColumnDef::new(UserKeys::RequestsPerMinute)
.unsigned()
.not_null(),
)
.drop_column(UserKeys::LogReverts)
.to_owned(),
)
.await
}
}
// copied from create_table.rs, but added
#[derive(Iden)]
pub enum UserKeys {
Table,
Id,
// UserId,
// ApiKey,
// Description,
// PrivateTxs,
// Active,
RequestsPerMinute,
LogReverts,
}
#[derive(Iden)]
enum RevertLogs {
Table,
Id,
UserKeyId,
Method,
CallData,
To,
Timestamp,
}

View File

@ -59,6 +59,7 @@ time = "0.3.14"
tokio = { version = "1.21.1", features = ["full", "tracing"] }
# TODO: make sure this uuid version matches sea-orm. PR to put this in their prelude
tokio-stream = { version = "0.1.10", features = ["sync"] }
tower-cookies = "0.7"
toml = "0.5.9"
tower = "0.4.13"
tower-request-id = "0.2.0"

View File

@ -119,7 +119,6 @@ pub async fn flatten_handles<T>(
}
/// Connect to the database and run migrations
#[instrument(skip_all)]
pub async fn get_migrated_db(
db_url: String,
min_connections: u32,
@ -316,6 +315,8 @@ impl Web3ProxyApp {
Some(private_rpcs)
};
// TODO: setup a channel here for receiving influxdb stats
let mut frontend_ip_rate_limiter = None;
let mut frontend_key_rate_limiter = None;
if let Some(redis_pool) = redis_pool.as_ref() {
@ -347,6 +348,7 @@ impl Web3ProxyApp {
// all the users are the same size, so no need for a weigher
// TODO: max_capacity from config
// TODO: ttl from config
let user_cache = Cache::builder()
.max_capacity(10_000)
.time_to_live(Duration::from_secs(60))
@ -396,7 +398,6 @@ impl Web3ProxyApp {
.expect("prometheus metrics should always serialize")
}
#[instrument(skip_all)]
#[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
pub async fn eth_subscribe<'a>(
self: &'a Arc<Self>,
@ -591,7 +592,6 @@ impl Web3ProxyApp {
}
/// send the request or batch of requests to the approriate RPCs
#[instrument(skip_all)]
pub async fn proxy_web3_rpc(
self: &Arc<Self>,
request: JsonRpcRequestEnum,
@ -621,7 +621,6 @@ impl Web3ProxyApp {
/// cut up the request and send to potentually different servers
/// TODO: make sure this isn't a problem
#[instrument(skip_all)]
async fn proxy_web3_rpc_requests(
self: &Arc<Self>,
requests: Vec<JsonRpcRequest>,
@ -645,7 +644,6 @@ impl Web3ProxyApp {
Ok(collected)
}
#[instrument(skip_all)]
pub async fn redis_conn(&self) -> anyhow::Result<redis_rate_limiter::RedisConnection> {
match self.redis_pool.as_ref() {
None => Err(anyhow::anyhow!("no redis server configured")),
@ -658,7 +656,6 @@ impl Web3ProxyApp {
}
#[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
#[instrument(skip_all)]
async fn proxy_web3_rpc_request(
self: &Arc<Self>,
mut request: JsonRpcRequest,
@ -670,14 +667,7 @@ impl Web3ProxyApp {
let request_id = request.id.clone();
// TODO: if eth_chainId or net_version, serve those without querying the backend
// TODO: how much should we retry? probably with a timeout and not with a count like this
// TODO: think more about this loop.
// TODO: add things to this span
let span = info_span!("rpc_request");
// let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon)
// TODO: don't clone
// TODO: don't clone?
let partial_response: serde_json::Value = match request.method.clone().as_ref() {
// lots of commands are blocked
method @ ("admin_addPeer"
@ -800,10 +790,7 @@ impl Web3ProxyApp {
// emit stats
let rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs);
return rpcs
.try_send_all_upstream_servers(request, None)
.instrument(span)
.await;
return rpcs.try_send_all_upstream_servers(request, None).await;
}
"eth_syncing" => {
// no stats on this. its cheap
@ -862,8 +849,13 @@ impl Web3ProxyApp {
// we do this check before checking caches because it might modify the request params
// TODO: add a stat for archive vs full since they should probably cost different
let request_block_id = if let Some(request_block_needed) =
block_needed(method, request.params.as_mut(), head_block_id.num)
let request_block_id = if let Some(request_block_needed) = block_needed(
method,
request.params.as_mut(),
head_block_id.num,
&self.balanced_rpcs,
)
.await
{
// TODO: maybe this should be on the app and not on balanced_rpcs
let request_block_hash =

View File

@ -1,10 +1,13 @@
//! Helper functions for turning ether's BlockNumber into numbers and updating incoming queries to match.
use anyhow::Context;
use ethers::{
prelude::{BlockNumber, U64},
types::H256,
};
use tracing::warn;
use crate::rpcs::connections::Web3Connections;
pub fn block_num_to_u64(block_num: BlockNumber, latest_block: U64) -> (bool, U64) {
match block_num {
BlockNumber::Earliest => {
@ -29,10 +32,11 @@ pub fn block_num_to_u64(block_num: BlockNumber, latest_block: U64) -> (bool, U64
}
/// modify params to always have a block number and not "latest"
pub fn clean_block_number(
pub async fn clean_block_number(
params: &mut serde_json::Value,
block_param_id: usize,
latest_block: U64,
rpcs: &Web3Connections,
) -> anyhow::Result<U64> {
match params.as_array_mut() {
None => {
@ -53,20 +57,33 @@ pub fn clean_block_number(
}
Some(x) => {
// convert the json value to a BlockNumber
// TODO: this is wrong, it might be a Map like `{"blockHash": String("0xa5626dc20d3a0a209b1de85521717a3e859698de8ce98bca1b16822b7501f74b")}`
let block_num = if let Some(obj) = x.as_object_mut() {
let (modified, block_num) = if let Some(obj) = x.as_object_mut() {
// it might be a Map like `{"blockHash": String("0xa5626dc20d3a0a209b1de85521717a3e859698de8ce98bca1b16822b7501f74b")}`
if let Some(block_hash) = obj.remove("blockHash") {
let block_hash: H256 = serde_json::from_value(block_hash)?;
let block_hash: H256 =
serde_json::from_value(block_hash).context("decoding blockHash")?;
todo!("look up the block_hash from our cache");
let block = rpcs.block(&block_hash, None).await?;
let block_num = block
.number
.expect("blocks here should always have numbers");
// always set modfied to true because we used "take" above
(true, block_num)
} else {
unimplemented!();
return Err(anyhow::anyhow!("blockHash missing"));
}
} else {
serde_json::from_value::<BlockNumber>(x.take())?
};
// it might be a string like "latest" or a block number
// TODO: "BlockNumber" needs a better name
let block_number = serde_json::from_value::<BlockNumber>(x.take())?;
let (modified, block_num) = block_num_to_u64(block_num, latest_block);
let (_, block_num) = block_num_to_u64(block_number, latest_block);
// always set modfied to true because we used "take" above
(true, block_num)
};
// if we changed "latest" to a number, update the params to match
if modified {
@ -79,11 +96,12 @@ pub fn clean_block_number(
}
}
// TODO: change this to also return the hash needed
pub fn block_needed(
// TODO: change this to also return the hash needed?
pub async fn block_needed(
method: &str,
params: Option<&mut serde_json::Value>,
head_block_num: U64,
rpcs: &Web3Connections,
) -> Option<U64> {
// if no params, no block is needed
let params = params?;
@ -121,6 +139,8 @@ pub fn block_needed(
*x = serde_json::to_value(block_num).unwrap();
}
// TODO: maybe don't return. instead check toBlock too?
// TODO: if there is a very wide fromBlock and toBlock, we need to check that our rpcs have both!
return Some(block_num);
}
@ -136,13 +156,11 @@ pub fn block_needed(
return Some(block_num);
}
if let Some(x) = obj.get("blockHash") {
// TODO: check a Cache of recent hashes
// TODO: error if fromBlock or toBlock were set
todo!("handle blockHash {}", x);
if obj.contains_key("blockHash") {
1
} else {
return None;
}
return None;
}
"eth_getStorageAt" => 2,
"eth_getTransactionByHash" => {
@ -180,7 +198,7 @@ pub fn block_needed(
}
};
match clean_block_number(params, block_param_id, head_block_num) {
match clean_block_number(params, block_param_id, head_block_num, rpcs).await {
Ok(block) => Some(block),
Err(err) => {
// TODO: seems unlikely that we will get here

View File

@ -113,7 +113,6 @@ impl Web3ConnectionConfig {
/// Create a Web3Connection from config
/// TODO: move this into Web3Connection (just need to make things pub(crate))
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
pub async fn spawn(
self,
name: String,

View File

@ -133,7 +133,6 @@ impl IntoResponse for FrontendErrorResponse {
}
}
#[instrument(skip_all)]
pub async fn handler_404() -> Response {
FrontendErrorResponse::NotFound.into_response()
}

View File

@ -6,7 +6,6 @@ use std::sync::Arc;
use tracing::instrument;
/// Health check page for load balancers to use
#[instrument(skip_all)]
pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
// TODO: also check that the head block is not too old
if app.balanced_rpcs.synced() {
@ -18,14 +17,12 @@ pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoRe
/// Prometheus metrics
/// TODO: when done debugging, remove this and only allow access on a different port
#[instrument(skip_all)]
pub async fn prometheus(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
app.prometheus_metrics()
}
/// Very basic status page
/// TODO: replace this with proper stats and monitoring
#[instrument(skip_all)]
pub async fn status(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
app.pending_transactions.sync();
app.user_cache.sync();

View File

@ -12,26 +12,18 @@ use uuid::Uuid;
pub async fn public_proxy_web3_rpc(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Host(host): Host,
ClientIp(ip): ClientIp,
Json(payload): Json<JsonRpcRequestEnum>,
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
) -> FrontendResult {
let request_span = debug_span!("request", host, ?referer, ?user_agent);
let request_span = error_span!("request", %ip, ?referer, ?user_agent);
let ip = rate_limit_by_ip(&app, ip)
.instrument(request_span.clone())
.await?;
let user_span = error_span!("ip", %ip);
let f = tokio::spawn(async move {
app.proxy_web3_rpc(payload)
.instrument(request_span)
.instrument(user_span)
.await
});
let f = tokio::spawn(async move { app.proxy_web3_rpc(payload).instrument(request_span).await });
let response = f.await.unwrap()?;
@ -40,26 +32,23 @@ pub async fn public_proxy_web3_rpc(
pub async fn user_proxy_web3_rpc(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Host(host): Host,
ClientIp(ip): ClientIp,
Json(payload): Json<JsonRpcRequestEnum>,
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
Path(user_key): Path<Uuid>,
referer: Option<TypedHeader<Referer>>,
) -> FrontendResult {
let request_span = debug_span!("request", host, ?referer, ?user_agent);
let request_span =
error_span!("request", %ip, ?referer, ?user_agent, user_id = tracing::field::Empty);
// TODO: this should probably return the user_key_id instead? or maybe both?
let user_id = rate_limit_by_key(&app, user_key)
.instrument(request_span.clone())
.await?;
let user_span = error_span!("user", user_id);
request_span.record("user_id", user_id);
let f = tokio::spawn(async move {
app.proxy_web3_rpc(payload)
.instrument(request_span)
.instrument(user_span)
.await
});
let f = tokio::spawn(async move { app.proxy_web3_rpc(payload).instrument(request_span).await });
let response = f.await.unwrap()?;

View File

@ -9,7 +9,6 @@ use tracing::{info, instrument};
use crate::app::Web3ProxyApp;
/// Run a prometheus metrics server on the given port.
#[instrument(skip_all)]
pub async fn serve(app: Arc<Web3ProxyApp>, port: u16) -> anyhow::Result<()> {
// build our application with a route
// order most to least common
@ -42,7 +41,6 @@ pub async fn serve(app: Arc<Web3ProxyApp>, port: u16) -> anyhow::Result<()> {
.map_err(Into::into)
}
#[instrument(skip_all)]
async fn root(Extension(app): Extension<Arc<Web3ProxyApp>>) -> Response {
let serialized = app.prometheus_metrics();

View File

@ -174,7 +174,6 @@ impl Web3Connection {
Ok((new_connection, handle))
}
#[instrument(skip_all)]
async fn check_block_data_limit(self: &Arc<Self>) -> anyhow::Result<Option<u64>> {
let mut limit = None;
@ -301,7 +300,6 @@ impl Web3Connection {
}
/// reconnect a websocket provider
#[instrument(skip_all)]
pub async fn reconnect(
self: &Arc<Self>,
// websocket doesn't need the http client
@ -363,7 +361,6 @@ impl Web3Connection {
self.provider.read().await.is_some()
}
#[instrument(skip_all)]
async fn send_head_block_result(
self: &Arc<Self>,
new_head_block: Result<Option<ArcBlock>, ProviderError>,
@ -426,7 +423,6 @@ impl Web3Connection {
}
/// subscribe to blocks and transactions with automatic reconnects
#[instrument(skip_all)]
async fn subscribe(
self: Arc<Self>,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
@ -513,7 +509,6 @@ impl Web3Connection {
}
/// Subscribe to new blocks. If `reconnect` is true, this runs forever.
#[instrument(skip_all)]
async fn subscribe_new_heads(
self: Arc<Self>,
http_interval_receiver: Option<broadcast::Receiver<()>>,
@ -663,7 +658,6 @@ impl Web3Connection {
Ok(())
}
#[instrument(skip_all)]
async fn subscribe_pending_transactions(
self: Arc<Self>,
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,

View File

@ -17,7 +17,7 @@ use futures::future::{join_all, try_join_all};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hashbrown::HashMap;
use moka::future::Cache;
use moka::future::{Cache, ConcurrentCacheExt};
use petgraph::graphmap::DiGraphMap;
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
@ -302,7 +302,6 @@ impl Web3Connections {
}
/// Send the same request to all the handles. Returning the most common success or most common error.
#[instrument(skip_all)]
pub async fn try_send_parallel_requests(
&self,
active_request_handles: Vec<OpenRequestHandle>,
@ -361,7 +360,6 @@ impl Web3Connections {
}
/// get the best available rpc server
#[instrument(skip_all)]
pub async fn next_upstream_server(
&self,
skip: &[Arc<Web3Connection>],
@ -591,6 +589,7 @@ impl Web3Connections {
}
/// be sure there is a timeout on this or it might loop forever
#[instrument]
pub async fn try_send_all_upstream_servers(
&self,
request: JsonRpcRequest,
@ -660,11 +659,16 @@ impl Serialize for Web3Connections {
where
S: Serializer,
{
let conns: Vec<&Web3Connection> = self.conns.values().map(|x| x.as_ref()).collect();
let mut state = serializer.serialize_struct("Web3Connections", 6)?;
let conns: Vec<&Web3Connection> = self.conns.values().map(|x| x.as_ref()).collect();
state.serialize_field("conns", &conns)?;
state.serialize_field("synced_connections", &**self.synced_connections.load())?;
let synced_connections = &**self.synced_connections.load();
state.serialize_field("synced_connections", synced_connections)?;
self.block_hashes.sync();
self.block_numbers.sync();
state.serialize_field("block_hashes_count", &self.block_hashes.entry_count())?;
state.serialize_field("block_hashes_size", &self.block_hashes.weighted_size())?;
state.serialize_field("block_numbers_count", &self.block_numbers.entry_count())?;

View File

@ -12,7 +12,7 @@ use std::sync::atomic;
use std::sync::Arc;
use tokio::time::{sleep, Duration, Instant};
use tracing::Level;
use tracing::{debug, error, instrument, trace, warn};
use tracing::{debug, error, trace, warn, Event};
#[derive(Debug)]
pub enum OpenRequestResult {
@ -82,7 +82,6 @@ impl OpenRequestHandle {
/// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
/// TODO: we no longer take self because metered doesn't like that
/// TODO: ErrorCount includes too many types of errors, such as transaction reverts
#[instrument(skip_all)]
#[measure([JsonRpcErrorCount, HitCount, ProviderErrorCount, ResponseTime, Throughput])]
pub async fn request<T, R>(
&self,
@ -142,7 +141,27 @@ impl OpenRequestHandle {
warn!(?err, %method, rpc=%conn, "bad response!");
}
RequestErrorHandler::SaveReverts(chance) => {
// TODO: only set SaveReverts if this is an eth_call or eth_estimateGas?
// TODO: only set SaveReverts if this is an eth_call or eth_estimateGas? we'll need eth_sendRawTransaction somewhere else
if let Some(metadata) = tracing::Span::current().metadata() {
let fields = metadata.fields();
if let Some(user_id) = fields.field("user_id") {
let values = [(&user_id, None)];
let valueset = fields.value_set(&values);
let visitor = todo!();
valueset.record(visitor);
// TODO: now how we do we get the current value out of it? we might need this index
} else {
warn!("no user id");
}
}
// TODO: check the span for user_key_id
// TODO: only set SaveReverts for
// TODO: logging every one is going to flood the database
@ -153,8 +172,9 @@ impl OpenRequestHandle {
if let Some(HttpClientError::JsonRpcError(err)) =
err.downcast_ref::<HttpClientError>()
{
if err.message == "execution reverted" {
if err.message.starts_with("execution reverted") {
debug!(%method, ?params, "TODO: save the request");
// TODO: don't do this on the hot path. spawn it
} else {
debug!(?err, %method, rpc=%conn, "bad response!");
}
@ -164,8 +184,9 @@ impl OpenRequestHandle {
if let Some(WsClientError::JsonRpcError(err)) =
err.downcast_ref::<WsClientError>()
{
if err.message == "execution reverted" {
if err.message.starts_with("execution reverted") {
debug!(%method, ?params, "TODO: save the request");
// TODO: don't do this on the hot path. spawn it
} else {
debug!(?err, %method, rpc=%conn, "bad response!");
}