Refactor FrontendErrorResponse into Web3ProxyError

Renamed FrontendResponse to Web3ProxyResponse and introduced
a new generic type alias Web3ProxyResult.

Fixed a few noisy cargo warnings.
This commit is contained in:
Rory Neithinger 2023-03-16 19:38:11 -07:00
parent ffd63444b2
commit f3fc4924dc
14 changed files with 178 additions and 190 deletions

@ -1,8 +1,8 @@
use crate::app::Web3ProxyApp;
use crate::frontend::errors::FrontendErrorResponse;
use crate::frontend::errors::{Web3ProxyError, Web3ProxyResponse};
use crate::http_params::get_user_id_from_params;
use anyhow::Context;
use axum::response::{IntoResponse, Response};
use axum::response::IntoResponse;
use axum::{
headers::{authorization::Bearer, Authorization},
Json, TypedHeader,
@ -24,25 +24,21 @@ pub async fn query_admin_modify_usertier<'a>(
app: &'a Web3ProxyApp,
bearer: Option<TypedHeader<Authorization<Bearer>>>,
params: &'a HashMap<String, String>,
) -> Result<Response, FrontendErrorResponse> {
) -> Web3ProxyResponse {
// Quickly return if any of the input tokens are bad
let user_address: Vec<u8> = params
.get("user_address")
.ok_or_else(|| {
FrontendErrorResponse::BadRequest(
"Unable to find user_address key in request".to_string(),
)
Web3ProxyError::BadRequest("Unable to find user_address key in request".to_string())
})?
.parse::<Address>()
.map_err(|_| {
FrontendErrorResponse::BadRequest(
"Unable to parse user_address as an Address".to_string(),
)
Web3ProxyError::BadRequest("Unable to parse user_address as an Address".to_string())
})?
.to_fixed_bytes()
.into();
let user_tier_title = params.get("user_tier_title").ok_or_else(|| {
FrontendErrorResponse::BadRequest(
Web3ProxyError::BadRequest(
"Unable to get the user_tier_title key from the request".to_string(),
)
})?;
@ -78,7 +74,7 @@ pub async fn query_admin_modify_usertier<'a>(
.filter(admin::Column::UserId.eq(caller_id))
.one(&db_conn)
.await?
.ok_or(FrontendErrorResponse::AccessDenied)?;
.ok_or(Web3ProxyError::AccessDenied)?;
// If we are here, that means an admin was found, and we can safely proceed
@ -87,7 +83,7 @@ pub async fn query_admin_modify_usertier<'a>(
.filter(user::Column::Address.eq(user_address))
.one(&db_conn)
.await?
.ok_or(FrontendErrorResponse::BadRequest(
.ok_or(Web3ProxyError::BadRequest(
"No user with this id found".to_string(),
))?;
// Return early if the target user_tier_id is the same as the original user_tier_id
@ -101,7 +97,7 @@ pub async fn query_admin_modify_usertier<'a>(
.filter(user_tier::Column::Title.eq(user_tier_title.clone()))
.one(&db_conn)
.await?
.ok_or(FrontendErrorResponse::BadRequest(
.ok_or(Web3ProxyError::BadRequest(
"User Tier name was not found".to_string(),
))?;

@ -4,12 +4,12 @@ mod ws;
use crate::block_number::{block_needed, BlockNeeded};
use crate::config::{AppConfig, TopConfig};
use crate::frontend::authorization::{Authorization, RequestMetadata, RpcSecretKey};
use crate::frontend::errors::FrontendErrorResponse;
use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult};
use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::jsonrpc::{
JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum,
};
use crate::rpcs::blockchain::{BlocksByHashCache, Web3ProxyBlock};
use crate::rpcs::blockchain::Web3ProxyBlock;
use crate::rpcs::consensus::ConsensusWeb3Rpcs;
use crate::rpcs::many::Web3Rpcs;
use crate::rpcs::one::Web3Rpc;
@ -407,7 +407,7 @@ impl Web3ProxyApp {
shutdown_sender: broadcast::Sender<()>,
) -> anyhow::Result<Web3ProxyAppSpawn> {
let rpc_account_shutdown_recevier = shutdown_sender.subscribe();
let mut background_shutdown_receiver = shutdown_sender.subscribe();
let _background_shutdown_receiver = shutdown_sender.subscribe();
// safety checks on the config
// while i would prefer this to be in a "apply_top_config" function, that is a larger refactor
@ -811,26 +811,27 @@ impl Web3ProxyApp {
app_handles.push(config_handle);
}
// =======
// if important_background_handles.is_empty() {
// info!("no important background handles");
//
// let f = tokio::spawn(async move {
// let _ = background_shutdown_receiver.recv().await;
//
// Ok(())
// });
//
// important_background_handles.push(f);
// >>>>>>> 77df3fa (stats v2)
// =======
// if important_background_handles.is_empty() {
// info!("no important background handles");
//
// let f = tokio::spawn(async move {
// let _ = background_shutdown_receiver.recv().await;
//
// Ok(())
// });
//
// important_background_handles.push(f);
// >>>>>>> 77df3fa (stats v2)
Ok((
app,
app_handles,
important_background_handles,
new_top_config_sender,
consensus_connections_watcher
).into())
consensus_connections_watcher,
)
.into())
}
pub async fn apply_top_config(&self, new_top_config: TopConfig) -> anyhow::Result<()> {
@ -1041,7 +1042,7 @@ impl Web3ProxyApp {
self: &Arc<Self>,
authorization: Arc<Authorization>,
request: JsonRpcRequestEnum,
) -> Result<(JsonRpcForwardedResponseEnum, Vec<Arc<Web3Rpc>>), FrontendErrorResponse> {
) -> Web3ProxyResult<(JsonRpcForwardedResponseEnum, Vec<Arc<Web3Rpc>>)> {
// trace!(?request, "proxy_web3_rpc");
// even though we have timeouts on the requests to our backend providers,
@ -1079,7 +1080,7 @@ impl Web3ProxyApp {
self: &Arc<Self>,
authorization: &Arc<Authorization>,
requests: Vec<JsonRpcRequest>,
) -> Result<(Vec<JsonRpcForwardedResponse>, Vec<Arc<Web3Rpc>>), FrontendErrorResponse> {
) -> Web3ProxyResult<(Vec<JsonRpcForwardedResponse>, Vec<Arc<Web3Rpc>>)> {
// TODO: we should probably change ethers-rs to support this directly. they pushed this off to v2 though
let num_requests = requests.len();
@ -1087,7 +1088,7 @@ impl Web3ProxyApp {
// TODO: improve flattening
// get the head block now so that any requests that need it all use the same block
// TODO: FrontendErrorResponse that handles "no servers synced" in a consistent way
// TODO: Web3ProxyError that handles "no servers synced" in a consistent way
// TODO: this still has an edge condition if there is a reorg in the middle of the request!!!
let head_block_num = self
.balanced_rpcs
@ -1153,7 +1154,7 @@ impl Web3ProxyApp {
authorization: &Arc<Authorization>,
mut request: JsonRpcRequest,
head_block_num: Option<U64>,
) -> Result<(JsonRpcForwardedResponse, Vec<Arc<Web3Rpc>>), FrontendErrorResponse> {
) -> Web3ProxyResult<(JsonRpcForwardedResponse, Vec<Arc<Web3Rpc>>)> {
// trace!("Received request: {:?}", request);
let request_metadata = Arc::new(RequestMetadata::new(request.num_bytes())?);
@ -1591,7 +1592,7 @@ impl Web3ProxyApp {
)
.map_err(|x| {
trace!("bad request: {:?}", x);
FrontendErrorResponse::BadRequest(
Web3ProxyError::BadRequest(
"param 0 could not be read as H256".to_string(),
)
})?;
@ -1628,7 +1629,7 @@ impl Web3ProxyApp {
method => {
if method.starts_with("admin_") {
// TODO: emit a stat? will probably just be noise
return Err(FrontendErrorResponse::AccessDenied);
return Err(Web3ProxyError::AccessDenied);
}
// emit stats

@ -1,10 +1,10 @@
//! Handle admin helper logic
use super::authorization::login_is_authorized;
use super::errors::FrontendResult;
use super::errors::Web3ProxyResponse;
use crate::admin_queries::query_admin_modify_usertier;
use crate::app::Web3ProxyApp;
use crate::frontend::errors::FrontendErrorResponse;
use crate::frontend::errors::Web3ProxyError;
use crate::user_token::UserBearerToken;
use crate::PostLogin;
use anyhow::Context;
@ -43,7 +43,7 @@ pub async fn admin_change_user_roles(
Extension(app): Extension<Arc<Web3ProxyApp>>,
bearer: Option<TypedHeader<Authorization<Bearer>>>,
Query(params): Query<HashMap<String, String>>,
) -> FrontendResult {
) -> Web3ProxyResponse {
let response = query_admin_modify_usertier(&app, bearer, &params).await?;
Ok(response)
@ -58,7 +58,7 @@ pub async fn admin_login_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
InsecureClientIp(ip): InsecureClientIp,
Path(mut params): Path<HashMap<String, String>>,
) -> FrontendResult {
) -> Web3ProxyResponse {
// First check if the login is authorized
login_is_authorized(&app, ip).await?;
@ -85,30 +85,22 @@ pub async fn admin_login_get(
let admin_address: Address = params
.get("admin_address")
.ok_or_else(|| {
FrontendErrorResponse::BadRequest(
"Unable to find admin_address key in request".to_string(),
)
Web3ProxyError::BadRequest("Unable to find admin_address key in request".to_string())
})?
.parse::<Address>()
.map_err(|_err| {
FrontendErrorResponse::BadRequest(
"Unable to parse admin_address as an Address".to_string(),
)
Web3ProxyError::BadRequest("Unable to parse admin_address as an Address".to_string())
})?;
// Fetch the user_address parameter from the login string ... (as who we want to be logging in ...)
let user_address: Vec<u8> = params
.get("user_address")
.ok_or_else(|| {
FrontendErrorResponse::BadRequest(
"Unable to find user_address key in request".to_string(),
)
Web3ProxyError::BadRequest("Unable to find user_address key in request".to_string())
})?
.parse::<Address>()
.map_err(|_err| {
FrontendErrorResponse::BadRequest(
"Unable to parse user_address as an Address".to_string(),
)
Web3ProxyError::BadRequest("Unable to parse user_address as an Address".to_string())
})?
.to_fixed_bytes()
.into();
@ -156,7 +148,7 @@ pub async fn admin_login_get(
.filter(user::Column::Address.eq(user_address))
.one(db_replica.conn())
.await?
.ok_or(FrontendErrorResponse::BadRequest(
.ok_or(Web3ProxyError::BadRequest(
"Could not find user in db".to_string(),
))?;
@ -164,7 +156,7 @@ pub async fn admin_login_get(
.filter(user::Column::Address.eq(admin_address.encode()))
.one(db_replica.conn())
.await?
.ok_or(FrontendErrorResponse::BadRequest(
.ok_or(Web3ProxyError::BadRequest(
"Could not find admin in db".to_string(),
))?;
@ -233,7 +225,7 @@ pub async fn admin_login_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
InsecureClientIp(ip): InsecureClientIp,
Json(payload): Json<PostLogin>,
) -> FrontendResult {
) -> Web3ProxyResponse {
login_is_authorized(&app, ip).await?;
// Check for the signed bytes ..
@ -422,7 +414,7 @@ pub async fn admin_login_post(
pub async fn admin_logout_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> FrontendResult {
) -> Web3ProxyResponse {
let user_bearer = UserBearerToken::try_from(bearer)?;
let db_conn = app.db_conn().context("database needed for user logout")?;

@ -1,6 +1,6 @@
//! Utilities for authorization of logged in and anonymous users.
use super::errors::FrontendErrorResponse;
use super::errors::{Web3ProxyError, Web3ProxyResult};
use super::rpc_proxy_ws::ProxyMode;
use crate::app::{AuthorizationChecks, Web3ProxyApp, APP_USER_AGENT};
use crate::rpcs::one::Web3Rpc;
@ -308,14 +308,11 @@ impl Authorization {
/// rate limit logins only by ip.
/// we want all origins and referers and user agents to count together
pub async fn login_is_authorized(
app: &Web3ProxyApp,
ip: IpAddr,
) -> Result<Authorization, FrontendErrorResponse> {
pub async fn login_is_authorized(app: &Web3ProxyApp, ip: IpAddr) -> Web3ProxyResult<Authorization> {
let authorization = match app.rate_limit_login(ip, ProxyMode::Best).await? {
RateLimitResult::Allowed(authorization, None) => authorization,
RateLimitResult::RateLimited(authorization, retry_at) => {
return Err(FrontendErrorResponse::RateLimited(authorization, retry_at));
return Err(Web3ProxyError::RateLimited(authorization, retry_at));
}
// TODO: don't panic. give the user an error
x => unimplemented!("rate_limit_login shouldn't ever see these: {:?}", x),
@ -330,7 +327,7 @@ pub async fn ip_is_authorized(
ip: IpAddr,
origin: Option<Origin>,
proxy_mode: ProxyMode,
) -> Result<(Authorization, Option<OwnedSemaphorePermit>), FrontendErrorResponse> {
) -> Web3ProxyResult<(Authorization, Option<OwnedSemaphorePermit>)> {
// TODO: i think we could write an `impl From` for this
// TODO: move this to an AuthorizedUser extrator
let (authorization, semaphore) = match app
@ -345,7 +342,7 @@ pub async fn ip_is_authorized(
RateLimitResult::Allowed(authorization, semaphore) => (authorization, semaphore),
RateLimitResult::RateLimited(authorization, retry_at) => {
// TODO: in the background, emit a stat (maybe simplest to use a channel?)
return Err(FrontendErrorResponse::RateLimited(authorization, retry_at));
return Err(Web3ProxyError::RateLimited(authorization, retry_at));
}
// TODO: don't panic. give the user an error
x => unimplemented!("rate_limit_by_ip shouldn't ever see these: {:?}", x),
@ -389,7 +386,7 @@ pub async fn ip_is_authorized(
Ok((authorization, semaphore))
}
/// like app.rate_limit_by_rpc_key but converts to a FrontendErrorResponse;
/// like app.rate_limit_by_rpc_key but converts to a Web3ProxyError;
pub async fn key_is_authorized(
app: &Arc<Web3ProxyApp>,
rpc_key: RpcSecretKey,
@ -398,7 +395,7 @@ pub async fn key_is_authorized(
proxy_mode: ProxyMode,
referer: Option<Referer>,
user_agent: Option<UserAgent>,
) -> Result<(Authorization, Option<OwnedSemaphorePermit>), FrontendErrorResponse> {
) -> Web3ProxyResult<(Authorization, Option<OwnedSemaphorePermit>)> {
// check the rate limits. error if over the limit
// TODO: i think this should be in an "impl From" or "impl Into"
let (authorization, semaphore) = match app
@ -407,9 +404,9 @@ pub async fn key_is_authorized(
{
RateLimitResult::Allowed(authorization, semaphore) => (authorization, semaphore),
RateLimitResult::RateLimited(authorization, retry_at) => {
return Err(FrontendErrorResponse::RateLimited(authorization, retry_at));
return Err(Web3ProxyError::RateLimited(authorization, retry_at));
}
RateLimitResult::UnknownKey => return Err(FrontendErrorResponse::UnknownKey),
RateLimitResult::UnknownKey => return Err(Web3ProxyError::UnknownKey),
};
// TODO: DRY and maybe optimize the hashing
@ -517,7 +514,7 @@ impl Web3ProxyApp {
pub async fn bearer_is_authorized(
&self,
bearer: Bearer,
) -> Result<(user::Model, OwnedSemaphorePermit), FrontendErrorResponse> {
) -> Web3ProxyResult<(user::Model, OwnedSemaphorePermit)> {
// get the user id for this bearer token
let user_bearer_token = UserBearerToken::try_from(bearer)?;
@ -869,7 +866,7 @@ impl Authorization {
pub async fn check_again(
&self,
app: &Arc<Web3ProxyApp>,
) -> Result<(Arc<Self>, Option<OwnedSemaphorePermit>), FrontendErrorResponse> {
) -> Web3ProxyResult<(Arc<Self>, Option<OwnedSemaphorePermit>)> {
// TODO: we could probably do this without clones. but this is easy
let (a, s) = if let Some(rpc_secret_key) = self.checks.rpc_secret_key {
key_is_authorized(

@ -17,12 +17,13 @@ use redis_rate_limiter::redis::RedisError;
use reqwest::header::ToStrError;
use tokio::{sync::AcquireError, task::JoinError, time::Instant};
pub type Web3ProxyResult<T> = Result<T, Web3ProxyError>;
// TODO: take "IntoResponse" instead of Response?
pub type FrontendResult = Result<Response, FrontendErrorResponse>;
pub type Web3ProxyResponse = Web3ProxyResult<Response>;
// TODO:
#[derive(Debug, From)]
pub enum FrontendErrorResponse {
pub enum Web3ProxyError {
AccessDenied,
Anyhow(anyhow::Error),
BadRequest(String),
@ -46,7 +47,7 @@ pub enum FrontendErrorResponse {
UnknownKey,
}
impl FrontendErrorResponse {
impl Web3ProxyError {
pub fn into_response_parts(self) -> (StatusCode, JsonRpcForwardedResponse) {
match self {
Self::AccessDenied => {
@ -296,7 +297,7 @@ impl FrontendErrorResponse {
}
}
impl IntoResponse for FrontendErrorResponse {
impl IntoResponse for Web3ProxyError {
fn into_response(self) -> Response {
// TODO: include the request id in these so that users can give us something that will point to logs
// TODO: status code is in the jsonrpc response and is also the first item in the tuple. DRY
@ -307,5 +308,5 @@ impl IntoResponse for FrontendErrorResponse {
}
pub async fn handler_404() -> Response {
FrontendErrorResponse::NotFound.into_response()
Web3ProxyError::NotFound.into_response()
}

@ -1,7 +1,7 @@
//! Take a user's HTTP JSON-RPC requests and either respond from local data or proxy the request to a backend rpc server.
use super::authorization::{ip_is_authorized, key_is_authorized};
use super::errors::FrontendResult;
use super::errors::Web3ProxyResponse;
use super::rpc_proxy_ws::ProxyMode;
use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum};
use axum::extract::Path;
@ -22,7 +22,7 @@ pub async fn proxy_web3_rpc(
ip: InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
Json(payload): Json<JsonRpcRequestEnum>,
) -> FrontendResult {
) -> Web3ProxyResponse {
_proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Best).await
}
@ -32,7 +32,7 @@ pub async fn fastest_proxy_web3_rpc(
ip: InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
Json(payload): Json<JsonRpcRequestEnum>,
) -> FrontendResult {
) -> Web3ProxyResponse {
// TODO: read the fastest number from params
// TODO: check that the app allows this without authentication
_proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Fastest(0)).await
@ -44,7 +44,7 @@ pub async fn versus_proxy_web3_rpc(
ip: InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
Json(payload): Json<JsonRpcRequestEnum>,
) -> FrontendResult {
) -> Web3ProxyResponse {
_proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Versus).await
}
@ -54,7 +54,7 @@ async fn _proxy_web3_rpc(
origin: Option<TypedHeader<Origin>>,
payload: JsonRpcRequestEnum,
proxy_mode: ProxyMode,
) -> FrontendResult {
) -> Web3ProxyResponse {
// TODO: benchmark spawning this
// TODO: do we care about keeping the TypedHeader wrapper?
let origin = origin.map(|x| x.0);
@ -124,7 +124,7 @@ pub async fn proxy_web3_rpc_with_key(
user_agent: Option<TypedHeader<UserAgent>>,
Path(rpc_key): Path<String>,
Json(payload): Json<JsonRpcRequestEnum>,
) -> FrontendResult {
) -> Web3ProxyResponse {
_proxy_web3_rpc_with_key(
app,
ip,
@ -147,7 +147,7 @@ pub async fn debug_proxy_web3_rpc_with_key(
user_agent: Option<TypedHeader<UserAgent>>,
Path(rpc_key): Path<String>,
Json(payload): Json<JsonRpcRequestEnum>,
) -> FrontendResult {
) -> Web3ProxyResponse {
_proxy_web3_rpc_with_key(
app,
ip,
@ -170,7 +170,7 @@ pub async fn fastest_proxy_web3_rpc_with_key(
user_agent: Option<TypedHeader<UserAgent>>,
Path(rpc_key): Path<String>,
Json(payload): Json<JsonRpcRequestEnum>,
) -> FrontendResult {
) -> Web3ProxyResponse {
_proxy_web3_rpc_with_key(
app,
ip,
@ -193,7 +193,7 @@ pub async fn versus_proxy_web3_rpc_with_key(
user_agent: Option<TypedHeader<UserAgent>>,
Path(rpc_key): Path<String>,
Json(payload): Json<JsonRpcRequestEnum>,
) -> FrontendResult {
) -> Web3ProxyResponse {
_proxy_web3_rpc_with_key(
app,
ip,
@ -217,7 +217,7 @@ async fn _proxy_web3_rpc_with_key(
rpc_key: String,
payload: JsonRpcRequestEnum,
proxy_mode: ProxyMode,
) -> FrontendResult {
) -> Web3ProxyResponse {
// TODO: DRY w/ proxy_web3_rpc
// the request can take a while, so we spawn so that we can start serving another request
let rpc_key = rpc_key.parse()?;

@ -3,7 +3,7 @@
//! WebSockets are the preferred method of receiving requests, but not all clients have good support.
use super::authorization::{ip_is_authorized, key_is_authorized, Authorization, RequestMetadata};
use super::errors::{FrontendErrorResponse, FrontendResult};
use super::errors::{Web3ProxyError, Web3ProxyResponse};
use crate::stats::RpcQueryStats;
use crate::{
app::Web3ProxyApp,
@ -59,7 +59,7 @@ pub async fn websocket_handler(
ip: InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult {
) -> Web3ProxyResponse {
_websocket_handler(ProxyMode::Best, app, ip, origin, ws_upgrade).await
}
@ -71,7 +71,7 @@ pub async fn fastest_websocket_handler(
ip: InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult {
) -> Web3ProxyResponse {
// TODO: get the fastest number from the url params (default to 0/all)
// TODO: config to disable this
_websocket_handler(ProxyMode::Fastest(0), app, ip, origin, ws_upgrade).await
@ -85,7 +85,7 @@ pub async fn versus_websocket_handler(
ip: InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult {
) -> Web3ProxyResponse {
// TODO: config to disable this
_websocket_handler(ProxyMode::Versus, app, ip, origin, ws_upgrade).await
}
@ -96,7 +96,7 @@ async fn _websocket_handler(
InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult {
) -> Web3ProxyResponse {
let origin = origin.map(|x| x.0);
let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin, proxy_mode).await?;
@ -134,7 +134,7 @@ pub async fn websocket_handler_with_key(
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult {
) -> Web3ProxyResponse {
_websocket_handler_with_key(
ProxyMode::Best,
app,
@ -157,7 +157,7 @@ pub async fn debug_websocket_handler_with_key(
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult {
) -> Web3ProxyResponse {
_websocket_handler_with_key(
ProxyMode::Debug,
app,
@ -180,7 +180,7 @@ pub async fn fastest_websocket_handler_with_key(
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult {
) -> Web3ProxyResponse {
// TODO: get the fastest number from the url params (default to 0/all)
_websocket_handler_with_key(
ProxyMode::Fastest(0),
@ -204,7 +204,7 @@ pub async fn versus_websocket_handler_with_key(
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult {
) -> Web3ProxyResponse {
_websocket_handler_with_key(
ProxyMode::Versus,
app,
@ -228,7 +228,7 @@ async fn _websocket_handler_with_key(
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult {
) -> Web3ProxyResponse {
let rpc_key = rpc_key.parse()?;
let (authorization, _semaphore) = key_is_authorized(
@ -260,7 +260,7 @@ async fn _websocket_handler_with_key(
&app.config.redirect_rpc_key_url,
authorization.checks.rpc_secret_key_id,
) {
(None, None, _) => Err(FrontendErrorResponse::StatusCode(
(None, None, _) => Err(Web3ProxyError::StatusCode(
StatusCode::BAD_REQUEST,
"this page is for rpcs".to_string(),
None,
@ -273,7 +273,7 @@ async fn _websocket_handler_with_key(
if authorization.checks.rpc_secret_key_id.is_none() {
// i don't think this is possible
Err(FrontendErrorResponse::StatusCode(
Err(Web3ProxyError::StatusCode(
StatusCode::UNAUTHORIZED,
"AUTHORIZATION header required".to_string(),
None,
@ -291,7 +291,7 @@ async fn _websocket_handler_with_key(
}
}
// any other combinations get a simple error
_ => Err(FrontendErrorResponse::StatusCode(
_ => Err(Web3ProxyError::StatusCode(
StatusCode::BAD_REQUEST,
"this page is for rpcs".to_string(),
None,
@ -419,7 +419,7 @@ async fn handle_socket_payload(
.await
.map_or_else(
|err| match err {
FrontendErrorResponse::Anyhow(err) => Err(err),
Web3ProxyError::Anyhow(err) => Err(err),
_ => {
error!("handle this better! {:?}", err);
Err(anyhow::anyhow!("unexpected error! {:?}", err))

@ -1,6 +1,6 @@
//! Handle registration, logins, and managing account data.
use super::authorization::{login_is_authorized, RpcSecretKey};
use super::errors::FrontendResult;
use super::errors::Web3ProxyResponse;
use crate::app::Web3ProxyApp;
use crate::http_params::{
get_chain_id_from_params, get_page_from_params, get_query_start_from_params,
@ -65,7 +65,7 @@ pub async fn user_login_get(
InsecureClientIp(ip): InsecureClientIp,
// TODO: what does axum's error handling look like if the path fails to parse?
Path(mut params): Path<HashMap<String, String>>,
) -> FrontendResult {
) -> Web3ProxyResponse {
login_is_authorized(&app, ip).await?;
// create a message and save it in redis
@ -165,7 +165,7 @@ pub async fn user_login_post(
InsecureClientIp(ip): InsecureClientIp,
Query(query): Query<PostLoginQuery>,
Json(payload): Json<PostLogin>,
) -> FrontendResult {
) -> Web3ProxyResponse {
login_is_authorized(&app, ip).await?;
// TODO: this seems too verbose. how can we simply convert a String into a [u8; 65]
@ -373,7 +373,7 @@ pub async fn user_login_post(
pub async fn user_logout_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> FrontendResult {
) -> Web3ProxyResponse {
let user_bearer = UserBearerToken::try_from(bearer)?;
let db_conn = app.db_conn().context("database needed for user logout")?;
@ -417,7 +417,7 @@ pub async fn user_logout_post(
pub async fn user_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>,
) -> FrontendResult {
) -> Web3ProxyResponse {
let (user, _semaphore) = app.bearer_is_authorized(bearer_token).await?;
Ok(Json(user).into_response())
@ -435,7 +435,7 @@ pub async fn user_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>,
Json(payload): Json<UserPost>,
) -> FrontendResult {
) -> Web3ProxyResponse {
let (user, _semaphore) = app.bearer_is_authorized(bearer_token).await?;
let mut user: user::ActiveModel = user.into();
@ -480,8 +480,8 @@ pub async fn user_post(
pub async fn user_balance_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> FrontendResult {
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
) -> Web3ProxyResponse {
let (_user, _semaphore) = app.bearer_is_authorized(bearer).await?;
todo!("user_balance_get");
}
@ -495,8 +495,8 @@ pub async fn user_balance_get(
pub async fn user_balance_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> FrontendResult {
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
) -> Web3ProxyResponse {
let (_user, _semaphore) = app.bearer_is_authorized(bearer).await?;
todo!("user_balance_post");
}
@ -506,7 +506,7 @@ pub async fn user_balance_post(
pub async fn rpc_keys_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> FrontendResult {
) -> Web3ProxyResponse {
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let db_replica = app
@ -535,8 +535,8 @@ pub async fn rpc_keys_get(
pub async fn rpc_keys_delete(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> FrontendResult {
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
) -> Web3ProxyResponse {
let (_user, _semaphore) = app.bearer_is_authorized(bearer).await?;
// TODO: think about how cascading deletes and billing should work
Err(anyhow::anyhow!("work in progress").into())
@ -567,7 +567,7 @@ pub async fn rpc_keys_management(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
Json(payload): Json<UserKeyManagement>,
) -> FrontendResult {
) -> Web3ProxyResponse {
// TODO: is there a way we can know if this is a PUT or POST? right now we can modify or create keys with either. though that probably doesn't matter
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
@ -738,7 +738,7 @@ pub async fn user_revert_logs_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
Query(params): Query<HashMap<String, String>>,
) -> FrontendResult {
) -> Web3ProxyResponse {
let (user, _semaphore) = app.bearer_is_authorized(bearer).await?;
let chain_id = get_chain_id_from_params(app.as_ref(), &params)?;
@ -808,7 +808,7 @@ pub async fn user_stats_aggregated_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
bearer: Option<TypedHeader<Authorization<Bearer>>>,
Query(params): Query<HashMap<String, String>>,
) -> FrontendResult {
) -> Web3ProxyResponse {
let response = query_user_stats(&app, bearer, &params, StatType::Aggregated).await?;
Ok(response)
@ -828,7 +828,7 @@ pub async fn user_stats_detailed_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
bearer: Option<TypedHeader<Authorization<Bearer>>>,
Query(params): Query<HashMap<String, String>>,
) -> FrontendResult {
) -> Web3ProxyResponse {
let response = query_user_stats(&app, bearer, &params, StatType::Detailed).await?;
Ok(response)

@ -1,5 +1,5 @@
use crate::app::DatabaseReplica;
use crate::frontend::errors::FrontendErrorResponse;
use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult};
use crate::{app::Web3ProxyApp, user_token::UserBearerToken};
use anyhow::Context;
use axum::{
@ -24,7 +24,7 @@ pub async fn get_user_id_from_params(
// this is a long type. should we strip it down?
bearer: Option<TypedHeader<Authorization<Bearer>>>,
params: &HashMap<String, String>,
) -> Result<u64, FrontendErrorResponse> {
) -> Web3ProxyResult<u64> {
match (bearer, params.get("user_id")) {
(Some(TypedHeader(Authorization(bearer))), Some(user_id)) => {
// check for the bearer cache key
@ -45,7 +45,7 @@ pub async fn get_user_id_from_params(
.one(db_replica.conn())
.await
.context("database error while querying for user")?
.ok_or(FrontendErrorResponse::AccessDenied)?;
.ok_or(Web3ProxyError::AccessDenied)?;
// if expired, delete ALL expired logins
let now = Utc::now();
@ -60,7 +60,7 @@ pub async fn get_user_id_from_params(
// TODO: emit a stat? if this is high something weird might be happening
debug!("cleared expired logins: {:?}", delete_result);
return Err(FrontendErrorResponse::AccessDenied);
return Err(Web3ProxyError::AccessDenied);
}
save_to_redis = true;
@ -76,7 +76,7 @@ pub async fn get_user_id_from_params(
let user_id: u64 = user_id.parse().context("Parsing user_id param")?;
if bearer_user_id != user_id {
return Err(FrontendErrorResponse::AccessDenied);
return Err(Web3ProxyError::AccessDenied);
}
if save_to_redis {
@ -103,7 +103,7 @@ pub async fn get_user_id_from_params(
// TODO: proper error code from a useful error code
// TODO: maybe instead of this sharp edged warn, we have a config value?
// TODO: check config for if we should deny or allow this
Err(FrontendErrorResponse::AccessDenied)
Err(Web3ProxyError::AccessDenied)
// // TODO: make this a flag
// warn!("allowing without auth during development!");
// Ok(x.parse()?)
@ -215,7 +215,7 @@ pub fn get_query_stop_from_params(
pub fn get_query_window_seconds_from_params(
params: &HashMap<String, String>,
) -> Result<u64, FrontendErrorResponse> {
) -> Web3ProxyResult<u64> {
params.get("query_window_seconds").map_or_else(
|| {
// no page in params. set default
@ -225,7 +225,7 @@ pub fn get_query_window_seconds_from_params(
// parse the given timestamp
query_window_seconds.parse::<u64>().map_err(|err| {
trace!("Unable to parse rpc_key_id: {:#?}", err);
FrontendErrorResponse::BadRequest("Unable to parse rpc_key_id".to_string())
Web3ProxyError::BadRequest("Unable to parse rpc_key_id".to_string())
})
},
)

@ -398,7 +398,7 @@ impl Web3Rpcs {
consensus_finder: &mut ConsensusFinder,
new_block: Option<Web3ProxyBlock>,
rpc: Arc<Web3Rpc>,
pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
_pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
// TODO: how should we handle an error here?
if !consensus_finder

@ -167,7 +167,8 @@ impl Web3Rpcs {
.max_capacity(10_000)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
let (watch_consensus_rpcs_sender, consensus_connections_watcher) = watch::channel(Default::default());
let (watch_consensus_rpcs_sender, consensus_connections_watcher) =
watch::channel(Default::default());
// by_name starts empty. self.apply_server_configs will add to it
let by_name = Default::default();
@ -318,43 +319,43 @@ impl Web3Rpcs {
}
}
// <<<<<<< HEAD
// <<<<<<< HEAD
Ok(())
}
// =======
// // TODO: max_capacity and time_to_idle from config
// // all block hashes are the same size, so no need for weigher
// let block_hashes = Cache::builder()
// .time_to_idle(Duration::from_secs(600))
// .max_capacity(10_000)
// .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// // all block numbers are the same size, so no need for weigher
// let block_numbers = Cache::builder()
// .time_to_idle(Duration::from_secs(600))
// .max_capacity(10_000)
// .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
//
// let (watch_consensus_connections_sender, consensus_connections_watcher) =
// watch::channel(Default::default());
//
// let watch_consensus_head_receiver =
// watch_consensus_head_sender.as_ref().map(|x| x.subscribe());
//
// let connections = Arc::new(Self {
// by_name: connections,
// watch_consensus_rpcs_sender: watch_consensus_connections_sender,
// watch_consensus_head_receiver,
// pending_transactions,
// block_hashes,
// block_numbers,
// min_sum_soft_limit,
// min_head_rpcs,
// max_block_age,
// max_block_lag,
// });
//
// let authorization = Arc::new(Authorization::internal(db_conn.clone())?);
// >>>>>>> 77df3fa (stats v2)
// =======
// // TODO: max_capacity and time_to_idle from config
// // all block hashes are the same size, so no need for weigher
// let block_hashes = Cache::builder()
// .time_to_idle(Duration::from_secs(600))
// .max_capacity(10_000)
// .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// // all block numbers are the same size, so no need for weigher
// let block_numbers = Cache::builder()
// .time_to_idle(Duration::from_secs(600))
// .max_capacity(10_000)
// .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
//
// let (watch_consensus_connections_sender, consensus_connections_watcher) =
// watch::channel(Default::default());
//
// let watch_consensus_head_receiver =
// watch_consensus_head_sender.as_ref().map(|x| x.subscribe());
//
// let connections = Arc::new(Self {
// by_name: connections,
// watch_consensus_rpcs_sender: watch_consensus_connections_sender,
// watch_consensus_head_receiver,
// pending_transactions,
// block_hashes,
// block_numbers,
// min_sum_soft_limit,
// min_head_rpcs,
// max_block_age,
// max_block_lag,
// });
//
// let authorization = Arc::new(Authorization::internal(db_conn.clone())?);
// >>>>>>> 77df3fa (stats v2)
pub fn get(&self, conn_name: &str) -> Option<Arc<Web3Rpc>> {
self.by_name.read().get(conn_name).cloned()
@ -364,12 +365,12 @@ impl Web3Rpcs {
self.by_name.read().len()
}
// <<<<<<< HEAD
// <<<<<<< HEAD
pub fn is_empty(&self) -> bool {
self.by_name.read().is_empty()
// =======
// Ok((connections, handle, consensus_connections_watcher))
// >>>>>>> 77df3fa (stats v2)
// =======
// Ok((connections, handle, consensus_connections_watcher))
// >>>>>>> 77df3fa (stats v2)
}
pub fn min_head_rpcs(&self) -> usize {
@ -884,11 +885,11 @@ impl Web3Rpcs {
// TODO: maximum retries? right now its the total number of servers
loop {
// <<<<<<< HEAD
// <<<<<<< HEAD
if skip_rpcs.len() >= self.by_name.read().len() {
// =======
// if skip_rpcs.len() == self.by_name.len() {
// >>>>>>> 77df3fa (stats v2)
// =======
// if skip_rpcs.len() == self.by_name.len() {
// >>>>>>> 77df3fa (stats v2)
break;
}
@ -1159,18 +1160,18 @@ impl Web3Rpcs {
request_metadata.no_servers.fetch_add(1, Ordering::Release);
}
// <<<<<<< HEAD
// <<<<<<< HEAD
watch_consensus_rpcs.changed().await?;
watch_consensus_rpcs.borrow_and_update();
// =======
// =======
// TODO: i don't think this will ever happen
// TODO: return a 502? if it does?
// return Err(anyhow::anyhow!("no available rpcs!"));
// TODO: sleep how long?
// TODO: subscribe to something in ConsensusWeb3Rpcs instead
sleep(Duration::from_millis(200)).await;
// >>>>>>> 77df3fa (stats v2)
// >>>>>>> 77df3fa (stats v2)
continue;
}
@ -1216,7 +1217,7 @@ impl Web3Rpcs {
)
.await
}
ProxyMode::Fastest(x) => todo!("Fastest"),
ProxyMode::Fastest(_x) => todo!("Fastest"),
ProxyMode::Versus => todo!("Versus"),
}
}
@ -1299,13 +1300,13 @@ mod tests {
// TODO: why is this allow needed? does tokio::test get in the way somehow?
#![allow(unused_imports)]
use std::time::{SystemTime, UNIX_EPOCH};
use super::*;
use crate::rpcs::consensus::ConsensusFinder;
use crate::rpcs::{blockchain::Web3ProxyBlock, provider::Web3Provider};
use ethers::types::{Block, U256};
use log::{trace, LevelFilter};
use parking_lot::RwLock;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock as AsyncRwLock;
#[tokio::test]

@ -242,12 +242,12 @@ impl Web3Rpc {
block_data_limit,
reconnect,
tier: config.tier,
// <<<<<<< HEAD
// <<<<<<< HEAD
disconnect_watch: Some(disconnect_sender),
created_at: Some(created_at),
// =======
// =======
head_block: RwLock::new(Default::default()),
// >>>>>>> 77df3fa (stats v2)
// >>>>>>> 77df3fa (stats v2)
..Default::default()
};
@ -1110,7 +1110,7 @@ impl Web3Rpc {
trace!("watching pending transactions on {}", self);
// TODO: does this keep the lock open for too long?
match provider.as_ref() {
Web3Provider::Http(provider) => {
Web3Provider::Http(_provider) => {
// there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints
self.wait_for_disconnect().await?;
}
@ -1224,11 +1224,11 @@ impl Web3Rpc {
}
if let Some(hard_limit_until) = self.hard_limit_until.as_ref() {
// <<<<<<< HEAD
// <<<<<<< HEAD
let hard_limit_ready = *hard_limit_until.borrow();
// =======
// let hard_limit_ready = hard_limit_until.borrow().to_owned();
// >>>>>>> 77df3fa (stats v2)
// =======
// let hard_limit_ready = hard_limit_until.borrow().to_owned();
// >>>>>>> 77df3fa (stats v2)
let now = Instant::now();

@ -1,11 +1,11 @@
use crate::app::Web3ProxyApp;
use crate::frontend::errors::FrontendErrorResponse;
use crate::frontend::errors::{Web3ProxyError, Web3ProxyResponse, Web3ProxyResult};
use crate::http_params::{
get_chain_id_from_params, get_page_from_params, get_query_start_from_params,
get_query_window_seconds_from_params, get_user_id_from_params,
};
use anyhow::Context;
use axum::response::{IntoResponse, Response};
use axum::response::IntoResponse;
use axum::Json;
use axum::{
headers::{authorization::Bearer, Authorization},
@ -227,7 +227,7 @@ pub fn filter_query_window_seconds(
query_window_seconds: u64,
response: &mut HashMap<&str, serde_json::Value>,
q: Select<rpc_accounting::Entity>,
) -> Result<Select<rpc_accounting::Entity>, FrontendErrorResponse> {
) -> Web3ProxyResult<Select<rpc_accounting::Entity>> {
if query_window_seconds == 0 {
// TODO: order by more than this?
// query_window_seconds is not set so we aggregate all records
@ -261,7 +261,7 @@ pub async fn query_user_stats<'a>(
bearer: Option<TypedHeader<Authorization<Bearer>>>,
params: &'a HashMap<String, String>,
stat_response_type: StatType,
) -> Result<Response, FrontendErrorResponse> {
) -> Web3ProxyResponse {
let db_conn = app.db_conn().context("query_user_stats needs a db")?;
let db_replica = app
.db_replica()
@ -408,7 +408,7 @@ pub async fn query_user_stats<'a>(
// TODO: move getting the param and checking the bearer token into a helper function
if let Some(rpc_key_id) = params.get("rpc_key_id") {
let rpc_key_id = rpc_key_id.parse::<u64>().map_err(|e| {
FrontendErrorResponse::StatusCode(
Web3ProxyError::StatusCode(
StatusCode::BAD_REQUEST,
"Unable to parse rpc_key_id".to_string(),
Some(e.into()),

@ -1,7 +1,7 @@
use super::StatType;
use crate::{
app::Web3ProxyApp,
frontend::errors::FrontendErrorResponse,
frontend::errors::Web3ProxyResponse,
http_params::{
get_chain_id_from_params, get_query_start_from_params, get_query_stop_from_params,
get_query_window_seconds_from_params, get_user_id_from_params,
@ -10,7 +10,7 @@ use crate::{
use anyhow::Context;
use axum::{
headers::{authorization::Bearer, Authorization},
response::{IntoResponse, Response},
response::IntoResponse,
Json, TypedHeader,
};
use chrono::{DateTime, FixedOffset};
@ -34,7 +34,7 @@ pub async fn query_user_stats<'a>(
bearer: Option<TypedHeader<Authorization<Bearer>>>,
params: &'a HashMap<String, String>,
stat_response_type: StatType,
) -> Result<Response, FrontendErrorResponse> {
) -> Web3ProxyResponse {
let db_conn = app.db_conn().context("query_user_stats needs a db")?;
let db_replica = app
.db_replica()