remove more anyhows from app module
the only ones left should be in the top level spawn functions.
This commit is contained in:
parent
60c1a6d382
commit
847e961eb0
@ -4,7 +4,7 @@ mod ws;
|
|||||||
use crate::block_number::{block_needed, BlockNeeded};
|
use crate::block_number::{block_needed, BlockNeeded};
|
||||||
use crate::config::{AppConfig, TopConfig};
|
use crate::config::{AppConfig, TopConfig};
|
||||||
use crate::frontend::authorization::{Authorization, RequestMetadata, RpcSecretKey};
|
use crate::frontend::authorization::{Authorization, RequestMetadata, RpcSecretKey};
|
||||||
use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult};
|
use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
|
||||||
use crate::frontend::rpc_proxy_ws::ProxyMode;
|
use crate::frontend::rpc_proxy_ws::ProxyMode;
|
||||||
use crate::jsonrpc::{
|
use crate::jsonrpc::{
|
||||||
JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum,
|
JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum,
|
||||||
@ -1089,12 +1089,11 @@ impl Web3ProxyApp {
|
|||||||
// TODO: improve flattening
|
// TODO: improve flattening
|
||||||
|
|
||||||
// get the head block now so that any requests that need it all use the same block
|
// get the head block now so that any requests that need it all use the same block
|
||||||
// TODO: Web3ProxyError that handles "no servers synced" in a consistent way
|
|
||||||
// TODO: this still has an edge condition if there is a reorg in the middle of the request!!!
|
// TODO: this still has an edge condition if there is a reorg in the middle of the request!!!
|
||||||
let head_block_num = self
|
let head_block_num = self
|
||||||
.balanced_rpcs
|
.balanced_rpcs
|
||||||
.head_block_num()
|
.head_block_num()
|
||||||
.context(anyhow::anyhow!("no servers synced"))?;
|
.ok_or(Web3ProxyError::NoServersSynced)?;
|
||||||
|
|
||||||
let responses = join_all(
|
let responses = join_all(
|
||||||
requests
|
requests
|
||||||
@ -1366,7 +1365,7 @@ impl Web3ProxyApp {
|
|||||||
|
|
||||||
let mut gas_estimate: U256 = if let Some(gas_estimate) = response.result.take() {
|
let mut gas_estimate: U256 = if let Some(gas_estimate) = response.result.take() {
|
||||||
serde_json::from_str(gas_estimate.get())
|
serde_json::from_str(gas_estimate.get())
|
||||||
.context("gas estimate result is not an U256")?
|
.or(Err(Web3ProxyError::GasEstimateNotU256))?
|
||||||
} else {
|
} else {
|
||||||
// i think this is always an error response
|
// i think this is always an error response
|
||||||
let rpcs = request_metadata.backend_requests.lock().clone();
|
let rpcs = request_metadata.backend_requests.lock().clone();
|
||||||
@ -1456,15 +1455,15 @@ impl Web3ProxyApp {
|
|||||||
{
|
{
|
||||||
let params = request
|
let params = request
|
||||||
.params
|
.params
|
||||||
.context("there must be params if we got this far")?;
|
.web3_context("there must be params if we got this far")?;
|
||||||
|
|
||||||
let params = params
|
let params = params
|
||||||
.as_array()
|
.as_array()
|
||||||
.context("there must be an array if we got this far")?
|
.web3_context("there must be an array if we got this far")?
|
||||||
.get(0)
|
.get(0)
|
||||||
.context("there must be an item if we got this far")?
|
.web3_context("there must be an item if we got this far")?
|
||||||
.as_str()
|
.as_str()
|
||||||
.context("there must be a string if we got this far")?;
|
.web3_context("there must be a string if we got this far")?;
|
||||||
|
|
||||||
let params = Bytes::from_str(params)
|
let params = Bytes::from_str(params)
|
||||||
.expect("there must be Bytes if we got this far");
|
.expect("there must be Bytes if we got this far");
|
||||||
@ -1586,7 +1585,8 @@ impl Web3ProxyApp {
|
|||||||
let param = Bytes::from_str(
|
let param = Bytes::from_str(
|
||||||
params[0]
|
params[0]
|
||||||
.as_str()
|
.as_str()
|
||||||
.context("parsing params 0 into str then bytes")?,
|
.ok_or(Web3ProxyError::ParseBytesError(None))
|
||||||
|
.web3_context("parsing params 0 into str then bytes")?,
|
||||||
)
|
)
|
||||||
.map_err(|x| {
|
.map_err(|x| {
|
||||||
trace!("bad request: {:?}", x);
|
trace!("bad request: {:?}", x);
|
||||||
@ -1635,7 +1635,7 @@ impl Web3ProxyApp {
|
|||||||
// TODO: if no servers synced, wait for them to be synced? probably better to error and let haproxy retry another server
|
// TODO: if no servers synced, wait for them to be synced? probably better to error and let haproxy retry another server
|
||||||
let head_block_num = head_block_num
|
let head_block_num = head_block_num
|
||||||
.or(self.balanced_rpcs.head_block_num())
|
.or(self.balanced_rpcs.head_block_num())
|
||||||
.context("no servers synced")?;
|
.ok_or(Web3ProxyError::NoServersSynced)?;
|
||||||
|
|
||||||
// we do this check before checking caches because it might modify the request params
|
// we do this check before checking caches because it might modify the request params
|
||||||
// TODO: add a stat for archive vs full since they should probably cost different
|
// TODO: add a stat for archive vs full since they should probably cost different
|
||||||
@ -1793,7 +1793,7 @@ impl Web3ProxyApp {
|
|||||||
stat_sender
|
stat_sender
|
||||||
.send_async(response_stat.into())
|
.send_async(response_stat.into())
|
||||||
.await
|
.await
|
||||||
.context("stat_sender sending response_stat")?;
|
.map_err(Web3ProxyError::SendAppStatError)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
return Ok((response, rpcs));
|
return Ok((response, rpcs));
|
||||||
@ -1816,7 +1816,7 @@ impl Web3ProxyApp {
|
|||||||
stat_sender
|
stat_sender
|
||||||
.send_async(response_stat.into())
|
.send_async(response_stat.into())
|
||||||
.await
|
.await
|
||||||
.context("stat_sender sending response stat")?;
|
.map_err(Web3ProxyError::SendAppStatError)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some((kafka_topic, kafka_key, kafka_headers)) = kafka_stuff {
|
if let Some((kafka_topic, kafka_key, kafka_headers)) = kafka_stuff {
|
||||||
@ -1826,7 +1826,7 @@ impl Web3ProxyApp {
|
|||||||
.expect("if headers are set, producer must exist");
|
.expect("if headers are set, producer must exist");
|
||||||
|
|
||||||
let response_bytes =
|
let response_bytes =
|
||||||
rmp_serde::to_vec(&response).context("failed msgpack serialize response")?;
|
rmp_serde::to_vec(&response).web3_context("failed msgpack serialize response")?;
|
||||||
|
|
||||||
let f = async move {
|
let f = async move {
|
||||||
let produce_future = kafka_producer.send(
|
let produce_future = kafka_producer.send(
|
||||||
|
@ -2,12 +2,11 @@
|
|||||||
|
|
||||||
use super::Web3ProxyApp;
|
use super::Web3ProxyApp;
|
||||||
use crate::frontend::authorization::{Authorization, RequestMetadata};
|
use crate::frontend::authorization::{Authorization, RequestMetadata};
|
||||||
use crate::frontend::errors::Web3ProxyResult;
|
use crate::frontend::errors::{Web3ProxyErrorContext, Web3ProxyResult};
|
||||||
use crate::jsonrpc::JsonRpcForwardedResponse;
|
use crate::jsonrpc::JsonRpcForwardedResponse;
|
||||||
use crate::jsonrpc::JsonRpcRequest;
|
use crate::jsonrpc::JsonRpcRequest;
|
||||||
use crate::rpcs::transactions::TxStatus;
|
use crate::rpcs::transactions::TxStatus;
|
||||||
use crate::stats::RpcQueryStats;
|
use crate::stats::RpcQueryStats;
|
||||||
use anyhow::Context;
|
|
||||||
use axum::extract::ws::Message;
|
use axum::extract::ws::Message;
|
||||||
use ethers::prelude::U64;
|
use ethers::prelude::U64;
|
||||||
use futures::future::AbortHandle;
|
use futures::future::AbortHandle;
|
||||||
@ -31,7 +30,7 @@ impl Web3ProxyApp {
|
|||||||
) -> Web3ProxyResult<(AbortHandle, JsonRpcForwardedResponse)> {
|
) -> Web3ProxyResult<(AbortHandle, JsonRpcForwardedResponse)> {
|
||||||
// TODO: this is not efficient
|
// TODO: this is not efficient
|
||||||
let request_bytes = serde_json::to_string(&request_json)
|
let request_bytes = serde_json::to_string(&request_json)
|
||||||
.context("finding request size")?
|
.web3_context("finding request size")?
|
||||||
.len();
|
.len();
|
||||||
|
|
||||||
let request_metadata = Arc::new(RequestMetadata::new(request_bytes));
|
let request_metadata = Arc::new(RequestMetadata::new(request_bytes));
|
||||||
|
@ -778,7 +778,7 @@ impl Web3ProxyApp {
|
|||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
authorization_checks.map_err(Web3ProxyError::SeaRc)
|
authorization_checks.map_err(Web3ProxyError::Arc)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Authorized the ip/origin/referer/useragent and rate limit and concurrency
|
/// Authorized the ip/origin/referer/useragent and rate limit and concurrency
|
||||||
|
@ -31,6 +31,7 @@ pub enum Web3ProxyError {
|
|||||||
AccessDenied,
|
AccessDenied,
|
||||||
#[error(ignore)]
|
#[error(ignore)]
|
||||||
Anyhow(anyhow::Error),
|
Anyhow(anyhow::Error),
|
||||||
|
Arc(Arc<Web3ProxyError>),
|
||||||
#[error(ignore)]
|
#[error(ignore)]
|
||||||
#[from(ignore)]
|
#[from(ignore)]
|
||||||
BadRequest(String),
|
BadRequest(String),
|
||||||
@ -41,6 +42,7 @@ pub enum Web3ProxyError {
|
|||||||
EthersHttpClientError(ethers::prelude::HttpClientError),
|
EthersHttpClientError(ethers::prelude::HttpClientError),
|
||||||
EthersProviderError(ethers::prelude::ProviderError),
|
EthersProviderError(ethers::prelude::ProviderError),
|
||||||
EthersWsClientError(ethers::prelude::WsClientError),
|
EthersWsClientError(ethers::prelude::WsClientError),
|
||||||
|
GasEstimateNotU256,
|
||||||
Headers(headers::Error),
|
Headers(headers::Error),
|
||||||
HeaderToString(ToStrError),
|
HeaderToString(ToStrError),
|
||||||
InfluxDb2RequestError(influxdb2::RequestError),
|
InfluxDb2RequestError(influxdb2::RequestError),
|
||||||
@ -70,7 +72,9 @@ pub enum Web3ProxyError {
|
|||||||
#[error(ignore)]
|
#[error(ignore)]
|
||||||
#[from(ignore)]
|
#[from(ignore)]
|
||||||
OriginNotAllowed(headers::Origin),
|
OriginNotAllowed(headers::Origin),
|
||||||
ParseBytesError(ethers::types::ParseBytesError),
|
#[display(fmt = "{:?}", _0)]
|
||||||
|
#[error(ignore)]
|
||||||
|
ParseBytesError(Option<ethers::types::ParseBytesError>),
|
||||||
ParseMsgError(siwe::ParseError),
|
ParseMsgError(siwe::ParseError),
|
||||||
ParseAddressError,
|
ParseAddressError,
|
||||||
#[display(fmt = "{:?}, {:?}", _0, _1)]
|
#[display(fmt = "{:?}, {:?}", _0, _1)]
|
||||||
@ -81,8 +85,8 @@ pub enum Web3ProxyError {
|
|||||||
#[error(ignore)]
|
#[error(ignore)]
|
||||||
#[from(ignore)]
|
#[from(ignore)]
|
||||||
RefererNotAllowed(headers::Referer),
|
RefererNotAllowed(headers::Referer),
|
||||||
SeaRc(Arc<Web3ProxyError>),
|
|
||||||
SemaphoreAcquireError(AcquireError),
|
SemaphoreAcquireError(AcquireError),
|
||||||
|
SendAppStatError(flume::SendError<crate::stats::AppStat>),
|
||||||
SerdeJson(serde_json::Error),
|
SerdeJson(serde_json::Error),
|
||||||
/// simple way to return an error message to the user and an anyhow to our logs
|
/// simple way to return an error message to the user and an anyhow to our logs
|
||||||
#[display(fmt = "{}, {}, {:?}", _0, _1, _2)]
|
#[display(fmt = "{}, {}, {:?}", _0, _1, _2)]
|
||||||
@ -217,6 +221,17 @@ impl Web3ProxyError {
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
Self::GasEstimateNotU256 => {
|
||||||
|
warn!("GasEstimateNotU256");
|
||||||
|
(
|
||||||
|
StatusCode::BAD_REQUEST,
|
||||||
|
JsonRpcForwardedResponse::from_str(
|
||||||
|
"gas estimate result is not an U256",
|
||||||
|
Some(StatusCode::BAD_REQUEST.as_u16().into()),
|
||||||
|
None,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
Self::Headers(err) => {
|
Self::Headers(err) => {
|
||||||
warn!("HeadersError {:?}", err);
|
warn!("HeadersError {:?}", err);
|
||||||
(
|
(
|
||||||
@ -548,7 +563,7 @@ impl Web3ProxyError {
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
Self::SeaRc(err) => match migration::SeaRc::try_unwrap(err) {
|
Self::Arc(err) => match Arc::try_unwrap(err) {
|
||||||
Ok(err) => err,
|
Ok(err) => err,
|
||||||
Err(err) => Self::Anyhow(anyhow::anyhow!("{}", err)),
|
Err(err) => Self::Anyhow(anyhow::anyhow!("{}", err)),
|
||||||
}
|
}
|
||||||
@ -565,6 +580,17 @@ impl Web3ProxyError {
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
Self::SendAppStatError(err) => {
|
||||||
|
error!("SendAppStatError err={:?}", err);
|
||||||
|
(
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
JsonRpcForwardedResponse::from_str(
|
||||||
|
"error stat_sender sending response_stat",
|
||||||
|
Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
|
||||||
|
None,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
Self::SerdeJson(err) => {
|
Self::SerdeJson(err) => {
|
||||||
warn!("serde json err={:?}", err);
|
warn!("serde json err={:?}", err);
|
||||||
(
|
(
|
||||||
@ -725,6 +751,12 @@ impl Web3ProxyError {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<ethers::types::ParseBytesError> for Web3ProxyError {
|
||||||
|
fn from(err: ethers::types::ParseBytesError) -> Self {
|
||||||
|
Self::ParseBytesError(Some(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl From<tokio::time::error::Elapsed> for Web3ProxyError {
|
impl From<tokio::time::error::Elapsed> for Web3ProxyError {
|
||||||
fn from(err: tokio::time::error::Elapsed) -> Self {
|
fn from(err: tokio::time::error::Elapsed) -> Self {
|
||||||
Self::Timeout(Some(err))
|
Self::Timeout(Some(err))
|
||||||
|
Loading…
Reference in New Issue
Block a user