wow this got big

This commit is contained in:
Bryan Stitt 2023-10-06 14:51:21 -07:00
parent f704d1c4aa
commit 6756559ec2
14 changed files with 608 additions and 547 deletions

@ -4,7 +4,6 @@ use crate::caches::{RegisteredUserRateLimitKey, RpcSecretKeyCache, UserBalanceCa
use crate::config::{AppConfig, TopConfig};
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::{Authorization, Web3Request};
use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::globals::{global_db_conn, DatabaseError, APP, DB_CONN, DB_REPLICA};
use crate::jsonrpc::{
self, JsonRpcErrorData, JsonRpcId, JsonRpcParams, JsonRpcRequest, JsonRpcRequestEnum,
@ -1096,37 +1095,14 @@ impl Web3ProxyApp {
async fn try_send_protected(
self: &Arc<Self>,
web3_request: &Arc<Web3Request>,
) -> Web3ProxyResult<Arc<RawValue>> {
let rpcs = if self.protected_rpcs.is_empty() {
let num_public_rpcs = match web3_request.proxy_mode() {
// TODO: how many balanced rpcs should we send to? configurable? percentage of total?
ProxyMode::Best | ProxyMode::Debug => Some(4),
ProxyMode::Fastest(0) => None,
// TODO: how many balanced rpcs should we send to? configurable? percentage of total?
// TODO: what if we do 2 per tier? we want to blast the third party rpcs
// TODO: maybe having the third party rpcs in their own Web3Rpcs would be good for this
ProxyMode::Fastest(x) => Some(x * 4),
ProxyMode::Quorum(x, ..) => Some(x),
ProxyMode::Versus => None,
};
self.balanced_rpcs.try_rpcs_for_request(web3_request).await
// // no private rpcs to send to. send to a few public rpcs
// // try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here.
// self.balanced_rpcs
// .try_send_all_synced_connections(
// web3_request,
// Some(Duration::from_secs(10)),
// Some(Level::TRACE.into()),
// num_public_rpcs,
// )
// .await
) -> Web3ProxyResult<SingleResponse<Arc<RawValue>>> {
if self.protected_rpcs.is_empty() {
self.balanced_rpcs.request_with_metadata(web3_request).await
} else {
self.protected_rpcs.try_rpcs_for_request(web3_request).await
};
todo!();
self.protected_rpcs
.request_with_metadata(web3_request)
.await
}
}
/// proxy request with up to 3 tries.
@ -1136,22 +1112,35 @@ impl Web3ProxyApp {
authorization: Arc<Authorization>,
head_block: Option<Web3ProxyBlock>,
) -> (StatusCode, jsonrpc::SingleResponse, Vec<Arc<Web3Rpc>>) {
// TODO: this clone is only for an error response. refactor to not need it
let error_id = request.id.clone();
let web3_request =
Web3Request::new_with_app(self, authorization, None, request.into(), head_block).await;
match Web3Request::new_with_app(self, authorization, None, request.into(), head_block)
.await
{
Ok(x) => x,
Err(err) => {
let (a, b) = err.as_json_response_parts(error_id);
return (a, b, vec![]);
}
};
// TODO: trace/kafka log request.params before we send them to _proxy_request_with_caching which might modify them
// turn some of the Web3ProxyErrors into Ok results
let max_tries = 3;
let mut tries = 0;
loop {
let tries = web3_request.backend_requests.lock().len();
if tries > 0 {
// exponential backoff with jitter
// TODO: wait for RankedRpcs to change instead of this arbitrary sleep
// TODO: refresh the head block and any replacements of "latest" on the web3_request?
sleep(Duration::from_millis(100)).await;
}
tries += 1;
let (code, response) = match self._proxy_request_with_caching(&web3_request).await {
Ok(response_data) => {
web3_request.error_response.store(false, Ordering::Relaxed);
@ -1217,7 +1206,7 @@ impl Web3ProxyApp {
) -> Web3ProxyResult<jsonrpc::SingleResponse> {
// TODO: serve net_version without querying the backend
// TODO: don't force RawValue
let response: jsonrpc::SingleResponse = match web3_request.request.method() {
let response: jsonrpc::SingleResponse = match web3_request.inner.method() {
// lots of commands are blocked
method @ ("db_getHex"
| "db_getString"
@ -1440,7 +1429,7 @@ impl Web3ProxyApp {
&& (error_data.message == "ALREADY_EXISTS: already known"
|| error_data.message == "INTERNAL_ERROR: existing tx with same hash")
{
let params = web3_request.request.params()
let params = web3_request.inner.params()
.as_array()
.ok_or_else(|| {
Web3ProxyError::BadRequest(
@ -1547,7 +1536,7 @@ impl Web3ProxyApp {
"web3_sha3" => {
// returns Keccak-256 (not the standardized SHA3-256) of the given data.
// TODO: timeout
match &web3_request.request.params() {
match &web3_request.inner.params() {
serde_json::Value::Array(params) => {
// TODO: make a struct and use serde conversion to clean this up
if params.len() != 1
@ -1609,9 +1598,6 @@ impl Web3ProxyApp {
)).into());
}
// TODO: why is this clone needed?
let web3_request = web3_request.clone();
if web3_request.cache_mode.is_some() {
// don't cache anything larger than 16 MiB
let max_response_cache_bytes = 16 * (1024 ^ 2); // self.config.max_response_cache_bytes;
@ -1631,7 +1617,7 @@ impl Web3ProxyApp {
web3_request.ttl(),
self.balanced_rpcs
.try_proxy_connection::<Arc<RawValue>>(
&web3_request,
web3_request,
)
).await??
} else {
@ -1652,7 +1638,7 @@ impl Web3ProxyApp {
web3_request.ttl(),
self.balanced_rpcs
.try_proxy_connection::<Arc<RawValue>>(
&web3_request,
web3_request,
)
).await??
}
@ -1716,8 +1702,8 @@ impl Web3ProxyApp {
if let Some(x) = x.lock().take() {
Ok(jsonrpc::SingleResponse::Stream(x))
} else {
let method = web3_request.request.method();
let params = web3_request.request.params();
let method = web3_request.inner.method();
let params = web3_request.inner.params();
let err: Web3ProxyError = anyhow::anyhow!("stream was already taken. please report this in Discord. method={:?} params={:?}", method, params).into();
Err(Arc::new(err))
@ -1748,7 +1734,7 @@ impl Web3ProxyApp {
web3_request.ttl(),
self.balanced_rpcs
.try_proxy_connection::<Arc<RawValue>>(
&web3_request,
web3_request,
)
).await??;

@ -30,7 +30,7 @@ impl Web3ProxyApp {
response_sender: mpsc::Sender<Message>,
) -> Web3ProxyResult<(AbortHandle, jsonrpc::ParsedResponse)> {
let subscribe_to = web3_request
.request
.inner
.params()
.get(0)
.and_then(|x| x.as_str())
@ -89,44 +89,56 @@ impl Web3ProxyApp {
)
.await;
if let Some(close_message) = app
.rate_limit_close_websocket(&subscription_web3_request)
.await
{
let _ = response_sender.send(close_message).await;
break;
match subscription_web3_request {
Err(err) => {
error!(?err, "error creating subscription_web3_request");
// TODO: send them an error message before closing
break;
}
Ok(subscription_web3_request) => {
if let Some(close_message) = app
.rate_limit_close_websocket(&subscription_web3_request)
.await
{
// TODO: send them a message so they know they were rate limited
let _ = response_sender.send(close_message).await;
break;
}
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let response_json = json!({
"jsonrpc": "2.0",
"method":"eth_subscription",
"params": {
"subscription": subscription_id,
// TODO: option to include full transaction objects instead of just the hashes?
"result": subscription_web3_request.head_block.as_ref().map(|x| &x.0),
},
});
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// we could use JsonRpcForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
// TODO: do clients support binary messages?
// TODO: can we check a content type header?
let response_msg = Message::Text(response_str);
if response_sender.send(response_msg).await.is_err() {
// TODO: increment error_response? i don't think so. i think this will happen once every time a client disconnects.
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
subscription_web3_request.add_response(response_bytes);
}
}
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let response_json = json!({
"jsonrpc": "2.0",
"method":"eth_subscription",
"params": {
"subscription": subscription_id,
// TODO: option to include full transaction objects instead of just the hashes?
"result": subscription_web3_request.head_block.as_ref().map(|x| &x.0),
},
});
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// we could use JsonRpcForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
// TODO: do clients support binary messages?
// TODO: can we check a content type header?
let response_msg = Message::Text(response_str);
if response_sender.send(response_msg).await.is_err() {
// TODO: increment error_response? i don't think so. i think this will happen once every time a client disconnects.
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
subscription_web3_request.add_response(response_bytes);
}
let _ = response_sender.send(Message::Close(None)).await;
trace!("closed newHeads subscription {:?}", subscription_id);
});
}
@ -144,7 +156,7 @@ impl Web3ProxyApp {
while let Some(Ok(new_txid)) = pending_txid_firehose.next().await {
// TODO: include the head_block here?
let subscription_web3_request = Web3Request::new_with_app(
match Web3Request::new_with_app(
&app,
authorization.clone(),
None,
@ -154,46 +166,56 @@ impl Web3ProxyApp {
),
None,
)
.await;
// check if we should close the websocket connection
if let Some(close_message) = app
.rate_limit_close_websocket(&subscription_web3_request)
.await
.await
{
let _ = response_sender.send(close_message).await;
break;
Err(err) => {
error!(?err, "error creating subscription_web3_request");
// what should we do to turn this error into a message for them?
break;
}
Ok(subscription_web3_request) => {
// check if we should close the websocket connection
if let Some(close_message) = app
.rate_limit_close_websocket(&subscription_web3_request)
.await
{
let _ = response_sender.send(close_message).await;
break;
}
// TODO: make a struct/helper function for this
let response_json = json!({
"jsonrpc": "2.0",
"method":"eth_subscription",
"params": {
"subscription": subscription_id,
"result": new_txid,
},
});
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// we could use JsonRpcForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
subscription_web3_request.add_response(response_bytes);
// TODO: do clients support binary messages?
// TODO: can we check a content type header?
let response_msg = Message::Text(response_str);
if response_sender.send(response_msg).await.is_err() {
// TODO: increment error_response? i don't think so. i think this will happen once every time a client disconnects.
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
}
}
}
// TODO: make a struct/helper function for this
let response_json = json!({
"jsonrpc": "2.0",
"method":"eth_subscription",
"params": {
"subscription": subscription_id,
"result": new_txid,
},
});
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// we could use JsonRpcForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
subscription_web3_request.add_response(response_bytes);
// TODO: do clients support binary messages?
// TODO: can we check a content type header?
let response_msg = Message::Text(response_str);
if response_sender.send(response_msg).await.is_err() {
// TODO: increment error_response? i don't think so. i think this will happen once every time a client disconnects.
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
}
let _ = response_sender.send(Message::Close(None)).await;
trace!(
"closed newPendingTransactions subscription {:?}",
subscription_id

@ -152,12 +152,13 @@ pub enum Web3ProxyError {
#[error(ignore)]
#[from(ignore)]
RefererNotAllowed(headers::Referer),
Reqwest(reqwest::Error),
SemaphoreAcquireError(AcquireError),
SerdeJson(serde_json::Error),
SiweVerification(VerificationError),
/// simple way to return an error message to the user and an anyhow to our logs
#[display(fmt = "{}, {}, {:?}", _0, _1, _2)]
StatusCode(StatusCode, Cow<'static, str>, Option<anyhow::Error>),
StatusCode(StatusCode, Cow<'static, str>, Option<serde_json::Value>),
StripeWebhookError(stripe::WebhookError),
/// TODO: what should be attached to the timout?
#[display(fmt = "{:?}", _0)]
@ -239,8 +240,16 @@ impl Web3ProxyError {
)
}
Self::Arc(err) => {
// recurse
return err.as_response_parts();
// recurse somehow. Web3ProxyError isn't clone and we can't moe out of it
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcErrorData {
// TODO: is it safe to expose all of our anyhow strings?
message: "INTERNAL SERVER ERROR".into(),
code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(),
data: Some(serde_json::Value::String(err.to_string())),
},
)
}
Self::BadRequest(err) => {
trace!(?err, "BAD_REQUEST");
@ -320,11 +329,12 @@ impl Web3ProxyError {
},
)
}
Self::EthersHttpClient(err) => {
if let Ok(err) = JsonRpcErrorData::try_from(err) {
Self::EthersHttpClient(err) => match JsonRpcErrorData::try_from(err) {
Ok(err) => {
trace!(?err, "EthersHttpClient jsonrpc error");
(StatusCode::OK, err)
} else {
}
Err(err) => {
warn!(?err, "EthersHttpClient");
(
StatusCode::INTERNAL_SERVER_ERROR,
@ -335,12 +345,13 @@ impl Web3ProxyError {
},
)
}
}
Self::EthersProvider(err) => {
if let Ok(err) = JsonRpcErrorData::try_from(err) {
},
Self::EthersProvider(err) => match JsonRpcErrorData::try_from(err) {
Ok(err) => {
trace!(?err, "EthersProvider jsonrpc error");
(StatusCode::OK, err)
} else {
}
Err(err) => {
warn!(?err, "EthersProvider");
(
StatusCode::INTERNAL_SERVER_ERROR,
@ -351,12 +362,13 @@ impl Web3ProxyError {
},
)
}
}
Self::EthersWsClient(err) => {
if let Ok(err) = JsonRpcErrorData::try_from(err) {
},
Self::EthersWsClient(err) => match JsonRpcErrorData::try_from(err) {
Ok(err) => {
trace!(?err, "EthersWsClient jsonrpc error");
(StatusCode::OK, err)
} else {
}
Err(err) => {
warn!(?err, "EthersWsClient");
(
StatusCode::INTERNAL_SERVER_ERROR,
@ -367,7 +379,7 @@ impl Web3ProxyError {
},
)
}
}
},
// Self::JsonRpcForwardedError(x) => (StatusCode::OK, x),
Self::GasEstimateNotU256 => {
trace!("GasEstimateNotU256");
@ -606,7 +618,7 @@ impl Web3ProxyError {
Self::JsonRejection(err) => {
trace!(?err, "JsonRejection");
let (message, code): (&str, _) = match err {
let (message, code): (&str, _) = match &err {
JsonRejection::JsonDataError(_) => ("Invalid Request", -32600),
JsonRejection::JsonSyntaxError(_) => ("Parse error", -32700),
JsonRejection::MissingJsonContentType(_) => ("Invalid Request", -32600),
@ -933,6 +945,17 @@ impl Web3ProxyError {
},
)
}
Self::Reqwest(err) => {
warn!(?err, "reqwest");
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcErrorData {
message: "reqwest error!".into(),
code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(),
data: Some(serde_json::Value::String(err.to_string())),
},
)
}
Self::SemaphoreAcquireError(err) => {
error!(?err, "semaphore acquire");
(
@ -967,21 +990,22 @@ impl Web3ProxyError {
},
)
}
Self::StatusCode(status_code, err_msg, err) => {
Self::StatusCode(status_code, err_msg, data) => {
// different status codes should get different error levels. 500s should warn. 400s should stat
let code = status_code.as_u16();
if (500..600).contains(&code) {
warn!(%err_msg, ?err, "server error {}", code);
warn!(?data, "server error {}: {}", code, err_msg);
} else {
trace!(%err_msg, ?err, "user error {}", code);
trace!(?data, "user error {}: {}", code, err_msg);
}
// TODO: would be great to do this without the cloning! Something blocked that and I didn't write a comment about it though
(
*status_code,
JsonRpcErrorData {
message: err_msg.clone(),
code: code.into(),
data: None,
data: data.clone(),
},
)
}

@ -33,12 +33,12 @@ use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::{RedisRateLimitResult, RedisRateLimiter};
use serde::ser::SerializeStruct;
use serde::Serialize;
use serde_json::json;
use serde_json::value::RawValue;
use std::borrow::Cow;
use std::fmt::Debug;
use std::fmt::Display;
use std::fmt::{self, Debug, Display};
use std::hash::{Hash, Hasher};
use std::mem;
use std::num::NonZeroU64;
@ -158,7 +158,7 @@ pub struct Web3Request {
/// TODO: this should be in a global config. not copied to every single request
pub usd_per_cu: Decimal,
pub request: RequestOrMethod,
pub inner: RequestOrMethod,
/// Instant that the request was received (or at least close to it)
/// We use Instant and not timestamps to avoid problems with leap seconds and similar issues
@ -195,6 +195,56 @@ pub struct Web3Request {
pub stat_sender: Option<mpsc::UnboundedSender<AppStat>>,
}
impl Display for Web3Request {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}({})",
self.inner.method(),
serde_json::to_string(self.inner.params()).expect("this should always serialize")
)
}
}
impl Serialize for Web3Request {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut state = serializer.serialize_struct("request", 7)?;
state.serialize_field(
"archive_request",
&self.archive_request.load(atomic::Ordering::Relaxed),
)?;
state.serialize_field("chain_id", &self.chain_id)?;
state.serialize_field("head_block", &self.head_block)?;
state.serialize_field("request", &self.inner)?;
state.serialize_field("elapsed", &self.start_instant.elapsed().as_secs_f32())?;
{
let backend_names = self.backend_requests.lock();
let backend_names = backend_names
.iter()
.map(|x| x.name.as_str())
.collect::<Vec<_>>();
state.serialize_field("backend_requests", &backend_names)?;
}
state.serialize_field(
"response_bytes",
&self.response_bytes.load(atomic::Ordering::Relaxed),
)?;
state.end()
}
}
impl Default for Authorization {
fn default() -> Self {
Authorization::internal().unwrap()
@ -278,6 +328,7 @@ impl ResponseOrBytes<'_> {
impl Web3Request {
#[allow(clippy::too_many_arguments)]
async fn new_with_options(
app: Option<&Web3ProxyApp>,
authorization: Arc<Authorization>,
chain_id: u64,
head_block: Option<Web3ProxyBlock>,
@ -286,8 +337,7 @@ impl Web3Request {
mut request: RequestOrMethod,
stat_sender: Option<mpsc::UnboundedSender<AppStat>>,
usd_per_cu: Decimal,
app: Option<&Web3ProxyApp>,
) -> Arc<Self> {
) -> Web3ProxyResult<Arc<Self>> {
let start_instant = Instant::now();
// TODO: get this default from config, or from user settings
@ -306,6 +356,8 @@ impl Web3Request {
}
// now that kafka has logged the user's original params, we can calculate the cache key
// TODO: modify CacheMode::new to wait for a future block if one is requested! be sure to update head_block too!
let cache_mode = match &mut request {
RequestOrMethod::Request(x) => CacheMode::new(x, head_block.as_ref(), app).await,
_ => CacheMode::Never,
@ -322,7 +374,7 @@ impl Web3Request {
head_block: head_block.clone(),
kafka_debug_logger,
no_servers: 0.into(),
request,
inner: request,
response_bytes: 0.into(),
response_from_backup_rpc: false.into(),
response_millis: 0.into(),
@ -333,7 +385,7 @@ impl Web3Request {
user_error_response: false.into(),
};
Arc::new(x)
Ok(Arc::new(x))
}
pub async fn new_with_app(
@ -342,7 +394,7 @@ impl Web3Request {
max_wait: Option<Duration>,
request: RequestOrMethod,
head_block: Option<Web3ProxyBlock>,
) -> Arc<Self> {
) -> Web3ProxyResult<Arc<Self>> {
// TODO: get this out of tracing instead (where we have a String from Amazon's LB)
let request_ulid = Ulid::new();
@ -365,6 +417,7 @@ impl Web3Request {
let usd_per_cu = app.config.usd_per_cu.unwrap_or_default();
Self::new_with_options(
Some(app),
authorization,
chain_id,
head_block,
@ -373,7 +426,6 @@ impl Web3Request {
request,
stat_sender,
usd_per_cu,
Some(app),
)
.await
}
@ -383,7 +435,7 @@ impl Web3Request {
params: &P,
head_block: Option<Web3ProxyBlock>,
max_wait: Option<Duration>,
) -> Arc<Self> {
) -> Web3ProxyResult<Arc<Self>> {
let authorization = Arc::new(Authorization::internal().unwrap());
// TODO: we need a real id! increment a counter on the app
@ -396,6 +448,7 @@ impl Web3Request {
Self::new_with_app(app, authorization, max_wait, request.into(), head_block).await
} else {
Self::new_with_options(
None,
authorization,
0,
head_block,
@ -404,7 +457,6 @@ impl Web3Request {
request.into(),
None,
Default::default(),
None,
)
.await
}
@ -419,7 +471,7 @@ impl Web3Request {
match &self.cache_mode {
CacheMode::Never => None,
x => {
let x = JsonRpcQueryCacheKey::new(x, &self.request).hash();
let x = JsonRpcQueryCacheKey::new(x, &self.inner).hash();
Some(x)
}
@ -433,7 +485,7 @@ impl Web3Request {
#[inline]
pub fn id(&self) -> Box<RawValue> {
self.request.id()
self.inner.id()
}
pub fn max_block_needed(&self) -> Option<U64> {

@ -329,7 +329,7 @@ async fn websocket_proxy_web3_rpc(
"eth_subscribe" => {
let web3_request =
Web3Request::new_with_app(app, authorization, None, json_request.into(), None)
.await;
.await?;
// TODO: how can we subscribe with proxy_mode?
match app
@ -356,15 +356,15 @@ async fn websocket_proxy_web3_rpc(
"eth_unsubscribe" => {
let web3_request =
Web3Request::new_with_app(app, authorization, None, json_request.into(), None)
.await;
.await?;
// sometimes we get a list, sometimes we get the id directly
// check for the list first, then just use the whole thing
let maybe_id = web3_request
.request
.inner
.params()
.get(0)
.unwrap_or_else(|| web3_request.request.params())
.unwrap_or_else(|| web3_request.inner.params())
.clone();
let subscription_id: U64 = match serde_json::from_value::<U64>(maybe_id) {

@ -1,3 +1,5 @@
// TODO: think a lot more about this
use crate::{app::Web3ProxyApp, errors::Web3ProxyError, relational_db::DatabaseReplica};
use derivative::Derivative;
use migration::{

@ -259,11 +259,12 @@ where
{
// TODO: threshold from configs
// TODO: error handling
// TODO: if a large stream's response's initial chunk "error" then we should buffer it
pub async fn read_if_short(
mut response: reqwest::Response,
nbytes: u64,
web3_request: Arc<Web3Request>,
) -> Result<SingleResponse<T>, ProviderError> {
web3_request: &Arc<Web3Request>,
) -> Web3ProxyResult<SingleResponse<T>> {
Ok(Self::from_bytes(response.bytes().await?)?)
/*
match response.content_length() {
@ -510,29 +511,30 @@ impl JsonRpcRequestEnum {
// TODO: create a stat so we can penalize
// TODO: what request size
let metadata = Web3Request::new_with_app(
let request = Web3Request::new_with_app(
app,
authorization.clone(),
None,
RequestOrMethod::Method("invalid_method".into(), size),
None,
)
.await;
.await
.unwrap();
metadata
request
.user_error_response
.store(true, atomic::Ordering::Release);
let response = Web3ProxyError::BadRequest("request failed validation".into());
metadata.add_response(&response);
request.add_response(&response);
let response = response.into_response_with_id(Some(err_id));
// TODO: variable duration depending on the IP
sleep(duration).await;
let _ = metadata.try_send_arc_stat();
let _ = request.try_send_arc_stat();
Err(response)
}

@ -86,6 +86,7 @@ impl<'a> JsonRpcQueryCacheKey<'a> {
}
}
// TODO: i think if we change this to Arc<JsonRpcResponseEnum<Box<RawValue>>>, we can speed things up
pub type JsonRpcResponseCache = Cache<u64, JsonRpcResponseEnum<Arc<RawValue>>>;
/// TODO: we might need one that holds RawValue and one that holds serde_json::Value
@ -201,15 +202,10 @@ impl TryFrom<Web3ProxyError> for JsonRpcResponseEnum<Arc<RawValue>> {
fn try_from(value: Web3ProxyError) -> Result<Self, Self::Error> {
match value {
Web3ProxyError::EthersProvider(err) => {
if let Ok(x) = JsonRpcErrorData::try_from(&err) {
let x = x.into();
Ok(x)
} else {
Err(err.into())
}
}
Web3ProxyError::EthersProvider(err) => match JsonRpcErrorData::try_from(&err) {
Ok(x) => Ok(x.into()),
Err(..) => Err(err.into()),
},
Web3ProxyError::NullJsonRpcResult => Ok(JsonRpcResponseEnum::NullResult),
Web3ProxyError::JsonRpcResponse(x) => Ok(x),
Web3ProxyError::JsonRpcErrorData(err) => Ok(err.into()),
@ -277,10 +273,16 @@ impl<'a> TryFrom<&'a ProviderError> for JsonRpcErrorData {
fn try_from(e: &'a ProviderError) -> Result<Self, Self::Error> {
match e {
ProviderError::JsonRpcClientError(err) => {
if let Some(err) = err.as_error_response() {
Ok(err.into())
} else {
Err(e)
match err.as_error_response() {
Some(err) => {
// this isn't safe to do because we don't have the id of the request
Ok(JsonRpcErrorData {
code: err.code,
message: err.message.clone().into(),
data: err.data.clone(),
})
}
None => Err(e),
}
}
e => Err(e),

@ -121,6 +121,7 @@ pub struct RankedRpcs {
sort_mode: SortMethod,
}
// TODO: could these be refs? The owning RankedRpcs lifetime might work. `stream!` might make it complicated
pub struct RpcsForRequest {
inner: Vec<Arc<Web3Rpc>>,
outer: Vec<Arc<Web3Rpc>>,

@ -12,21 +12,24 @@ use crate::frontend::status::MokaCacheSerializer;
use crate::jsonrpc::{self, JsonRpcErrorData, JsonRpcParams, JsonRpcResultData, ParsedResponse};
use derive_more::From;
use ethers::prelude::{TxHash, U64};
use ethers::providers::ProviderError;
use futures::future::try_join_all;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hashbrown::HashMap;
use http::StatusCode;
use itertools::Itertools;
use moka::future::CacheBuilder;
use parking_lot::RwLock;
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::json;
use std::borrow::Cow;
use std::fmt::{self, Display};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::sync::{mpsc, watch};
use tokio::time::{Duration, Instant};
use tokio::time::{sleep_until, Duration, Instant};
use tokio::{pin, select};
use tracing::{debug, error, info, instrument, trace, warn};
@ -393,11 +396,54 @@ impl Web3Rpcs {
Ok(())
}
/// TODO: i think this RpcsForRequest should be stored on the Web3Request when its made. that way any waiting for sync happens early and we don't need waiting anywhere else in the app
pub async fn wait_for_rpcs_for_request(
&self,
web3_request: &Arc<Web3Request>,
) -> Web3ProxyResult<RpcsForRequest> {
let mut watch_consensus_rpcs = self.watch_ranked_rpcs.subscribe();
loop {
// other places check web3_request ttl. i don't think we need a check here too
let next_try = match self.try_rpcs_for_request(web3_request).await {
Ok(x) => return Ok(x),
Err(Web3ProxyError::RateLimited(_, Some(retry_at))) => retry_at,
Err(x) => return Err(x),
};
if next_try > web3_request.expire_instant {
let next_try = Instant::now().duration_since(next_try).as_secs();
// we don't use Web3ProxyError::RateLimited because that is for the user being rate limited
return Err(Web3ProxyError::StatusCode(
StatusCode::TOO_MANY_REQUESTS,
"backend rpcs are all rate limited!".into(),
Some(json!({"retry_at": next_try})),
));
}
select! {
_ = sleep_until(next_try) => {
// rpcs didn't change and we have waited too long. break to return an error
warn!(?self, "timeout during wait_for_rpcs_for_request!");
},
_ = watch_consensus_rpcs.changed() => {
// consensus rpcs changed!
// TODO: i don't love that we throw it away
watch_consensus_rpcs.borrow_and_update();
}
}
}
}
/// get all rpc servers that are not rate limited
/// this prefers synced servers, but it will return servers even if they aren't fully in sync.
/// this does not gaurentee you won't be rate limited. we don't increment our counters until you try to send. so you might have to wait to be able to send
/// TODO: should this wait for ranked rpcs? maybe only a fraction of web3_request's time?
pub async fn try_rpcs_for_request(&self, web3_request: &Arc<Web3Request>) -> TryRpcsForRequest {
pub async fn try_rpcs_for_request(
&self,
web3_request: &Arc<Web3Request>,
) -> Web3ProxyResult<RpcsForRequest> {
// TODO: by_name might include things that are on a forked
let ranked_rpcs: Arc<RankedRpcs> =
if let Some(ranked_rpcs) = self.watch_ranked_rpcs.borrow().clone() {
@ -411,14 +457,17 @@ impl Web3Rpcs {
} else {
// i doubt we will ever get here
// TODO: return a future that resolves once we do have something?
return TryRpcsForRequest::None;
return Err(Web3ProxyError::NoServersSynced);
}
} else {
// TODO: return a future that resolves once we do have something?
return TryRpcsForRequest::None;
return Err(Web3ProxyError::NoServersSynced);
};
ranked_rpcs.for_request(web3_request).into()
match ranked_rpcs.for_request(web3_request) {
None => return Err(Web3ProxyError::NoServersSynced),
Some(x) => Ok(x),
}
}
pub async fn internal_request<P: JsonRpcParams, R: JsonRpcResultData>(
@ -430,10 +479,11 @@ impl Web3Rpcs {
let head_block = self.head_block();
let web3_request =
Web3Request::new_internal(method.into(), params, head_block, max_wait).await;
Web3Request::new_internal(method.into(), params, head_block, max_wait).await?;
let response = self.request_with_metadata(&web3_request).await?;
// the response might support streaming. we need to parse it
let parsed = response.parsed().await?;
match parsed.payload {
@ -445,198 +495,46 @@ impl Web3Rpcs {
/// Make a request with stat tracking.
/// The first jsonrpc response will be returned.
/// TODO: move this to RankedRpcsForRequest along with a bunch of other similar functions
/// TODO: take an arg for max_tries. take an arg for quorum(size) or serial
/// TODO? move this to RankedRpcsForRequest along with a bunch of other similar functions? but it needs watch_ranked_rpcs and other things on Web3Rpcs...
/// TODO: have a similar function for quorum(size, max_tries)
/// TODO: should max_tries be on web3_request. maybe as tries_left?
pub async fn request_with_metadata<R: JsonRpcResultData>(
&self,
web3_request: &Arc<Web3Request>,
) -> Web3ProxyResult<jsonrpc::SingleResponse<R>> {
let mut method_not_available_response = None;
let watch_consensus_rpcs = self.watch_ranked_rpcs.subscribe();
// set error_handler to Save. this might be overridden depending on the web3_request.authorization
// TODO: rename this to make it clear it might be overriden
let error_handler = Some(RequestErrorHandler::Save);
// TODO: collect the most common error
let mut last_jsonrpc_error = None;
let mut last_provider_error = None;
// TODO: collect the most common error. Web3ProxyError isn't Hash + Eq though. And making it so would be a pain
let mut errors = vec![];
// TODO: limit number of tries
match self.try_rpcs_for_request(web3_request).await {
TryRpcsForRequest::None => return Err(Web3ProxyError::NoServersSynced),
TryRpcsForRequest::RetryAt(retry_at) => {
if retry_at > web3_request.expire_instant {
return Err(Web3ProxyError::RateLimited(
Default::default(),
Some(retry_at),
));
let rpcs = self.try_rpcs_for_request(web3_request).await?;
let stream = rpcs.to_stream();
pin!(stream);
while let Some(active_request_handle) = stream.next().await {
// TODO: i'd like to get rid of this clone
let rpc = active_request_handle.clone_connection();
let is_backup_response = rpc.backup;
// TODO: i'd like to get rid of this clone
web3_request.backend_requests.lock().push(rpc.clone());
match active_request_handle.request::<R>().await {
Ok(response) => {
return Ok(response);
}
}
TryRpcsForRequest::Some(rpcs) => {
let stream = rpcs.to_stream();
Err(error) => {
// TODO: if this is an error, do NOT return. continue to try on another server
pin!(stream);
while let Some(active_request_handle) = stream.next().await {
let rpc = active_request_handle.clone_connection();
web3_request.backend_requests.lock().push(rpc.clone());
let is_backup_response = rpc.backup;
match active_request_handle.request::<R>().await {
Ok(response) => {
// TODO: if there are multiple responses being aggregated, this will only use the last server's backup type
web3_request
.response_from_backup_rpc
.store(is_backup_response, Ordering::Relaxed);
web3_request
.user_error_response
.store(false, Ordering::Relaxed);
web3_request.error_response.store(false, Ordering::Relaxed);
return Ok(response);
}
Err(error) => {
// TODO: if this is an error, do NOT return. continue to try on another server
let error = match JsonRpcErrorData::try_from(&error) {
Ok(x) => {
web3_request
.user_error_response
.store(true, Ordering::Relaxed);
x
}
Err(err) => {
warn!(?err, "error from {}", rpc);
web3_request.error_response.store(true, Ordering::Relaxed);
web3_request
.user_error_response
.store(false, Ordering::Relaxed);
last_provider_error = Some(error);
continue;
}
};
// some errors should be retried on other nodes
let error_msg = error.message.as_ref();
// different providers do different codes. check all of them
// TODO: there's probably more strings to add here
let rate_limit_substrings = ["limit", "exceeded", "quota usage"];
for rate_limit_substr in rate_limit_substrings {
if error_msg.contains(rate_limit_substr) {
if error_msg.contains("block size") {
// TODO: this message is likely wrong, but i can't find the actual one in my terminal now
// they hit an expected limit. return the error now
return Err(error.into());
} else if error_msg.contains("result on length") {
// this error contains "limit" but is not a rate limit error
// TODO: make the expected limit configurable
// TODO: parse the rate_limit_substr and only continue if it is < expected limit
if error_msg.contains("exceeding limit 16000000")
|| error_msg.ends_with(
"exceeding --rpc.returndata.limit 16000000",
)
{
// they hit our expected limit. return the error now
return Err(error.into());
} else {
// they hit a limit lower than what we expect. the server is misconfigured
error!(
%error_msg,
"unexpected result limit by {}",
"???"
);
continue;
}
} else {
warn!(
%error_msg,
"rate limited by {}",
"???"
);
continue;
}
}
}
match error.code {
-32000 => {
// TODO: regex?
let retry_prefixes = [
"header not found",
"header for hash not found",
"missing trie node",
"node not started",
"RPC timeout",
];
for retry_prefix in retry_prefixes {
if error_msg.starts_with(retry_prefix) {
// TODO: too verbose
debug!("retrying on another server");
continue;
}
}
}
-32601 => {
let error_msg = error.message.as_ref();
// sometimes a provider does not support all rpc methods
// we check other connections rather than returning the error
// but sometimes the method is something that is actually unsupported,
// so we save the response here to return it later
// some providers look like this
if error_msg.starts_with("the method")
&& error_msg.ends_with("is not available")
{
method_not_available_response = Some(error);
continue;
}
// others look like this (this is the example in the official spec)
if error_msg == "Method not found" {
method_not_available_response = Some(error);
continue;
}
}
_ => {}
}
last_jsonrpc_error = Some(error);
}
}
// TODO: track the most common errors
errors.push(error);
}
}
}
if let Some(err) = last_jsonrpc_error {
// TODO: set user_error here instead of above
return Err(err.into());
}
if let Some(err) = last_provider_error {
// TODO: set server_error instead of above
return Err(Web3ProxyError::from(err));
}
if let Some(err) = method_not_available_response {
web3_request.error_response.store(false, Ordering::Relaxed);
web3_request
.user_error_response
.store(true, Ordering::Relaxed);
// this error response is likely the user's fault
// TODO: emit a stat for unsupported methods. then we can know what there is demand for or if we are missing a feature
if let Some(err) = errors.into_iter().next() {
return Err(err.into());
}
@ -695,7 +593,9 @@ impl Web3Rpcs {
Err(JsonRpcErrorData {
message: "Requested data is not available".into(),
code: -32043,
data: None,
data: Some(json!({
"request": web3_request
})),
}
.into())
}
@ -1146,7 +1046,8 @@ mod tests {
Some(Web3ProxyBlock::try_from(head_block.clone()).unwrap()),
Some(Duration::from_millis(100)),
)
.await;
.await
.unwrap();
let x = rpcs
.wait_for_best_rpc(&r, &mut vec![], Some(RequestErrorHandler::DebugLevel))
.await
@ -1241,7 +1142,8 @@ mod tests {
Some(Web3ProxyBlock::try_from(head_block.clone()).unwrap()),
Some(Duration::from_millis(100)),
)
.await;
.await
.unwrap();
assert!(matches!(
rpcs.wait_for_best_rpc(&r, &mut vec![], None).await,
Ok(OpenRequestResult::Handle(_))
@ -1255,7 +1157,8 @@ mod tests {
Some(Web3ProxyBlock::try_from(head_block.clone()).unwrap()),
Some(Duration::from_millis(100)),
)
.await;
.await
.unwrap();
assert!(matches!(
rpcs.wait_for_best_rpc(&r, &mut vec![], None,).await,
Ok(OpenRequestResult::Handle(_))
@ -1271,7 +1174,7 @@ mod tests {
Some(Web3ProxyBlock::try_from(head_block.clone()).unwrap()),
Some(Duration::from_millis(100)),
)
.await;
.await.unwrap();
let future_rpc = rpcs.wait_for_best_rpc(&r, &mut vec![], None).await;
info!(?future_rpc);
@ -1353,7 +1256,8 @@ mod tests {
Some(head_block.clone()),
Some(Duration::from_millis(100)),
)
.await;
.await
.unwrap();
let best_available_server = rpcs.wait_for_best_rpc(&r, &mut vec![], None).await.unwrap();
debug!("best_available_server: {:#?}", best_available_server);
@ -1463,7 +1367,8 @@ mod tests {
Some(head_block.clone()),
Some(Duration::from_millis(100)),
)
.await;
.await
.unwrap();
let best_available_server = rpcs.wait_for_best_rpc(&r, &mut vec![], None).await.unwrap();
debug!("best_available_server: {:#?}", best_available_server);
@ -1479,7 +1384,8 @@ mod tests {
Some(head_block.clone()),
Some(Duration::from_millis(100)),
)
.await;
.await
.unwrap();
let _best_available_server_from_none =
rpcs.wait_for_best_rpc(&r, &mut vec![], None).await.unwrap();
@ -1492,7 +1398,8 @@ mod tests {
Some(head_block.clone()),
Some(Duration::from_millis(100)),
)
.await;
.await
.unwrap();
let best_archive_server = rpcs.wait_for_best_rpc(&r, &mut vec![], None).await;
match best_archive_server {
@ -1661,7 +1568,8 @@ mod tests {
Some(block_2.clone()),
Some(Duration::from_millis(100)),
)
.await;
.await
.unwrap();
match &r.cache_mode {
CacheMode::Standard {
@ -1695,7 +1603,8 @@ mod tests {
Some(block_2.clone()),
Some(Duration::from_millis(100)),
)
.await;
.await
.unwrap();
match &r.cache_mode {
CacheMode::Standard {

@ -1095,7 +1095,8 @@ impl Web3Rpc {
error_handler: Option<RequestErrorHandler>,
max_wait: Option<Duration>,
) -> Web3ProxyResult<R> {
let web3_request = Web3Request::new_internal(method.into(), params, None, max_wait).await;
// TODO: think about this more. its hard to do this without being self-referenctial!
let web3_request = Web3Request::new_internal(method.into(), params, None, max_wait).await?;
self.authorized_request(&web3_request, error_handler, max_wait)
.await

@ -1,14 +1,16 @@
use super::one::Web3Rpc;
use crate::errors::{Web3ProxyErrorContext, Web3ProxyResult};
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::{Authorization, AuthorizationType, Web3Request};
use crate::globals::{global_db_conn, DB_CONN};
use crate::jsonrpc::{self, JsonRpcResultData};
use crate::jsonrpc::{self, JsonRpcErrorData, JsonRpcResultData, Payload};
use anyhow::Context;
use chrono::Utc;
use derive_more::From;
use entities::revert_log;
use entities::sea_orm_active_enums::Method;
use ethers::providers::ProviderError;
use ethers::types::{Address, Bytes};
use http::StatusCode;
use migration::sea_orm::{self, ActiveEnum, ActiveModelTrait};
use nanorand::Rng;
use serde_json::json;
@ -65,7 +67,7 @@ struct EthCallFirstParams {
impl std::fmt::Debug for OpenRequestHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OpenRequestHandle")
.field("method", &self.web3_request.request.method())
.field("method", &self.web3_request.inner.method())
.field("rpc", &self.rpc.name)
.finish_non_exhaustive()
}
@ -169,22 +171,129 @@ impl OpenRequestHandle {
self.rpc.clone()
}
pub fn rate_limit_for(&self, x: Duration) {
// TODO: we actually only want to send if our value is greater
if self.rpc.backup {
debug!(?x, "rate limited on {}!", self.rpc);
} else {
warn!(?x, "rate limited on {}!", self.rpc);
}
self.rpc
.hard_limit_until
.as_ref()
.unwrap()
.send_replace(Instant::now() + x);
}
/// Just get the response from the provider without any extra handling.
/// This lets us use the try operator which makes it much easier to read
async fn _request<R: JsonRpcResultData + serde::Serialize>(
&self,
) -> Web3ProxyResult<jsonrpc::SingleResponse<R>> {
// TODO: replace ethers-rs providers with our own that supports streaming the responses
// TODO: replace ethers-rs providers with our own that handles "id" being null
if let (Some(url), Some(ref client)) = (self.rpc.http_url.clone(), &self.rpc.http_client) {
// prefer the http provider
let request = self
.web3_request
.inner
.jsonrpc_request()
.context("there should always be a request here")?;
let response = client.post(url).json(request).send().await?;
if response.status() == StatusCode::TOO_MANY_REQUESTS {
// TODO: how much should we actually rate limit?
self.rate_limit_for(Duration::from_secs(1));
}
let response = response.error_for_status()?;
jsonrpc::SingleResponse::read_if_short(response, 1024, &self.web3_request).await
} else if let Some(p) = self.rpc.ws_provider.load().as_ref() {
// use the websocket provider if no http provider is available
let method = self.web3_request.inner.method();
let params = self.web3_request.inner.params();
// some ethers::ProviderError need to be converted to JsonRpcErrorData. the rest to Web3ProxyError
let response = match p.request::<_, R>(method, params).await {
Ok(x) => jsonrpc::ParsedResponse::from_result(x, self.web3_request.id()),
Err(provider_error) => match JsonRpcErrorData::try_from(&provider_error) {
Ok(x) => jsonrpc::ParsedResponse::from_error(x, self.web3_request.id()),
Err(ProviderError::HTTPError(error)) => {
if let Some(status_code) = error.status() {
if status_code == StatusCode::TOO_MANY_REQUESTS {
// TODO: how much should we actually rate limit?
self.rate_limit_for(Duration::from_secs(1));
}
}
return Err(provider_error.into());
}
Err(err) => {
warn!(?err, "error from {}", self.rpc);
return Err(provider_error.into());
}
},
};
Ok(response.into())
} else {
// this must be a test
Err(anyhow::anyhow!("no provider configured!").into())
}
}
pub fn error_handler(&self) -> RequestErrorHandler {
if let RequestErrorHandler::Save = self.error_handler {
let method = self.web3_request.inner.method();
// TODO: should all these be Trace or Debug or a mix?
// TODO: this list should come from config. other methods might be desired
if !["eth_call", "eth_estimateGas"].contains(&method) {
// trace!(%method, "skipping save on revert");
RequestErrorHandler::TraceLevel
} else if DB_CONN.read().is_ok() {
let log_revert_chance = self.web3_request.authorization.checks.log_revert_chance;
if log_revert_chance == 0 {
// trace!(%method, "no chance. skipping save on revert");
RequestErrorHandler::TraceLevel
} else if log_revert_chance == u16::MAX {
// trace!(%method, "gaurenteed chance. SAVING on revert");
self.error_handler
} else if nanorand::tls_rng().generate_range(0u16..u16::MAX) < log_revert_chance {
// trace!(%method, "missed chance. skipping save on revert");
RequestErrorHandler::TraceLevel
} else {
// trace!("Saving on revert");
// TODO: is always logging at debug level fine?
self.error_handler
}
} else {
// trace!(%method, "no database. skipping save on revert");
RequestErrorHandler::TraceLevel
}
} else {
self.error_handler
}
}
/// Send a web3 request
/// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
/// depending on how things are locked, you might need to pass the provider in
/// we take self to ensure this function only runs once
/// TODO: abandon ProviderError
/// This does some inspection of the response to check for non-standard errors and rate limiting to try to give a Web3ProxyError instead of an Ok
pub async fn request<R: JsonRpcResultData + serde::Serialize>(
self,
) -> Result<jsonrpc::SingleResponse<R>, ProviderError> {
) -> Web3ProxyResult<jsonrpc::SingleResponse<R>> {
// TODO: use tracing spans
// TODO: including params in this log is way too verbose
// trace!(rpc=%self.rpc, %method, "request");
trace!("requesting from {}", self.rpc);
let method = self.web3_request.request.method();
let params = self.web3_request.request.params();
let authorization = &self.web3_request.authorization;
match &authorization.authorization_type {
@ -202,50 +311,10 @@ impl OpenRequestHandle {
// we used to fetch_add the active_request count here, but sometimes a request is made without going through this function (like with subscriptions)
// we generally don't want to use the try operator. we might need to log errors
let start = Instant::now();
// TODO: replace ethers-rs providers with our own that supports streaming the responses
// TODO: replace ethers-rs providers with our own that handles "id" being null
let response: Result<jsonrpc::SingleResponse<R>, _> = if let (
Some(ref url),
Some(ref client),
) =
(&self.rpc.http_url, &self.rpc.http_client)
{
let params: serde_json::Value = serde_json::to_value(params)?;
// TODO: why recreate this? we should be able to just use the one from the user
let request = jsonrpc::JsonRpcRequest::new(
self.web3_request.id().into(),
method.to_string(),
params,
)
.expect("request creation cannot fail");
match client.post(url.clone()).json(&request).send().await {
// TODO: threshold from configs
Ok(response) => {
jsonrpc::SingleResponse::read_if_short(
response,
1024,
self.web3_request.clone(),
)
.await
}
Err(err) => Err(err.into()),
}
} else if let Some(p) = self.rpc.ws_provider.load().as_ref() {
p.request(method, params)
.await
// TODO: Id here
.map(|result| {
jsonrpc::ParsedResponse::from_result(result, Default::default()).into()
})
} else {
return Err(ProviderError::CustomError(
"no provider configured!".to_string(),
));
};
let mut response = self._request().await;
// measure successes and errors
// originally i thought we wouldn't want errors, but I think it's a more accurate number including all requests
@ -254,122 +323,111 @@ impl OpenRequestHandle {
// we used to fetch_sub the active_request count here, but sometimes the handle is dropped without request being called!
trace!(
"response from {} for {} {:?}: {:?}",
"response from {} for {}: {:?}",
self.rpc,
method,
params,
self.web3_request,
response,
);
if let Err(err) = &response {
// TODO: move this to another helper
let response_is_success = match &response {
Ok(jsonrpc::SingleResponse::Parsed(x)) => match &x.payload {
Payload::Success { .. } => true,
_ => true,
},
Ok(jsonrpc::SingleResponse::Stream(..)) => true,
Err(_) => false,
};
if response_is_success {
// only save reverts for some types of calls
// TODO: do something special for eth_sendRawTransaction too
let error_handler = if let RequestErrorHandler::Save = self.error_handler {
// TODO: should all these be Trace or Debug or a mix?
if !["eth_call", "eth_estimateGas"].contains(&method) {
// trace!(%method, "skipping save on revert");
RequestErrorHandler::TraceLevel
} else if DB_CONN.read().is_ok() {
let log_revert_chance =
self.web3_request.authorization.checks.log_revert_chance;
if log_revert_chance == 0 {
// trace!(%method, "no chance. skipping save on revert");
RequestErrorHandler::TraceLevel
} else if log_revert_chance == u16::MAX {
// trace!(%method, "gaurenteed chance. SAVING on revert");
self.error_handler
} else if nanorand::tls_rng().generate_range(0u16..u16::MAX) < log_revert_chance
{
// trace!(%method, "missed chance. skipping save on revert");
RequestErrorHandler::TraceLevel
} else {
// trace!("Saving on revert");
// TODO: is always logging at debug level fine?
self.error_handler
}
} else {
// trace!(%method, "no database. skipping save on revert");
RequestErrorHandler::TraceLevel
}
} else {
self.error_handler
};
// TODO: simple enum -> string derive?
// TODO: if ProviderError::UnsupportedRpc, we should retry on another server
#[derive(Debug)]
enum ResponseTypes {
Revert,
RateLimit,
Error,
}
// check for "execution reverted" here
// TODO: move this info a function on ResponseErrorType
let response_type = if let ProviderError::JsonRpcClientError(err) = err {
if let Some(_err) = err.as_serde_error() {
// this seems to pretty much always be a rate limit error
ResponseTypes::RateLimit
} else if let Some(err) = err.as_error_response() {
// JsonRpc and Application errors get rolled into the JsonRpcClientError
let msg = err.message.as_str();
trace!(%msg, "jsonrpc error message");
if msg.starts_with("execution reverted") {
ResponseTypes::Revert
} else if msg.contains("limit") || msg.contains("request") {
// TODO! THIS HAS TOO MANY FALSE POSITIVES! Theres another spot in the code that checks for things.
ResponseTypes::RateLimit
} else {
ResponseTypes::Error
}
} else {
// i don't think this is possible
warn!(?err, "unexpected error");
ResponseTypes::Error
}
} else {
ResponseTypes::Error
};
if matches!(response_type, ResponseTypes::RateLimit) {
if let Some(hard_limit_until) = self.rpc.hard_limit_until.as_ref() {
// TODO: how long should we actually wait? different providers have different times
// TODO: if rate_limit_period_seconds is set, use that
// TODO: check response headers for rate limits too
let retry_at = Instant::now() + Duration::from_secs(1);
if self.rpc.backup {
debug!(?retry_at, ?err, "rate limited on {}!", self.rpc);
} else {
warn!(?retry_at, ?err, "rate limited on {}!", self.rpc);
}
hard_limit_until.send_replace(retry_at);
}
}
// TODO: think more about the method and param logs. those can be sensitive information
// we do **NOT** use self.error_handler here because it might have been modified
let error_handler = self.error_handler();
enum ResponseType {
Error,
Revert,
RateLimited,
}
let response_type: ResponseType = match &response {
Ok(jsonrpc::SingleResponse::Parsed(x)) => match &x.payload {
Payload::Success { .. } => unimplemented!(),
Payload::Error { error } => {
trace!(?error, "jsonrpc error data");
if error.message.starts_with("execution reverted") {
ResponseType::Revert
} else if error.code == StatusCode::TOO_MANY_REQUESTS.as_u16() as i64 {
ResponseType::RateLimited
} else {
// TODO! THIS HAS TOO MANY FALSE POSITIVES! Theres another spot in the code that checks for things.
// if error.message.contains("limit") || error.message.contains("request") {
// self.rate_limit_for(Duration::from_secs(1));
// }
match error.code {
-32000 => {
// TODO: regex?
let archive_prefixes = [
"header not found",
"header for hash not found",
"missing trie node",
];
for prefix in archive_prefixes {
if error.message.starts_with(prefix) {
// TODO: what error?
response = Err(Web3ProxyError::NoBlockNumberOrHash);
break;
}
}
}
-32601 => {
let error_msg = error.message.as_ref();
// sometimes a provider does not support all rpc methods
// we check other connections rather than returning the error
// but sometimes the method is something that is actually unsupported,
// so we save the response here to return it later
// some providers look like this
if (error_msg.starts_with("the method")
&& error_msg.ends_with("is not available"))
|| error_msg == "Method not found"
{
let method = self.web3_request.inner.method().to_string();
response =
Err(Web3ProxyError::MethodNotFound(method.into()))
}
}
_ => {}
}
ResponseType::Error
}
}
},
Ok(_) => unreachable!(),
Err(_) => ResponseType::Error,
};
match error_handler {
RequestErrorHandler::DebugLevel => {
// TODO: think about this revert check more. sometimes we might want reverts logged so this needs a flag
if matches!(response_type, ResponseTypes::Revert) {
if matches!(response_type, ResponseType::Revert) {
trace!(
rpc=%self.rpc,
%method,
?params,
?err,
%self.web3_request,
?response,
"revert",
);
} else {
debug!(
rpc=%self.rpc,
%method,
?params,
?err,
%self.web3_request,
?response,
"bad response",
);
}
@ -377,18 +435,16 @@ impl OpenRequestHandle {
RequestErrorHandler::InfoLevel => {
info!(
rpc=%self.rpc,
%method,
?params,
?err,
%self.web3_request,
?response,
"bad response",
);
}
RequestErrorHandler::TraceLevel => {
trace!(
rpc=%self.rpc,
%method,
?params,
?err,
%self.web3_request,
?response,
"bad response",
);
}
@ -396,9 +452,8 @@ impl OpenRequestHandle {
// TODO: only include params if not running in release mode
error!(
rpc=%self.rpc,
%method,
?params,
?err,
%self.web3_request,
?response,
"bad response",
);
}
@ -406,38 +461,43 @@ impl OpenRequestHandle {
// TODO: only include params if not running in release mode
warn!(
rpc=%self.rpc,
%method,
?params,
?err,
%self.web3_request,
?response,
"bad response",
);
}
RequestErrorHandler::Save => {
trace!(
rpc=%self.rpc,
%method,
?params,
?err,
%self.web3_request,
?response,
"bad response",
);
// TODO: do not unwrap! (doesn't matter much since we check method as a string above)
let method: Method = Method::try_from_value(&method.to_string()).unwrap();
// TODO: open this up for even more methods
let method: Method =
Method::try_from_value(&self.web3_request.inner.method().to_string())
.unwrap();
// TODO: i don't think this prsing is correct
match serde_json::from_value::<EthCallParams>(json!(params)) {
match serde_json::from_value::<EthCallParams>(json!(self
.web3_request
.inner
.params()))
{
Ok(params) => {
// spawn saving to the database so we don't slow down the request
// TODO: log if this errors
// TODO: aren't the method and params already saved? this should just need the response
let f = authorization.clone().save_revert(method, params.0 .0);
tokio::spawn(f);
}
Err(err) => {
warn!(
?method,
?params,
?err,
%self.web3_request,
?response,
"failed parsing eth_call params. unable to save revert",
);
}

@ -555,7 +555,7 @@ impl RpcQueryStats {
// TODO: do this without cloning. we can take their vec
let backend_rpcs_used = metadata.backend_rpcs_used();
let request_bytes = metadata.request.num_bytes() as u64;
let request_bytes = metadata.inner.num_bytes() as u64;
let response_bytes = metadata.response_bytes.load(Ordering::Relaxed);
let mut error_response = metadata.error_response.load(Ordering::Relaxed);
@ -587,7 +587,7 @@ impl RpcQueryStats {
x => x,
};
let cu = ComputeUnit::new(metadata.request.method(), metadata.chain_id, response_bytes);
let cu = ComputeUnit::new(metadata.inner.method(), metadata.chain_id, response_bytes);
let cache_hit = backend_rpcs_used.is_empty();
@ -598,7 +598,7 @@ impl RpcQueryStats {
&metadata.usd_per_cu,
);
let method = metadata.request.method().to_string().into();
let method = metadata.inner.method().to_string().into();
let x = Self {
archive_request,

@ -201,7 +201,7 @@ impl MigrateStatsToV2SubCommand {
head_block: None,
// debug data is in kafka, not mysql or influx
kafka_debug_logger: None,
request,
inner: request,
// This is not relevant in the new version
no_servers: 0.into(),
response_bytes: int_response_bytes.into(),