move permits around and start rpcs slower

Bryan Stitt 2023-10-11 02:00:50 -07:00
parent 2c175103ae
commit e43309abc0
37 changed files with 1539 additions and 686 deletions
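Two themes run through the hunks below: Web3ProxyApp becomes App (with Web3Request moving into the jsonrpc module as ValidatedRequest), and concurrency permits stop riding along inside RateLimitResult::Allowed. ip_is_authorized and key_is_authorized now return a bare Authorization, and the permit is taken in a separate step. The new dispatch helper, as it appears in the authorization hunk further down (a sketch of that one method, not the full file):

    impl Authorization {
        /// choose the concurrency limiter that applies to this caller
        pub async fn permit(&self, app: &App) -> Web3ProxyResult<Option<OwnedSemaphorePermit>> {
            match self.checks.rpc_secret_key_id {
                // anonymous traffic is limited per ip
                None => app.permit_public_concurrency(&self.ip).await,
                // traffic with a registered rpc key is limited per user
                Some(_) => app.permit_premium_concurrency(self).await,
            }
        }
    }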

View File

@ -1,4 +1,4 @@
use crate::app::Web3ProxyApp;
use crate::app::App;
use crate::errors::{Web3ProxyError, Web3ProxyResponse};
use crate::globals::{global_db_conn, global_db_replica_conn};
use crate::http_params::get_user_id_from_params;
@ -21,7 +21,7 @@ use tracing::{info, trace};
// This function is used to give permission to certain users
pub async fn query_admin_modify_usertier<'a>(
app: &'a Web3ProxyApp,
app: &'a App,
bearer: Option<TypedHeader<Authorization<Bearer>>>,
params: &'a HashMap<String, String>,
) -> Web3ProxyResponse {

View File

@ -3,11 +3,11 @@ mod ws;
use crate::caches::{RegisteredUserRateLimitKey, RpcSecretKeyCache, UserBalanceCache};
use crate::config::{AppConfig, TopConfig};
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::{Authorization, Web3Request};
use crate::frontend::authorization::Authorization;
use crate::globals::{global_db_conn, DatabaseError, APP, DB_CONN, DB_REPLICA};
use crate::jsonrpc::{
self, JsonRpcErrorData, JsonRpcParams, JsonRpcRequestEnum, JsonRpcResultData, LooseId,
SingleRequest, SingleResponse,
SingleRequest, SingleResponse, ValidatedRequest,
};
use crate::relational_db::{connect_db, migrate_db};
use crate::response_cache::{ForwardedResponse, JsonRpcResponseCache, JsonRpcResponseWeigher};
@ -67,7 +67,7 @@ pub type Web3ProxyJoinHandle<T> = JoinHandle<Web3ProxyResult<T>>;
/// The application
// TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard
pub struct Web3ProxyApp {
pub struct App {
/// Send requests to the best server available
pub balanced_rpcs: Arc<Web3Rpcs>,
/// Send 4337 Abstraction Bundler requests to one of these servers
@ -161,7 +161,7 @@ pub async fn flatten_handles<T>(
/// starting an app creates many tasks
pub struct Web3ProxyAppSpawn {
/// the app. probably clone this to use in other groups of handles
pub app: Arc<Web3ProxyApp>,
pub app: Arc<App>,
/// handles for the balanced and private rpcs
pub app_handles: FuturesUnordered<Web3ProxyJoinHandle<()>>,
/// these are important and must be allowed to finish
@ -172,7 +172,7 @@ pub struct Web3ProxyAppSpawn {
pub ranked_rpcs: watch::Receiver<Option<Arc<RankedRpcs>>>,
}
impl Web3ProxyApp {
impl App {
/// The main entrypoint.
pub async fn spawn(
frontend_port: Arc<AtomicU16>,
@ -983,7 +983,8 @@ impl Web3ProxyApp {
authorization: Arc<Authorization>,
) -> Web3ProxyResult<R> {
// TODO: proper ids
let request = SingleRequest::new(LooseId::Number(1), method.to_string(), json!(params))?;
let request =
SingleRequest::new(LooseId::Number(1), method.to_string().into(), json!(params))?;
let (_, response, _) = self.proxy_request(request, authorization, None).await;
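Note the extra .into() on the method name above. Together with the method.as_ref() and method.into() call sites later in this diff (and the "&str here instead of String" TODO at the bottom), it suggests SingleRequest.method is no longer a plain String, probably something like a Cow<'static, str>, though the field's new type is not shown in these hunks. A small sketch of the new call shape; internal_request is a hypothetical wrapper, only the SingleRequest::new call is taken from the diff:

    // hypothetical helper; assumes the method argument accepts .into() from &str
    fn internal_request(method: &str, params: serde_json::Value) -> Web3ProxyResult<SingleRequest> {
        // &str (and String, via .to_string().into()) now converts into the method type
        let request = SingleRequest::new(LooseId::Number(1), method.into(), params)?;
        Ok(request)
    }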
@ -1097,7 +1098,7 @@ impl Web3ProxyApp {
/// if no protected rpcs are configured, then some public rpcs are used instead
async fn try_send_protected(
self: &Arc<Self>,
web3_request: &Arc<Web3Request>,
web3_request: &Arc<ValidatedRequest>,
) -> Web3ProxyResult<SingleResponse<Arc<RawValue>>> {
if self.protected_rpcs.is_empty() {
self.balanced_rpcs.request_with_metadata(web3_request).await
@ -1118,17 +1119,23 @@ impl Web3ProxyApp {
// TODO: this clone is only for an error response. refactor to not need it
let error_id = request.id.clone();
let web3_request =
match Web3Request::new_with_app(self, authorization, None, request.into(), head_block)
.await
{
Ok(x) => x,
Err(err) => {
let (a, b) = err.as_json_response_parts(error_id);
let web3_request = match ValidatedRequest::new_with_app(
self,
authorization,
None,
None,
request.into(),
head_block,
)
.await
{
Ok(x) => x,
Err(err) => {
let (a, b) = err.as_json_response_parts(error_id);
return (a, b, vec![]);
}
};
return (a, b, vec![]);
}
};
// TODO: trace/kafka log request.params before we send them to _proxy_request_with_caching which might modify them
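new_with_app also grows an argument here (and at every other call site in this commit): the removed definition took (app, authorization, max_wait, request, head_block), and the new calls pass one extra None. The nearby "TODO: this needs a permit" comments suggest the new slot carries a semaphore permit, but that is an inference; the new signature itself is not part of these hunks. The annotated call shape, with the argument roles being assumptions:

    let web3_request = ValidatedRequest::new_with_app(
        self,
        authorization,
        None, // presumably the permit (per the "this needs a permit" TODOs)
        None, // max_wait
        request.into(),
        head_block,
    )
    .await?;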
@ -1205,7 +1212,7 @@ impl Web3ProxyApp {
/// TODO: how can we make this generic?
async fn _proxy_request_with_caching(
self: &Arc<Self>,
web3_request: &Arc<Web3Request>,
web3_request: &Arc<ValidatedRequest>,
) -> Web3ProxyResult<jsonrpc::SingleResponse> {
// TODO: serve net_version without querying the backend
// TODO: don't force RawValue
@ -1757,7 +1764,7 @@ impl Web3ProxyApp {
}
}
impl fmt::Debug for Web3ProxyApp {
impl fmt::Debug for App {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
f.debug_struct("Web3ProxyApp").finish_non_exhaustive()

View File

@ -1,9 +1,9 @@
//! Websocket-specific functions for the Web3ProxyApp
use super::Web3ProxyApp;
use super::App;
use crate::errors::{Web3ProxyError, Web3ProxyResult};
use crate::frontend::authorization::{RequestOrMethod, Web3Request};
use crate::jsonrpc;
use crate::frontend::authorization::RequestOrMethod;
use crate::jsonrpc::{self, ValidatedRequest};
use crate::response_cache::ForwardedResponse;
use axum::extract::ws::{CloseFrame, Message};
use deferred_rate_limiter::DeferredRateLimitResult;
@ -21,10 +21,10 @@ use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::wrappers::WatchStream;
use tracing::{error, trace};
impl Web3ProxyApp {
impl App {
pub async fn eth_subscribe<'a>(
self: &'a Arc<Self>,
web3_request: Arc<Web3Request>,
web3_request: Arc<ValidatedRequest>,
subscription_count: &'a AtomicU64,
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now
response_sender: mpsc::Sender<Message>,
@ -82,10 +82,12 @@ impl Web3ProxyApp {
continue;
};
let subscription_web3_request = Web3Request::new_with_app(
// TODO: this needs a permit
let subscription_web3_request = ValidatedRequest::new_with_app(
&app,
authorization.clone(),
None,
None,
RequestOrMethod::Method("eth_subscribe(newHeads)".into(), 0),
Some(new_head),
)
@ -168,10 +170,12 @@ impl Web3ProxyApp {
}
Ok(new_txid) => {
// TODO: include the head_block here?
match Web3Request::new_with_app(
// todo!(this needs a permit)
match ValidatedRequest::new_with_app(
&app,
authorization.clone(),
None,
None,
RequestOrMethod::Method(
"eth_subscribe(newPendingTransactions)".into(),
0,
@ -261,7 +265,7 @@ impl Web3ProxyApp {
Ok((subscription_abort_handle, response))
}
async fn rate_limit_close_websocket(&self, web3_request: &Web3Request) -> Option<Message> {
async fn rate_limit_close_websocket(&self, web3_request: &ValidatedRequest) -> Option<Message> {
let authorization = &web3_request.authorization;
if !authorization.active_premium().await {

View File

@ -1,5 +1,5 @@
//! Helper functions for turning ethers' BlockNumber into numbers and updating incoming queries to match.
use crate::app::Web3ProxyApp;
use crate::app::App;
use crate::jsonrpc::SingleRequest;
use crate::{
errors::{Web3ProxyError, Web3ProxyResult},
@ -69,7 +69,7 @@ pub async fn clean_block_number<'a>(
params: &'a mut serde_json::Value,
block_param_id: usize,
head_block: &'a Web3ProxyBlock,
app: Option<&'a Web3ProxyApp>,
app: Option<&'a App>,
) -> Web3ProxyResult<BlockNumAndHash> {
match params.as_array_mut() {
None => {
@ -262,7 +262,7 @@ impl CacheMode {
pub async fn new<'a>(
request: &'a mut SingleRequest,
head_block: Option<&'a Web3ProxyBlock>,
app: Option<&'a Web3ProxyApp>,
app: Option<&'a App>,
) -> Self {
match Self::try_new(request, head_block, app).await {
Ok(x) => return x,
@ -296,7 +296,7 @@ impl CacheMode {
pub async fn try_new(
request: &mut SingleRequest,
head_block: Option<&Web3ProxyBlock>,
app: Option<&Web3ProxyApp>,
app: Option<&App>,
) -> Web3ProxyResult<Self> {
let params = &mut request.params;
@ -329,7 +329,7 @@ impl CacheMode {
}
}
match request.method.as_str() {
match request.method.as_ref() {
"debug_traceTransaction" => {
// TODO: make sure re-orgs work properly!
Ok(CacheMode::SuccessForever)
@ -541,7 +541,7 @@ mod test {
let id = LooseId::Number(9);
let mut request = SingleRequest::new(id, method.to_string(), params).unwrap();
let mut request = SingleRequest::new(id, method.into(), params).unwrap();
// TODO: instead of empty, check None?
let x = CacheMode::try_new(&mut request, Some(&head_block), None)
@ -576,7 +576,7 @@ mod test {
let id = LooseId::Number(99);
let mut request = SingleRequest::new(id, method.to_string(), params).unwrap();
let mut request = SingleRequest::new(id, method.into(), params).unwrap();
let x = CacheMode::try_new(&mut request, Some(&head_block), None)
.await
@ -611,7 +611,7 @@ mod test {
let head_block = Web3ProxyBlock::try_new(Arc::new(head_block)).unwrap();
let mut request = SingleRequest::new(99.into(), method.to_string(), params).unwrap();
let mut request = SingleRequest::new(99.into(), method.into(), params).unwrap();
let x = CacheMode::try_new(&mut request, Some(&head_block), None)
.await

View File

@ -10,6 +10,7 @@ use migration::sea_orm::prelude::Decimal;
use std::{ops::Add, str::FromStr};
use tracing::{instrument, trace, warn};
/// TODO: i don't like how we use this inside the config and also have it available publicly. we should only get this value from the config
pub fn default_usd_per_cu(chain_id: u64) -> Decimal {
match chain_id {
999_001_999 => Decimal::from_str("0.10").unwrap(),

View File

@ -2,7 +2,7 @@
use super::authorization::login_is_authorized;
use crate::admin_queries::query_admin_modify_usertier;
use crate::app::Web3ProxyApp;
use crate::app::App;
use crate::errors::Web3ProxyResponse;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext};
use crate::frontend::users::authentication::PostLogin;
@ -52,7 +52,7 @@ pub struct AdminIncreaseBalancePost {
/// - user_role_tier that is supposed to be adapted
#[debug_handler]
pub async fn admin_increase_balance(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
Json(payload): Json<AdminIncreaseBalancePost>,
) -> Web3ProxyResponse {
@ -115,7 +115,7 @@ pub async fn admin_increase_balance(
/// TODO: JSON post data instead of query params
#[debug_handler]
pub async fn admin_change_user_roles(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
bearer: Option<TypedHeader<Authorization<Bearer>>>,
Query(params): Query<HashMap<String, String>>,
) -> Web3ProxyResponse {
@ -130,7 +130,7 @@ pub async fn admin_change_user_roles(
/// We assume that the admin has already logged in, and has a bearer token ...
#[debug_handler]
pub async fn admin_imitate_login_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
InsecureClientIp(ip): InsecureClientIp,
Path(mut params): Path<HashMap<String, String>>,
) -> Web3ProxyResponse {
@ -287,7 +287,7 @@ pub async fn admin_imitate_login_get(
/// The bearer token can be used to authenticate other admin requests
#[debug_handler]
pub async fn admin_imitate_login_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
InsecureClientIp(ip): InsecureClientIp,
Json(payload): Json<PostLogin>,
) -> Web3ProxyResponse {

View File

@ -1,52 +1,39 @@
//! Utilities for authorization of logged in and anonymous users.
use super::rpc_proxy_ws::ProxyMode;
use crate::app::{Web3ProxyApp, APP_USER_AGENT};
use crate::app::{App, APP_USER_AGENT};
use crate::balance::Balance;
use crate::block_number::CacheMode;
use crate::caches::RegisteredUserRateLimitKey;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::globals::{global_db_replica_conn, APP};
use crate::jsonrpc::{self, JsonRpcParams, LooseId, SingleRequest};
use crate::kafka::KafkaDebugLogger;
use crate::response_cache::JsonRpcQueryCacheKey;
use crate::rpcs::blockchain::Web3ProxyBlock;
use crate::rpcs::one::Web3Rpc;
use crate::globals::global_db_replica_conn;
use crate::jsonrpc::{self, SingleRequest};
use crate::secrets::RpcSecretKey;
use crate::stats::{AppStat, BackendRequests};
use crate::user_token::UserBearerToken;
use anyhow::Context;
use axum::headers::authorization::Bearer;
use axum::headers::{Header, Origin, Referer, UserAgent};
use chrono::Utc;
use deferred_rate_limiter::{DeferredRateLimitResult, DeferredRateLimiter};
use derivative::Derivative;
use derive_more::From;
use entities::{login, rpc_key, user, user_tier};
use ethers::types::{Bytes, U64};
use ethers::types::Bytes;
use ethers::utils::keccak256;
use futures::TryFutureExt;
use hashbrown::HashMap;
use http::HeaderValue;
use ipnet::IpNet;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::{RedisRateLimitResult, RedisRateLimiter};
use serde::ser::SerializeStruct;
use serde::Serialize;
use serde_json::json;
use serde_json::value::RawValue;
use std::borrow::Cow;
use std::fmt::{self, Debug, Display};
use std::fmt::{Debug, Display};
use std::hash::{Hash, Hasher};
use std::mem;
use std::num::NonZeroU64;
use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64};
use std::time::Duration;
use std::{net::IpAddr, str::FromStr, sync::Arc};
use tokio::sync::RwLock as AsyncRwLock;
use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::Instant;
use tracing::{error, trace, warn};
use ulid::Ulid;
@ -55,7 +42,7 @@ use uuid::Uuid;
/// TODO: should this have IpAddr and Origin or AuthorizationChecks?
#[derive(Debug)]
pub enum RateLimitResult {
Allowed(Authorization, Option<OwnedSemaphorePermit>),
Allowed(Authorization),
RateLimited(
Authorization,
/// when their rate limit resets and they can try more requests
@ -129,7 +116,7 @@ impl Hash for RpcSecretKey {
}
}
#[derive(Debug, Default, From, Serialize)]
#[derive(Clone, Debug, Default, From, Serialize)]
pub enum RequestOrMethod {
Request(SingleRequest),
/// sometimes we don't have a full request. for example, when we are logging a websocket subscription
@ -138,117 +125,6 @@ pub enum RequestOrMethod {
None,
}
/// TODO: instead of a bunch of atomics, this should probably use a RwLock
#[derive(Debug, Derivative)]
#[derivative(Default)]
pub struct Web3Request {
/// TODO: set archive_request during the new instead of after
/// TODO: this is more complex than "requires a block older than X height". different types of data can be pruned differently
pub archive_request: AtomicBool,
pub authorization: Arc<Authorization>,
pub cache_mode: CacheMode,
/// TODO: this should probably be in a global config. although maybe if we run multiple chains in one process this will be useful
pub chain_id: u64,
pub head_block: Option<Web3ProxyBlock>,
/// TODO: this should be in a global config. not copied to every single request
pub usd_per_cu: Decimal,
pub inner: RequestOrMethod,
/// Instant that the request was received (or at least close to it)
/// We use Instant and not timestamps to avoid problems with leap seconds and similar issues
#[derivative(Default(value = "Instant::now()"))]
pub start_instant: Instant,
/// if this is empty, there was a cache_hit
/// otherwise, it is populated with any rpc servers that were used by this request
pub backend_requests: BackendRequests,
/// The number of times the request got stuck waiting because no servers were synced
pub no_servers: AtomicU64,
/// If handling the request hit an application error
/// This does not count things like a transaction reverting or a malformed request
pub error_response: AtomicBool,
/// Size in bytes of the JSON response. Does not include headers or things like that.
pub response_bytes: AtomicU64,
/// How many milliseconds it took to respond to the request
pub response_millis: AtomicU64,
/// What time the (first) response was proxied.
/// TODO: think about how to store response times for ProxyMode::Versus
pub response_timestamp: AtomicI64,
/// True if the response required querying a backup RPC
/// RPC aggregators that query multiple providers to compare response may use this header to ignore our response.
pub response_from_backup_rpc: AtomicBool,
/// If the request is invalid or received a jsonrpc error response (excluding reverts)
pub user_error_response: AtomicBool,
/// ProxyMode::Debug logs requests and responses with Kafka
/// TODO: maybe this shouldn't be determined by ProxyMode. A request param should probably enable this
pub kafka_debug_logger: Option<Arc<KafkaDebugLogger>>,
/// Cancel-safe channel for sending stats to the buffer
pub stat_sender: Option<mpsc::UnboundedSender<AppStat>>,
/// How long to spend waiting for an rpc that can serve this request
pub connect_timeout: Duration,
/// How long to spend waiting for an rpc to respond to this request
/// TODO: this should start once the connection is established
pub expire_timeout: Duration,
}
impl Display for Web3Request {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}({})",
self.inner.method(),
serde_json::to_string(self.inner.params()).expect("this should always serialize")
)
}
}
impl Serialize for Web3Request {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut state = serializer.serialize_struct("request", 7)?;
state.serialize_field(
"archive_request",
&self.archive_request.load(atomic::Ordering::Relaxed),
)?;
state.serialize_field("chain_id", &self.chain_id)?;
state.serialize_field("head_block", &self.head_block)?;
state.serialize_field("request", &self.inner)?;
state.serialize_field("elapsed", &self.start_instant.elapsed().as_secs_f32())?;
{
let backend_names = self.backend_requests.lock();
let backend_names = backend_names
.iter()
.map(|x| x.name.as_str())
.collect::<Vec<_>>();
state.serialize_field("backend_requests", &backend_names)?;
}
state.serialize_field(
"response_bytes",
&self.response_bytes.load(atomic::Ordering::Relaxed),
)?;
state.end()
}
}
impl Default for Authorization {
fn default() -> Self {
Authorization::internal().unwrap()
@ -266,7 +142,7 @@ impl RequestOrMethod {
pub fn method(&self) -> &str {
match self {
Self::Request(x) => x.method.as_str(),
Self::Request(x) => x.method.as_ref(),
Self::Method(x, _) => x,
Self::None => "unknown",
}
@ -329,288 +205,6 @@ impl ResponseOrBytes<'_> {
}
}
impl Web3Request {
#[allow(clippy::too_many_arguments)]
async fn new_with_options(
app: Option<&Web3ProxyApp>,
authorization: Arc<Authorization>,
chain_id: u64,
head_block: Option<Web3ProxyBlock>,
kafka_debug_logger: Option<Arc<KafkaDebugLogger>>,
max_wait: Option<Duration>,
mut request: RequestOrMethod,
stat_sender: Option<mpsc::UnboundedSender<AppStat>>,
usd_per_cu: Decimal,
) -> Web3ProxyResult<Arc<Self>> {
let start_instant = Instant::now();
// let request: RequestOrMethod = request.into();
// we VERY INTENTIONALLY log to kafka BEFORE calculating the cache key
// this is because calculating the cache_key may modify the params!
// for example, if the request specifies "latest" as the block number, we replace it with the actual latest block number
if let Some(ref kafka_debug_logger) = kafka_debug_logger {
// TODO: channels might be more ergonomic than spawned futures
// spawned things run in parallel easier but generally need more Arcs
kafka_debug_logger.log_debug_request(&request);
}
// now that kafka has logged the user's original params, we can calculate the cache key
// TODO: modify CacheMode::new to wait for a future block if one is requested! be sure to update head_block too!
let cache_mode = match &mut request {
RequestOrMethod::Request(x) => CacheMode::new(x, head_block.as_ref(), app).await,
_ => CacheMode::Never,
};
let connect_timeout = Duration::from_secs(3);
let expire_timeout = max_wait.unwrap_or_else(|| Duration::from_secs(295));
let x = Self {
archive_request: false.into(),
authorization,
backend_requests: Default::default(),
cache_mode,
chain_id,
error_response: false.into(),
connect_timeout,
expire_timeout,
head_block: head_block.clone(),
kafka_debug_logger,
no_servers: 0.into(),
inner: request,
response_bytes: 0.into(),
response_from_backup_rpc: false.into(),
response_millis: 0.into(),
response_timestamp: 0.into(),
start_instant,
stat_sender,
usd_per_cu,
user_error_response: false.into(),
};
Ok(Arc::new(x))
}
pub async fn new_with_app(
app: &Web3ProxyApp,
authorization: Arc<Authorization>,
max_wait: Option<Duration>,
request: RequestOrMethod,
head_block: Option<Web3ProxyBlock>,
) -> Web3ProxyResult<Arc<Self>> {
// TODO: get this out of tracing instead (where we have a String from Amazon's LB)
let request_ulid = Ulid::new();
let kafka_debug_logger = if matches!(authorization.checks.proxy_mode, ProxyMode::Debug) {
KafkaDebugLogger::try_new(
app,
authorization.clone(),
head_block.as_ref().map(|x| x.number()),
"web3_proxy:rpc",
request_ulid,
)
} else {
None
};
let chain_id = app.config.chain_id;
let stat_sender = app.stat_sender.clone();
let usd_per_cu = app.config.usd_per_cu.unwrap_or_default();
Self::new_with_options(
Some(app),
authorization,
chain_id,
head_block,
kafka_debug_logger,
max_wait,
request,
stat_sender,
usd_per_cu,
)
.await
}
pub async fn new_internal<P: JsonRpcParams>(
method: String,
params: &P,
head_block: Option<Web3ProxyBlock>,
max_wait: Option<Duration>,
) -> Web3ProxyResult<Arc<Self>> {
let authorization = Arc::new(Authorization::internal().unwrap());
// TODO: we need a real id! increment a counter on the app
let id = LooseId::Number(1);
// TODO: this seems inefficient
let request = SingleRequest::new(id, method, json!(params)).unwrap();
if let Some(app) = APP.get() {
Self::new_with_app(app, authorization, max_wait, request.into(), head_block).await
} else {
Self::new_with_options(
None,
authorization,
0,
head_block,
None,
max_wait,
request.into(),
None,
Default::default(),
)
.await
}
}
#[inline]
pub fn backend_rpcs_used(&self) -> Vec<Arc<Web3Rpc>> {
self.backend_requests.lock().clone()
}
pub fn cache_key(&self) -> Option<u64> {
match &self.cache_mode {
CacheMode::Never => None,
x => {
let x = JsonRpcQueryCacheKey::new(x, &self.inner).hash();
Some(x)
}
}
}
#[inline]
pub fn cache_jsonrpc_errors(&self) -> bool {
self.cache_mode.cache_jsonrpc_errors()
}
#[inline]
pub fn id(&self) -> Box<RawValue> {
self.inner.id()
}
#[inline]
pub fn max_block_needed(&self) -> Option<U64> {
self.cache_mode.to_block().map(|x| *x.num())
}
pub fn min_block_needed(&self) -> Option<U64> {
if self.archive_request.load(atomic::Ordering::Relaxed) {
Some(U64::zero())
} else {
self.cache_mode.from_block().map(|x| *x.num())
}
}
#[inline]
pub fn connect_timeout_at(&self) -> Instant {
// TODO: get from config
self.start_instant + Duration::from_secs(3)
}
#[inline]
pub fn connect_timeout(&self) -> bool {
self.connect_timeout_at() <= Instant::now()
}
#[inline]
pub fn expire_at(&self) -> Instant {
// TODO: get from config
// erigon's timeout is 5 minutes so we want it shorter than that
self.start_instant + Duration::from_secs(295)
}
#[inline]
pub fn expired(&self) -> bool {
self.expire_at() <= Instant::now()
}
pub fn try_send_stat(mut self) -> Web3ProxyResult<()> {
if let Some(stat_sender) = self.stat_sender.take() {
trace!(?self, "sending stat");
let stat: AppStat = self.into();
if let Err(err) = stat_sender.send(stat) {
error!(?err, "failed sending stat");
// TODO: return it? that seems like it might cause an infinite loop
// TODO: but dropping stats is bad... hmm... i guess better to undercharge customers than overcharge
};
trace!("stat sent successfully");
}
Ok(())
}
pub fn add_response<'a, R: Into<ResponseOrBytes<'a>>>(&'a self, response: R) {
// TODO: fetch? set? should it be None in a Mutex? or a OnceCell?
let response = response.into();
let num_bytes = response.num_bytes() as u64;
self.response_bytes
.fetch_add(num_bytes, atomic::Ordering::Relaxed);
self.response_millis.fetch_add(
self.start_instant.elapsed().as_millis() as u64,
atomic::Ordering::Relaxed,
);
// TODO: record first or last timestamp? really, we need multiple
self.response_timestamp
.store(Utc::now().timestamp(), atomic::Ordering::Relaxed);
// TODO: set user_error_response and error_response here instead of outside this function
if let Some(kafka_debug_logger) = self.kafka_debug_logger.as_ref() {
if let ResponseOrBytes::Response(response) = response {
match response {
jsonrpc::SingleResponse::Parsed(response) => {
kafka_debug_logger.log_debug_response(response);
}
jsonrpc::SingleResponse::Stream(_) => {
warn!("need to handle streaming response debug logging");
}
}
}
}
}
pub fn try_send_arc_stat(self: Arc<Self>) -> Web3ProxyResult<()> {
match Arc::into_inner(self) {
Some(x) => x.try_send_stat(),
None => {
trace!("could not send stat while other arcs are active");
Ok(())
}
}
}
#[inline]
pub fn proxy_mode(&self) -> ProxyMode {
self.authorization.checks.proxy_mode
}
// TODO: helper function to duplicate? needs to clear request_bytes, and all the atomics tho...
}
// TODO: is this where the panic comes from?
impl Drop for Web3Request {
fn drop(&mut self) {
if self.stat_sender.is_some() {
// turn `&mut self` into `self`
let x = mem::take(self);
trace!(?x, "request metadata dropped without stat send");
let _ = x.try_send_stat();
}
}
}
impl Default for RpcSecretKey {
fn default() -> Self {
Self::new()
@ -734,6 +328,13 @@ impl Authorization {
)
}
pub async fn permit(&self, app: &App) -> Web3ProxyResult<Option<OwnedSemaphorePermit>> {
match self.checks.rpc_secret_key_id {
None => app.permit_public_concurrency(&self.ip).await,
Some(_) => app.permit_premium_concurrency(self).await,
}
}
pub fn try_new(
authorization_checks: AuthorizationChecks,
ip: &IpAddr,
@ -801,9 +402,9 @@ impl Authorization {
/// rate limit logins only by ip.
/// we want all origins and referers and user agents to count together
pub async fn login_is_authorized(app: &Web3ProxyApp, ip: IpAddr) -> Web3ProxyResult<Authorization> {
pub async fn login_is_authorized(app: &App, ip: IpAddr) -> Web3ProxyResult<Authorization> {
let authorization = match app.rate_limit_login(ip, ProxyMode::Best).await? {
RateLimitResult::Allowed(authorization, None) => authorization,
RateLimitResult::Allowed(authorization) => authorization,
RateLimitResult::RateLimited(authorization, retry_at) => {
return Err(Web3ProxyError::RateLimited(authorization, retry_at));
}
@ -817,15 +418,15 @@ pub async fn login_is_authorized(app: &Web3ProxyApp, ip: IpAddr) -> Web3ProxyRes
/// semaphore won't ever be None, but its easier if key auth and ip auth work the same way
/// keep the semaphore alive until the user's request is entirely complete
pub async fn ip_is_authorized(
app: &Arc<Web3ProxyApp>,
app: &Arc<App>,
ip: &IpAddr,
origin: Option<&Origin>,
proxy_mode: ProxyMode,
) -> Web3ProxyResult<(Authorization, Option<OwnedSemaphorePermit>)> {
) -> Web3ProxyResult<Authorization> {
// TODO: i think we could write an `impl From` for this
// TODO: move this to an AuthorizedUser extractor
let (authorization, semaphore) = match app.rate_limit_public(ip, origin, proxy_mode).await? {
RateLimitResult::Allowed(authorization, semaphore) => (authorization, semaphore),
let authorization = match app.rate_limit_public(ip, origin, proxy_mode).await? {
RateLimitResult::Allowed(authorization) => authorization,
RateLimitResult::RateLimited(authorization, retry_at) => {
// TODO: in the background, emit a stat (maybe simplest to use a channel?)
return Err(Web3ProxyError::RateLimited(authorization, retry_at));
@ -871,27 +472,27 @@ pub async fn ip_is_authorized(
tokio::spawn(f);
}
Ok((authorization, semaphore))
Ok(authorization)
}
/// like app.rate_limit_by_rpc_key but converts to a Web3ProxyError;
/// keep the semaphore alive until the user's request is entirely complete
pub async fn key_is_authorized(
app: &Arc<Web3ProxyApp>,
app: &Arc<App>,
rpc_key: &RpcSecretKey,
ip: &IpAddr,
origin: Option<&Origin>,
proxy_mode: ProxyMode,
referer: Option<&Referer>,
user_agent: Option<&UserAgent>,
) -> Web3ProxyResult<(Authorization, Option<OwnedSemaphorePermit>)> {
) -> Web3ProxyResult<Authorization> {
// check the rate limits. error if over the limit
// TODO: i think this should be in an "impl From" or "impl Into"
let (authorization, semaphore) = match app
let authorization = match app
.rate_limit_premium(ip, origin, proxy_mode, referer, rpc_key, user_agent)
.await?
{
RateLimitResult::Allowed(authorization, semaphore) => (authorization, semaphore),
RateLimitResult::Allowed(authorization) => authorization,
RateLimitResult::RateLimited(authorization, retry_at) => {
return Err(Web3ProxyError::RateLimited(authorization, retry_at));
}
@ -935,11 +536,12 @@ pub async fn key_is_authorized(
tokio::spawn(f);
}
Ok((authorization, semaphore))
Ok(authorization)
}
impl Web3ProxyApp {
impl App {
/// Limit the number of concurrent requests from the given ip address.
/// TODO: should this take an Authorization instead of an IpAddr?
pub async fn permit_public_concurrency(
&self,
ip: &IpAddr,
@ -976,7 +578,6 @@ impl Web3ProxyApp {
pub async fn permit_premium_concurrency(
&self,
authorization: &Authorization,
ip: &IpAddr,
) -> Web3ProxyResult<Option<OwnedSemaphorePermit>> {
let authorization_checks = &authorization.checks;
@ -988,7 +589,7 @@ impl Web3ProxyApp {
let semaphore = self
.user_semaphores
.get_with_by_ref(&(user_id, *ip), async move {
.get_with_by_ref(&(user_id, authorization.ip), async move {
let s = Semaphore::new(max_concurrent_requests as usize);
Arc::new(s)
})
@ -1078,7 +679,7 @@ impl Web3ProxyApp {
// TODO: localhost being unlimited should be optional
let authorization = Authorization::internal()?;
return Ok(RateLimitResult::Allowed(authorization, None));
return Ok(RateLimitResult::Allowed(authorization));
}
let allowed_origin_requests_per_period = &self.config.allowed_origin_requests_per_period;
@ -1109,23 +710,16 @@ impl Web3ProxyApp {
.await?;
}
if let RateLimitResult::Allowed(a, b) = x {
debug_assert!(b.is_none());
let permit = self.permit_public_concurrency(ip).await?;
x = RateLimitResult::Allowed(a, permit)
if let RateLimitResult::Allowed(a) = x {
x = RateLimitResult::Allowed(a)
}
debug_assert!(!matches!(x, RateLimitResult::UnknownKey));
Ok(x)
} else {
// no redis, but we can still check the ip semaphore
let permit = self.permit_public_concurrency(ip).await?;
// TODO: if no redis, rate limit with a local cache? "warn!" probably isn't right
Ok(RateLimitResult::Allowed(authorization, permit))
Ok(RateLimitResult::Allowed(authorization))
}
}
@ -1370,13 +964,9 @@ impl Web3ProxyApp {
.await?;
}
if let RateLimitResult::Allowed(a, b) = x {
debug_assert!(b.is_none());
if let RateLimitResult::Allowed(a) = x {
// only allow this rpc_key to run a limited amount of concurrent requests
let permit = self.permit_premium_concurrency(&a, ip).await?;
x = RateLimitResult::Allowed(a, permit)
x = RateLimitResult::Allowed(a)
}
debug_assert!(!matches!(x, RateLimitResult::UnknownKey));
@ -1387,20 +977,18 @@ impl Web3ProxyApp {
}
}
let permit = self.permit_premium_concurrency(&authorization, ip).await?;
Ok(RateLimitResult::Allowed(authorization, permit))
Ok(RateLimitResult::Allowed(authorization))
}
}
impl Authorization {
pub async fn check_again(
&self,
app: &Arc<Web3ProxyApp>,
app: &Arc<App>,
) -> Web3ProxyResult<(Arc<Self>, Option<OwnedSemaphorePermit>)> {
// TODO: we could probably do this without clones. but this is easy
let (a, s) = if let Some(ref rpc_secret_key) = self.checks.rpc_secret_key {
key_is_authorized(
let (a, p) = if let Some(ref rpc_secret_key) = self.checks.rpc_secret_key {
let a = key_is_authorized(
app,
rpc_secret_key,
&self.ip,
@ -1409,14 +997,25 @@ impl Authorization {
self.referer.as_ref(),
self.user_agent.as_ref(),
)
.await?
.await?;
let p = app.permit_premium_concurrency(&a).await?;
(a, p)
} else {
ip_is_authorized(app, &self.ip, self.origin.as_ref(), self.checks.proxy_mode).await?
let a = ip_is_authorized(app, &self.ip, self.origin.as_ref(), self.checks.proxy_mode)
.await?;
let p = app.permit_public_concurrency(&self.ip).await?;
(a, p)
};
let a = Arc::new(a);
Ok((a, s))
// todo!(semaphore permit)
Ok((a, p))
}
}
@ -1436,7 +1035,7 @@ where
max_requests_per_period.or(authorization.checks.max_requests_per_period);
let x = match rate_limiter.throttle(key, max_requests_per_period, 1).await {
Ok(DeferredRateLimitResult::Allowed) => RateLimitResult::Allowed(authorization, None),
Ok(DeferredRateLimitResult::Allowed) => RateLimitResult::Allowed(authorization),
Ok(DeferredRateLimitResult::RetryAt(retry_at)) => {
// TODO: set headers so they know when they can retry
// TODO: debug or trace?
@ -1455,7 +1054,7 @@ where
// internal error, not rate limit being hit
error!(?err, %key, "rate limiter is unhappy. allowing key");
RateLimitResult::Allowed(authorization, None)
RateLimitResult::Allowed(authorization)
}
};
@ -1479,7 +1078,7 @@ pub async fn redis_rate_limit(
.throttle_label(label.unwrap_or_default(), max_requests_per_period, 1)
.await
{
Ok(RedisRateLimitResult::Allowed(..)) => RateLimitResult::Allowed(authorization, None),
Ok(RedisRateLimitResult::Allowed(..)) => RateLimitResult::Allowed(authorization),
Ok(RedisRateLimitResult::RetryAt(new_retry_at, ..)) => {
retry_at = retry_at.min(Some(new_retry_at));
@ -1492,11 +1091,11 @@ pub async fn redis_rate_limit(
// this an internal error of some kind, not the rate limit being hit
error!("rate limiter is unhappy. allowing ip. err={:?}", err);
RateLimitResult::Allowed(authorization, None)
RateLimitResult::Allowed(authorization)
}
}
} else {
RateLimitResult::Allowed(authorization, None)
RateLimitResult::Allowed(authorization)
};
Ok(x)
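With RateLimitResult::Allowed reduced to a single field, callers of the *_is_authorized helpers get their concurrency permit in a second step, the same way the check_again hunk above now does. A hypothetical handler body under that assumption, using only names and signatures shown in this file:

    async fn authorize_anon(
        app: &Arc<App>,
        ip: IpAddr,
        proxy_mode: ProxyMode,
    ) -> Web3ProxyResult<(Authorization, Option<OwnedSemaphorePermit>)> {
        // rate limiting no longer hands back a permit...
        let authorization = ip_is_authorized(app, &ip, None, proxy_mode).await?;
        // ...so take one explicitly, for example through the new Authorization::permit helper
        let permit = authorization.permit(app).await?;
        Ok((authorization, permit))
    }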

View File

@ -10,7 +10,7 @@ pub mod rpc_proxy_ws;
pub mod status;
pub mod users;
use crate::app::Web3ProxyApp;
use crate::app::App;
use crate::errors::Web3ProxyResult;
use axum::{
routing::{get, post},
@ -45,7 +45,7 @@ pub type ResponseCache = Cache<ResponseCacheKey, (StatusCode, &'static str, axum
/// Start the frontend server.
pub async fn serve(
app: Arc<Web3ProxyApp>,
app: Arc<App>,
mut shutdown_receiver: broadcast::Receiver<()>,
shutdown_complete_sender: broadcast::Sender<()>,
) -> Web3ProxyResult<()> {

View File

@ -3,7 +3,7 @@
use super::authorization::{ip_is_authorized, key_is_authorized};
use super::rpc_proxy_ws::ProxyMode;
use crate::errors::Web3ProxyError;
use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum};
use crate::{app::App, jsonrpc::JsonRpcRequestEnum};
use axum::extract::rejection::JsonRejection;
use axum::extract::Path;
use axum::headers::{Origin, Referer, UserAgent};
@ -23,7 +23,7 @@ use std::time::Duration;
/// If possible, please use a WebSocket instead.
#[debug_handler]
pub async fn proxy_web3_rpc(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
payload: Result<Json<JsonRpcRequestEnum>, JsonRejection>,
@ -33,7 +33,7 @@ pub async fn proxy_web3_rpc(
#[debug_handler]
pub async fn fastest_proxy_web3_rpc(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
payload: Result<Json<JsonRpcRequestEnum>, JsonRejection>,
@ -45,7 +45,7 @@ pub async fn fastest_proxy_web3_rpc(
#[debug_handler]
pub async fn versus_proxy_web3_rpc(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
payload: Result<Json<JsonRpcRequestEnum>, JsonRejection>,
@ -54,7 +54,7 @@ pub async fn versus_proxy_web3_rpc(
}
async fn _proxy_web3_rpc(
app: Arc<Web3ProxyApp>,
app: Arc<App>,
ip: &IpAddr,
origin: Option<&Origin>,
payload: Result<Json<JsonRpcRequestEnum>, JsonRejection>,
@ -67,7 +67,7 @@ async fn _proxy_web3_rpc(
let first_id = payload.first_id();
let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin, proxy_mode)
let authorization = ip_is_authorized(&app, ip, origin, proxy_mode)
.await
.map_err(|e| e.into_response_with_id(first_id.clone()))?;
@ -126,7 +126,7 @@ async fn _proxy_web3_rpc(
/// If possible, please use a WebSocket instead.
#[debug_handler]
pub async fn proxy_web3_rpc_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>,
@ -151,7 +151,7 @@ pub async fn proxy_web3_rpc_with_key(
#[debug_handler]
#[allow(clippy::too_many_arguments)]
pub async fn debug_proxy_web3_rpc_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>,
@ -195,7 +195,7 @@ pub async fn debug_proxy_web3_rpc_with_key(
#[debug_handler]
pub async fn fastest_proxy_web3_rpc_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>,
@ -218,7 +218,7 @@ pub async fn fastest_proxy_web3_rpc_with_key(
#[debug_handler]
pub async fn versus_proxy_web3_rpc_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>,
@ -241,7 +241,7 @@ pub async fn versus_proxy_web3_rpc_with_key(
#[allow(clippy::too_many_arguments)]
async fn _proxy_web3_rpc_with_key(
app: Arc<Web3ProxyApp>,
app: Arc<App>,
ip: &IpAddr,
origin: Option<&Origin>,
referer: Option<&Referer>,
@ -262,7 +262,7 @@ async fn _proxy_web3_rpc_with_key(
.parse()
.map_err(|e: Web3ProxyError| e.into_response_with_id(first_id.clone()))?;
let (authorization, _semaphore) =
let authorization =
key_is_authorized(&app, &rpc_key, ip, origin, proxy_mode, referer, user_agent)
.await
.map_err(|e| e.into_response_with_id(first_id.clone()))?;

View File

@ -2,10 +2,10 @@
//!
//! WebSockets are the preferred method of receiving requests, but not all clients have good support.
use super::authorization::{ip_is_authorized, key_is_authorized, Authorization, Web3Request};
use super::authorization::{ip_is_authorized, key_is_authorized, Authorization};
use crate::errors::{Web3ProxyError, Web3ProxyResponse};
use crate::jsonrpc::{self, ParsedResponse};
use crate::{app::Web3ProxyApp, errors::Web3ProxyResult, jsonrpc::SingleRequest};
use crate::jsonrpc::{self, ParsedResponse, ValidatedRequest};
use crate::{app::App, errors::Web3ProxyResult, jsonrpc::SingleRequest};
use axum::headers::{Origin, Referer, UserAgent};
use axum::{
extract::ws::{Message, WebSocket, WebSocketUpgrade},
@ -54,7 +54,7 @@ pub enum ProxyMode {
/// Queries a single server at a time
#[debug_handler]
pub async fn websocket_handler(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
ws_upgrade: Option<WebSocketUpgrade>,
@ -66,7 +66,7 @@ pub async fn websocket_handler(
/// Queries all synced backends with every request! This might get expensive!
// #[debug_handler]
pub async fn fastest_websocket_handler(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
ws_upgrade: Option<WebSocketUpgrade>,
@ -87,7 +87,7 @@ pub async fn fastest_websocket_handler(
/// Queries **all** backends with every request! This might get expensive!
#[debug_handler]
pub async fn versus_websocket_handler(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
ws_upgrade: Option<WebSocketUpgrade>,
@ -98,12 +98,12 @@ pub async fn versus_websocket_handler(
async fn _websocket_handler(
proxy_mode: ProxyMode,
app: Arc<Web3ProxyApp>,
app: Arc<App>,
ip: &IpAddr,
origin: Option<&Origin>,
ws_upgrade: Option<WebSocketUpgrade>,
) -> Web3ProxyResponse {
let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin, proxy_mode).await?;
let authorization = ip_is_authorized(&app, ip, origin, proxy_mode).await?;
let authorization = Arc::new(authorization);
@ -127,7 +127,7 @@ async fn _websocket_handler(
/// Can optionally authorized based on origin, referer, or user agent.
#[debug_handler]
pub async fn websocket_handler_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
InsecureClientIp(ip): InsecureClientIp,
Path(rpc_key): Path<String>,
origin: Option<TypedHeader<Origin>>,
@ -151,7 +151,7 @@ pub async fn websocket_handler_with_key(
#[debug_handler]
#[allow(clippy::too_many_arguments)]
pub async fn debug_websocket_handler_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
InsecureClientIp(ip): InsecureClientIp,
Path(rpc_key): Path<String>,
origin: Option<TypedHeader<Origin>>,
@ -190,7 +190,7 @@ pub async fn debug_websocket_handler_with_key(
#[debug_handler]
pub async fn fastest_websocket_handler_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
InsecureClientIp(ip): InsecureClientIp,
Path(rpc_key): Path<String>,
origin: Option<TypedHeader<Origin>>,
@ -214,7 +214,7 @@ pub async fn fastest_websocket_handler_with_key(
#[debug_handler]
pub async fn versus_websocket_handler_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
InsecureClientIp(ip): InsecureClientIp,
Path(rpc_key): Path<String>,
origin: Option<TypedHeader<Origin>>,
@ -238,7 +238,7 @@ pub async fn versus_websocket_handler_with_key(
#[allow(clippy::too_many_arguments)]
async fn _websocket_handler_with_key(
proxy_mode: ProxyMode,
app: Arc<Web3ProxyApp>,
app: Arc<App>,
ip: &IpAddr,
rpc_key: String,
origin: Option<&Origin>,
@ -248,7 +248,7 @@ async fn _websocket_handler_with_key(
) -> Web3ProxyResponse {
let rpc_key = rpc_key.parse()?;
let (authorization, _semaphore) =
let authorization =
key_is_authorized(&app, &rpc_key, ip, origin, proxy_mode, referer, user_agent).await?;
trace!("websocket_handler_with_key {:?}", authorization);
@ -295,11 +295,7 @@ async fn _websocket_handler_with_key(
}
}
async fn proxy_web3_socket(
app: Arc<Web3ProxyApp>,
authorization: Arc<Authorization>,
socket: WebSocket,
) {
async fn proxy_web3_socket(app: Arc<App>, authorization: Arc<Authorization>, socket: WebSocket) {
// split the websocket so we can read and write concurrently
let (ws_tx, ws_rx) = socket.split();
@ -314,7 +310,7 @@ async fn proxy_web3_socket(
}
async fn websocket_proxy_web3_rpc(
app: &Arc<Web3ProxyApp>,
app: &Arc<App>,
authorization: Arc<Authorization>,
json_request: SingleRequest,
response_sender: &mpsc::Sender<Message>,
@ -323,9 +319,16 @@ async fn websocket_proxy_web3_rpc(
) -> Web3ProxyResult<jsonrpc::Response> {
match &json_request.method[..] {
"eth_subscribe" => {
let web3_request =
Web3Request::new_with_app(app, authorization, None, json_request.into(), None)
.await?;
// TODO: this needs a permit
let web3_request = ValidatedRequest::new_with_app(
app,
authorization,
None,
None,
json_request.into(),
None,
)
.await?;
// TODO: how can we subscribe with proxy_mode?
match app
@ -350,9 +353,16 @@ async fn websocket_proxy_web3_rpc(
}
}
"eth_unsubscribe" => {
let web3_request =
Web3Request::new_with_app(app, authorization, None, json_request.into(), None)
.await?;
// todo!(this needs a permit)
let web3_request = ValidatedRequest::new_with_app(
app,
authorization,
None,
None,
json_request.into(),
None,
)
.await?;
// sometimes we get a list, sometimes we get the id directly
// check for the list first, then just use the whole thing
@ -403,7 +413,7 @@ async fn websocket_proxy_web3_rpc(
/// websockets support a few more methods than http clients
async fn handle_socket_payload(
app: &Arc<Web3ProxyApp>,
app: &Arc<App>,
authorization: &Arc<Authorization>,
payload: &str,
response_sender: &mpsc::Sender<Message>,
@ -448,7 +458,7 @@ async fn handle_socket_payload(
}
async fn read_web3_socket(
app: Arc<Web3ProxyApp>,
app: Arc<App>,
authorization: Arc<Authorization>,
mut ws_rx: SplitStream<WebSocket>,
response_sender: mpsc::Sender<Message>,

View File

@ -5,7 +5,7 @@
use super::{ResponseCache, ResponseCacheKey};
use crate::{
app::{Web3ProxyApp, APP_USER_AGENT},
app::{App, APP_USER_AGENT},
errors::Web3ProxyError,
};
use axum::{
@ -37,7 +37,7 @@ static CONTENT_TYPE_PLAIN: &str = "text/plain";
#[debug_handler]
pub async fn debug_request(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
ip: InsecureClientIp,
headers: HeaderMap,
) -> impl IntoResponse {
@ -76,7 +76,7 @@ pub async fn debug_request(
/// Health check page for load balancers to use.
#[debug_handler]
pub async fn health(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
Extension(cache): Extension<Arc<ResponseCache>>,
) -> Result<impl IntoResponse, Web3ProxyError> {
let (code, content_type, body) = timeout(
@ -96,7 +96,7 @@ pub async fn health(
// TODO: _health doesn't need to be async, but _quick_cache_ttl needs an async function
#[inline]
async fn _health(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, Bytes) {
async fn _health(app: Arc<App>) -> (StatusCode, &'static str, Bytes) {
trace!("health is not cached");
if app.balanced_rpcs.synced() {
@ -113,7 +113,7 @@ async fn _health(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, Bytes) {
/// Easy alerting if backup servers are in use.
#[debug_handler]
pub async fn backups_needed(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
Extension(cache): Extension<Arc<ResponseCache>>,
) -> Result<impl IntoResponse, Web3ProxyError> {
let (code, content_type, body) = timeout(
@ -134,7 +134,7 @@ pub async fn backups_needed(
}
#[inline]
async fn _backups_needed(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, Bytes) {
async fn _backups_needed(app: Arc<App>) -> (StatusCode, &'static str, Bytes) {
trace!("backups_needed is not cached");
let code = {
@ -164,7 +164,7 @@ async fn _backups_needed(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, B
/// TODO: replace this with proper stats and monitoring. frontend uses it for their public dashboards though
#[debug_handler]
pub async fn status(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
Extension(cache): Extension<Arc<ResponseCache>>,
) -> Result<impl IntoResponse, Web3ProxyError> {
let (code, content_type, body) = timeout(
@ -184,7 +184,7 @@ pub async fn status(
// TODO: _status doesn't need to be async, but _quick_cache_ttl needs an async function
#[inline]
async fn _status(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, Bytes) {
async fn _status(app: Arc<App>) -> (StatusCode, &'static str, Bytes) {
trace!("status is not cached");
// TODO: get out of app.balanced_rpcs instead?

View File

@ -1,5 +1,5 @@
//! Handle registration, logins, and managing account data.
use crate::app::Web3ProxyApp;
use crate::app::App;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse};
use crate::frontend::authorization::login_is_authorized;
use crate::globals::{global_db_conn, global_db_replica_conn};
@ -76,7 +76,7 @@ pub struct LoginPostResponse {
/// We can prompt for an email and payment after they log in.
#[debug_handler]
pub async fn user_login_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
InsecureClientIp(ip): InsecureClientIp,
// TODO: what does axum's error handling look like if the path fails to parse?
Path(mut params): Path<HashMap<String, String>>,
@ -224,7 +224,7 @@ pub async fn register_new_user(
/// The bearer token can be used to authenticate other requests, such as getting the user's stats or modifying the user's profile.
#[debug_handler]
pub async fn user_login_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
InsecureClientIp(ip): InsecureClientIp,
Query(query): Query<PostLoginQuery>,
Json(payload): Json<PostLogin>,

View File

@ -9,7 +9,7 @@ pub mod subuser;
#[cfg(feature = "stripe")]
pub mod payment_stripe;
use crate::app::Web3ProxyApp;
use crate::app::App;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse};
use crate::globals::global_db_transaction;
use axum::{
@ -32,7 +32,7 @@ use std::sync::Arc;
/// TODO: this will change as we add better support for secondary users.
#[debug_handler]
pub async fn user_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>,
) -> Web3ProxyResponse {
let user = app.bearer_is_authorized(bearer_token).await?;
@ -51,7 +51,7 @@ pub struct UserPost {
/// `POST /user` -- modify the account connected to the bearer token in the `Authentication` header.
#[debug_handler]
pub async fn user_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>,
Json(payload): Json<UserPost>,
) -> Web3ProxyResponse {

View File

@ -1,4 +1,4 @@
use crate::app::Web3ProxyApp;
use crate::app::App;
use crate::balance::Balance;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse, Web3ProxyResult};
use crate::frontend::authorization::login_is_authorized;
@ -41,7 +41,7 @@ use tracing::{debug, error, info, trace, warn};
/// - show deposits history (currency, amounts, transaction id)
#[debug_handler]
pub async fn user_balance_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> Web3ProxyResponse {
let user = app.bearer_is_authorized(bearer).await?;
@ -61,7 +61,7 @@ pub async fn user_balance_get(
/// - shows a list of all deposits, including their chain-id, amount and tx-hash
#[debug_handler]
pub async fn user_chain_deposits_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> Web3ProxyResponse {
let user = app.bearer_is_authorized(bearer).await?;
@ -100,7 +100,7 @@ pub async fn user_chain_deposits_get(
/// - shows a list of all deposits done through stripe
#[debug_handler]
pub async fn user_stripe_deposits_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> Web3ProxyResponse {
let user = app.bearer_is_authorized(bearer).await?;
@ -143,7 +143,7 @@ pub async fn user_stripe_deposits_get(
/// - shows a list of all deposits done by admins
#[debug_handler]
pub async fn user_admin_deposits_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> Web3ProxyResponse {
let user = app.bearer_is_authorized(bearer).await?;
@ -181,7 +181,7 @@ pub async fn user_admin_deposits_get(
/// `POST /user/balance/:tx_hash` -- Process a confirmed txid to update a user's balance.
#[debug_handler]
pub async fn user_balance_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
ip: Option<InsecureClientIp>,
Path(mut params): Path<HashMap<String, String>>,
bearer: Option<TypedHeader<Authorization<Bearer>>>,
@ -441,7 +441,7 @@ pub async fn user_balance_post(
/// `POST /user/balance_uncle/:uncle_hash` -- Process an uncle block to potentially update a user's balance.
#[debug_handler]
pub async fn user_balance_uncle_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
ip: Option<InsecureClientIp>,
Path(mut params): Path<HashMap<String, String>>,
bearer: Option<TypedHeader<Authorization<Bearer>>>,
@ -477,7 +477,7 @@ pub async fn user_balance_uncle_post(
}
pub async fn handle_uncle_block(
app: &Arc<Web3ProxyApp>,
app: &Arc<App>,
uncle_hash: H256,
) -> Web3ProxyResult<Option<HashMap<u64, Decimal>>> {
// cancel if uncle_hash is actually a confirmed block

View File

@ -2,7 +2,7 @@
use crate::errors::Web3ProxyResponse;
use crate::globals::global_db_conn;
use crate::referral_code::ReferralCode;
use crate::{app::Web3ProxyApp, globals::global_db_replica_conn};
use crate::{app::App, globals::global_db_replica_conn};
use anyhow::Context;
use axum::{
extract::Query,
@ -29,7 +29,7 @@ use std::sync::Arc;
/// This is the link that the user can share to third parties, and get credits.
#[debug_handler]
pub async fn user_referral_link_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
Query(_params): Query<HashMap<String, String>>,
) -> Web3ProxyResponse {
@ -74,7 +74,7 @@ pub async fn user_referral_link_get(
#[debug_handler]
pub async fn user_used_referral_stats(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
Query(_params): Query<HashMap<String, String>>,
) -> Web3ProxyResponse {
@ -132,7 +132,7 @@ pub async fn user_used_referral_stats(
#[debug_handler]
pub async fn user_shared_referral_stats(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
Query(_params): Query<HashMap<String, String>>,
) -> Web3ProxyResponse {

View File

@ -1,5 +1,5 @@
//! Handle registration, logins, and managing account data.
use crate::app::Web3ProxyApp;
use crate::app::App;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse};
use crate::globals::{global_db_conn, global_db_replica_conn};
use crate::secrets::RpcSecretKey;
@ -27,7 +27,7 @@ use std::sync::Arc;
/// `GET /user/keys` -- Use a bearer token to get the user's api keys and their settings.
#[debug_handler]
pub async fn rpc_keys_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> Web3ProxyResponse {
let user = app.bearer_is_authorized(bearer).await?;
@ -123,7 +123,7 @@ pub async fn rpc_keys_get(
/// `DELETE /user/keys` -- Use a bearer token to delete an existing key.
#[debug_handler]
pub async fn rpc_keys_delete(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> Web3ProxyResponse {
let _user = app.bearer_is_authorized(bearer).await?;
@ -153,7 +153,7 @@ pub struct UserKeyManagement {
/// `POST /user/keys` or `PUT /user/keys` -- Use a bearer token to create or update an existing key.
#[debug_handler]
pub async fn rpc_keys_management(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
Json(payload): Json<UserKeyManagement>,
) -> Web3ProxyResponse {

View File

@ -1,5 +1,5 @@
//! Handle registration, logins, and managing account data.
use crate::app::Web3ProxyApp;
use crate::app::App;
use crate::errors::{Web3ProxyErrorContext, Web3ProxyResponse};
use crate::globals::global_db_replica_conn;
use crate::http_params::{
@ -28,7 +28,7 @@ use tracing::info;
/// `GET /user/revert_logs` -- Use a bearer token to get the user's revert logs.
#[debug_handler]
pub async fn user_revert_logs_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
Query(params): Query<HashMap<String, String>>,
) -> Web3ProxyResponse {
@ -125,7 +125,7 @@ pub async fn user_revert_logs_get(
/// `GET /user/stats/aggregate` -- Public endpoint for aggregate stats such as bandwidth used and methods requested.
#[debug_handler]
pub async fn user_influx_stats_aggregated_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
bearer: Option<TypedHeader<Authorization<Bearer>>>,
Query(params): Query<HashMap<String, String>>,
) -> Web3ProxyResponse {
@ -137,7 +137,7 @@ pub async fn user_influx_stats_aggregated_get(
/// `GET /user/stats/accounting` -- Use a bearer token to get the user's revert logs.
#[debug_handler]
pub async fn user_mysql_stats_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
) -> Web3ProxyResponse {
let user = app.bearer_is_authorized(bearer).await?;
@ -171,7 +171,7 @@ pub async fn user_mysql_stats_get(
/// TODO: this will change as we add better support for secondary users.
#[debug_handler]
pub async fn user_influx_stats_detailed_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
bearer: Option<TypedHeader<Authorization<Bearer>>>,
Query(params): Query<HashMap<String, String>>,
) -> Web3ProxyResponse {

View File

@ -1,5 +1,5 @@
//! Handle subusers, viewing subusers, and viewing accessible rpc-keys
use crate::app::Web3ProxyApp;
use crate::app::App;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse};
use crate::globals::{global_db_conn, global_db_replica_conn};
use crate::secrets::RpcSecretKey;
@ -29,7 +29,7 @@ use tracing::trace;
use ulid::{self, Ulid};
pub async fn get_keys_as_subuser(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
Query(_params): Query<HashMap<String, String>>,
) -> Web3ProxyResponse {
@ -94,7 +94,7 @@ pub async fn get_keys_as_subuser(
}
pub async fn get_subusers(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
Query(mut params): Query<HashMap<String, String>>,
) -> Web3ProxyResponse {
@ -166,7 +166,7 @@ pub async fn get_subusers(
#[debug_handler]
pub async fn modify_subuser(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(app): Extension<Arc<App>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
Query(mut params): Query<HashMap<String, String>>,
) -> Web3ProxyResponse {

View File

@ -1,6 +1,6 @@
// TODO: think a lot more about this
use crate::{app::Web3ProxyApp, errors::Web3ProxyError, relational_db::DatabaseReplica};
use crate::{app::App, errors::Web3ProxyError, relational_db::DatabaseReplica};
use derivative::Derivative;
use migration::{
sea_orm::{DatabaseConnection, DatabaseTransaction, TransactionTrait},
@ -9,7 +9,7 @@ use migration::{
use parking_lot::RwLock;
use std::sync::{Arc, LazyLock, OnceLock};
pub static APP: OnceLock<Arc<Web3ProxyApp>> = OnceLock::new();
pub static APP: OnceLock<Arc<App>> = OnceLock::new();
pub static DB_CONN: LazyLock<RwLock<Result<DatabaseConnection, DatabaseError>>> =
LazyLock::new(|| RwLock::new(Err(DatabaseError::NotConfigured)));

View File

@ -1,6 +1,6 @@
use crate::errors::{Web3ProxyError, Web3ProxyResult};
use crate::relational_db::{DatabaseConnection, DatabaseReplica};
use crate::{app::Web3ProxyApp, user_token::UserBearerToken};
use crate::{app::App, user_token::UserBearerToken};
use anyhow::Context;
use axum::{
headers::{authorization::Bearer, Authorization},
@ -137,7 +137,7 @@ pub fn get_rpc_key_id_from_params(
}
pub fn get_chain_id_from_params(
app: &Web3ProxyApp,
app: &App,
params: &HashMap<String, String>,
) -> anyhow::Result<u64> {
params.get("chain_id").map_or_else(

View File

@ -12,6 +12,7 @@ pub use self::request::{JsonRpcRequestEnum, SingleRequest};
pub use self::response::{
ParsedResponse, Response, ResponsePayload, SingleResponse, StreamResponse,
};
pub use request_builder::ValidatedRequest;
pub trait JsonRpcParams = fmt::Debug + serde::Serialize + Send + Sync + 'static;
pub trait JsonRpcResultData = serde::Serialize + serde::de::DeserializeOwned + fmt::Debug + Send;

View File

@ -1,39 +1,43 @@
use super::LooseId;
use crate::app::App;
use crate::errors::Web3ProxyError;
use crate::frontend::authorization::{Authorization, RequestOrMethod};
use crate::jsonrpc::ValidatedRequest;
use axum::response::Response as AxumResponse;
use derive_more::From;
use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor};
use serde::{Deserialize, Serialize};
use serde_inline_default::serde_inline_default;
use serde_json::value::RawValue;
use std::borrow::Cow;
use std::fmt;
use std::sync::{atomic, Arc};
use std::time::Duration;
use tokio::time::sleep;
use crate::app::Web3ProxyApp;
use crate::errors::Web3ProxyError;
use crate::frontend::authorization::{Authorization, RequestOrMethod, Web3Request};
use super::LooseId;
// TODO: &str here instead of String should save a lot of allocations
// TODO: generic type for params?
#[serde_inline_default]
#[derive(Clone, Deserialize, Serialize)]
pub struct SingleRequest {
pub jsonrpc: String,
pub jsonrpc: Cow<'static, str>,
/// id could be a stricter type, but many rpcs do things against the spec
/// TODO: this gets cloned into the response object often. would an Arc be better? That has its own overhead and these are short strings
pub id: Box<RawValue>,
pub method: String,
pub method: Cow<'static, str>,
#[serde_inline_default(serde_json::Value::Null)]
pub params: serde_json::Value,
}
impl SingleRequest {
// TODO: Web3ProxyResult? can this even fail?
pub fn new(id: LooseId, method: String, params: serde_json::Value) -> anyhow::Result<Self> {
pub fn new(
id: LooseId,
method: Cow<'static, str>,
params: serde_json::Value,
) -> anyhow::Result<Self> {
let x = Self {
jsonrpc: "2.0".to_string(),
jsonrpc: "2.0".into(),
id: id.to_raw_value(),
method,
params,
@ -102,7 +106,7 @@ impl JsonRpcRequestEnum {
/// returns the id of the first invalid result (if any). None is good
pub async fn tarpit_invalid(
&self,
app: &Arc<Web3ProxyApp>,
app: &Arc<App>,
authorization: &Arc<Authorization>,
duration: Duration,
) -> Result<(), AxumResponse> {
@ -117,10 +121,12 @@ impl JsonRpcRequestEnum {
// TODO: create a stat so we can penalize
// TODO: what request size
let request = Web3Request::new_with_app(
// TODO: this probably needs a permit
let request = ValidatedRequest::new_with_app(
app,
authorization.clone(),
None,
None,
RequestOrMethod::Method("invalid_method".into(), size),
None,
)
@ -225,7 +231,7 @@ impl<'de> Deserialize<'de> for JsonRpcRequestEnum {
// some providers don't follow the spec and don't include the jsonrpc key
// i think "2.0" should be a fine default to handle these incompatible clones
let jsonrpc = jsonrpc.unwrap_or_else(|| "2.0".to_string());
let jsonrpc = jsonrpc.unwrap_or_else(|| "2.0".into());
// TODO: Errors returned by the try operator get shown in an ugly way
let id = id.ok_or_else(|| de::Error::missing_field("id"))?;
let method = method.ok_or_else(|| de::Error::missing_field("method"))?;
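// Illustrative sketch (not part of this change) of what switching `method` (and `jsonrpc`)
// to `Cow<'static, str>` buys callers: static method names convert with `.into()` and are
// borrowed without allocating, while names built at runtime still work as owned Strings.
fn _cow_method_name_sketch() {
    use std::borrow::Cow;

    // no allocation: the &'static str is borrowed
    let static_method: Cow<'static, str> = "eth_chainId".into();
    assert!(matches!(static_method, Cow::Borrowed(_)));

    // a name built at runtime is moved into Cow::Owned
    let dynamic_method: Cow<'static, str> = format!("eth_{}", "call").into();
    assert_eq!(dynamic_method.as_ref(), "eth_call");
}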

View File

@ -0,0 +1,658 @@
use crate::{
app::App,
block_number::CacheMode,
errors::Web3ProxyResult,
frontend::{
authorization::{key_is_authorized, Authorization, RequestOrMethod, ResponseOrBytes},
rpc_proxy_ws::ProxyMode,
},
globals::APP,
jsonrpc,
kafka::KafkaDebugLogger,
response_cache::JsonRpcQueryCacheKey,
rpcs::{blockchain::Web3ProxyBlock, one::Web3Rpc},
secrets::RpcSecretKey,
stats::{AppStat, BackendRequests},
};
use anyhow::Context;
use axum::headers::{Origin, Referer, UserAgent};
use chrono::Utc;
use derivative::Derivative;
use ethers::types::U64;
use rust_decimal::Decimal;
use serde::{ser::SerializeStruct, Serialize};
use serde_json::{json, value::RawValue};
use std::{borrow::Cow, sync::Arc};
use std::{
fmt::{self, Display},
net::IpAddr,
};
use std::{
mem,
sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64},
time::Duration,
};
use tokio::{
sync::{mpsc, OwnedSemaphorePermit},
time::Instant,
};
use tracing::{error, trace, warn};
use ulid::Ulid;
use super::{JsonRpcParams, LooseId, SingleRequest};
#[derive(Derivative)]
#[derivative(Default)]
pub struct RequestBuilder {
app: Option<Arc<App>>,
archive_request: bool,
head_block: Option<Web3ProxyBlock>,
authorization: Option<Arc<Authorization>>,
request_or_method: RequestOrMethod,
}
impl RequestBuilder {
pub fn new(app: Arc<App>) -> Self {
let head_block = app.head_block_receiver().borrow().clone();
Self {
app: Some(app),
head_block,
..Default::default()
}
}
pub fn authorize_internal(self) -> Web3ProxyResult<Self> {
// TODO: allow passing proxy_mode to internal?
let authorization = Authorization::internal()?;
Ok(Self {
authorization: Some(Arc::new(authorization)),
..self
})
}
pub async fn authorize_premium(
self,
ip: &IpAddr,
rpc_key: &RpcSecretKey,
origin: Option<&Origin>,
user_agent: Option<&UserAgent>,
proxy_mode: ProxyMode,
referer: Option<&Referer>,
) -> Web3ProxyResult<Self> {
let app = self
.app
.as_ref()
.context("app is required for public requests")?;
// TODO: we can check authorization now, but the semaphore needs to wait!
let authorization =
key_is_authorized(app, rpc_key, ip, origin, proxy_mode, referer, user_agent).await?;
Ok(Self {
authorization: Some(Arc::new(authorization)),
..self
})
}
/// TODO: this takes a lot more things
pub fn authorize_public(
self,
ip: &IpAddr,
origin: Option<&Origin>,
user_agent: Option<&UserAgent>,
proxy_mode: ProxyMode,
referer: Option<&Referer>,
) -> Web3ProxyResult<Self> {
let app = self
.app
.as_ref()
.context("app is required for public requests")?;
let authorization = Authorization::external(
&app.config.allowed_origin_requests_per_period,
ip,
origin,
proxy_mode,
referer,
user_agent,
)?;
let head_block = app.watch_consensus_head_receiver.borrow().clone();
Ok(Self {
authorization: Some(Arc::new(authorization)),
head_block,
..self
})
}
pub fn set_archive_request(self, archive_request: bool) -> Self {
Self {
archive_request,
..self
}
}
/// replace 'latest' in the json and figure out the minimum and maximum blocks.
/// also tarpit invalid methods.
pub async fn set_request(self, request: SingleRequest) -> Web3ProxyResult<Self> {
Ok(Self {
request_or_method: RequestOrMethod::Request(request),
..self
})
}
pub fn set_method(self, method: Cow<'static, str>, size: usize) -> Self {
Self {
request_or_method: RequestOrMethod::Method(method, size),
..self
}
}
pub async fn build(
&self,
max_wait: Option<Duration>,
) -> Web3ProxyResult<Arc<ValidatedRequest>> {
// TODO: make this work without app being set?
let app = self.app.as_ref().context("app is required")?;
let authorization = self
.authorization
.clone()
.context("authorization is required")?;
let permit = authorization.permit(app).await?;
let x = ValidatedRequest::new_with_app(
app,
authorization,
max_wait,
permit,
self.request_or_method.clone(),
self.head_block.clone(),
)
.await;
if let Ok(x) = &x {
if self.archive_request {
x.archive_request.store(true, atomic::Ordering::Relaxed);
}
}
// todo!(anything else to set?)
x
}
}
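// Illustrative sketch (not part of this change) of how the builder above is meant to be
// driven. `app` and `request` are assumed to already exist; everything else comes from the
// methods defined in this impl. `build` is where the per-user concurrency permit is
// acquired, so the ValidatedRequest is only constructed once a permit is held.
async fn _request_builder_sketch(
    app: Arc<App>,
    request: SingleRequest,
) -> Web3ProxyResult<Arc<ValidatedRequest>> {
    RequestBuilder::new(app)
        .authorize_internal()?
        .set_request(request)
        .await?
        .build(Some(Duration::from_secs(30)))
        .await
}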
/// TODO:
/// TODO: instead of a bunch of atomics, this should probably use a RwLock. need to think more about how parallel requests are going to work though
#[derive(Debug, Derivative)]
#[derivative(Default)]
pub struct ValidatedRequest {
/// TODO: set archive_request during the new instead of after
/// TODO: this is more complex than "requires a block older than X height". different types of data can be pruned differently
pub archive_request: AtomicBool,
pub authorization: Arc<Authorization>,
pub cache_mode: CacheMode,
/// TODO: this should probably be in a global config. although maybe if we run multiple chains in one process this will be useful
pub chain_id: u64,
pub head_block: Option<Web3ProxyBlock>,
/// TODO: this should be in a global config. not copied to every single request
pub usd_per_cu: Decimal,
pub inner: RequestOrMethod,
/// Instant that the request was received (or at least close to it)
/// We use Instant and not timestamps to avoid problems with leap seconds and similar issues
#[derivative(Default(value = "Instant::now()"))]
pub start_instant: Instant,
/// if this is empty, there was a cache_hit
/// otherwise, it is populated with any rpc servers that were used by this request
pub backend_requests: BackendRequests,
/// The number of times the request got stuck waiting because no servers were synced
pub no_servers: AtomicU64,
/// If handling the request hit an application error
/// This does not count things like a transaction reverting or a malformed request
pub error_response: AtomicBool,
/// Size in bytes of the JSON response. Does not include headers or things like that.
pub response_bytes: AtomicU64,
/// How many milliseconds it took to respond to the request
pub response_millis: AtomicU64,
/// What time the (first) response was proxied.
/// TODO: think about how to store response times for ProxyMode::Versus
pub response_timestamp: AtomicI64,
/// True if the response required querying a backup RPC
/// RPC aggregators that query multiple providers to compare responses may use this header to ignore our response.
pub response_from_backup_rpc: AtomicBool,
/// If the request is invalid or received a jsonrpc error response (excluding reverts)
pub user_error_response: AtomicBool,
/// ProxyMode::Debug logs requests and responses with Kafka
/// TODO: maybe this shouldn't be determined by ProxyMode. A request param should probably enable this
pub kafka_debug_logger: Option<Arc<KafkaDebugLogger>>,
/// Cancel-safe channel for sending stats to the buffer
pub stat_sender: Option<mpsc::UnboundedSender<AppStat>>,
/// How long to spend waiting for an rpc that can serve this request
pub connect_timeout: Duration,
/// How long to spend waiting for an rpc to respond to this request
/// TODO: this should start once the connection is established
pub expire_timeout: Duration,
/// limit the number of concurrent requests from a given user.
pub permit: Option<OwnedSemaphorePermit>,
}
impl Display for ValidatedRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}({})",
self.inner.method(),
serde_json::to_string(self.inner.params()).expect("this should always serialize")
)
}
}
impl Serialize for ValidatedRequest {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut state = serializer.serialize_struct("request", 7)?;
state.serialize_field(
"archive_request",
&self.archive_request.load(atomic::Ordering::Relaxed),
)?;
state.serialize_field("chain_id", &self.chain_id)?;
state.serialize_field("head_block", &self.head_block)?;
state.serialize_field("request", &self.inner)?;
state.serialize_field("elapsed", &self.start_instant.elapsed().as_secs_f32())?;
{
let backend_names = self.backend_requests.lock();
let backend_names = backend_names
.iter()
.map(|x| x.name.as_str())
.collect::<Vec<_>>();
state.serialize_field("backend_requests", &backend_names)?;
}
state.serialize_field(
"response_bytes",
&self.response_bytes.load(atomic::Ordering::Relaxed),
)?;
state.end()
}
}
/// TODO: move all of this onto RequestBuilder
impl ValidatedRequest {
#[allow(clippy::too_many_arguments)]
async fn new_with_options(
app: Option<&App>,
authorization: Arc<Authorization>,
chain_id: u64,
head_block: Option<Web3ProxyBlock>,
kafka_debug_logger: Option<Arc<KafkaDebugLogger>>,
max_wait: Option<Duration>,
permit: Option<OwnedSemaphorePermit>,
mut request: RequestOrMethod,
usd_per_cu: Decimal,
) -> Web3ProxyResult<Arc<Self>> {
let start_instant = Instant::now();
let stat_sender = app.and_then(|x| x.stat_sender.clone());
// let request: RequestOrMethod = request.into();
// we VERY INTENTIONALLY log to kafka BEFORE calculating the cache key
// this is because calculating the cache_key may modify the params!
// for example, if the request specifies "latest" as the block number, we replace it with the actual latest block number
if let Some(ref kafka_debug_logger) = kafka_debug_logger {
// TODO: channels might be more ergonomic than spawned futures
// spawned things run in parallel easier but generally need more Arcs
kafka_debug_logger.log_debug_request(&request);
}
// now that kafka has logged the user's original params, we can calculate the cache key
// TODO: modify CacheMode::new to wait for a future block if one is requested! be sure to update head_block too!
let cache_mode = match &mut request {
RequestOrMethod::Request(x) => CacheMode::new(x, head_block.as_ref(), app).await,
_ => CacheMode::Never,
};
let connect_timeout = Duration::from_secs(3);
let expire_timeout = max_wait.unwrap_or_else(|| Duration::from_secs(295));
let x = Self {
archive_request: false.into(),
authorization,
backend_requests: Default::default(),
cache_mode,
chain_id,
error_response: false.into(),
connect_timeout,
expire_timeout,
head_block: head_block.clone(),
kafka_debug_logger,
no_servers: 0.into(),
inner: request,
permit,
response_bytes: 0.into(),
response_from_backup_rpc: false.into(),
response_millis: 0.into(),
response_timestamp: 0.into(),
start_instant,
stat_sender,
usd_per_cu,
user_error_response: false.into(),
};
Ok(Arc::new(x))
}
/// todo!(this shouldn't be public. use the RequestBuilder)
pub async fn new_with_app(
app: &App,
authorization: Arc<Authorization>,
max_wait: Option<Duration>,
permit: Option<OwnedSemaphorePermit>,
request: RequestOrMethod,
head_block: Option<Web3ProxyBlock>,
) -> Web3ProxyResult<Arc<Self>> {
// TODO: get this out of tracing instead (where we have a String from Amazon's LB)
let request_ulid = Ulid::new();
let kafka_debug_logger = if matches!(authorization.checks.proxy_mode, ProxyMode::Debug) {
KafkaDebugLogger::try_new(
app,
authorization.clone(),
head_block.as_ref().map(|x| x.number()),
"web3_proxy:rpc",
request_ulid,
)
} else {
None
};
let chain_id = app.config.chain_id;
let usd_per_cu = app.config.usd_per_cu.unwrap_or_default();
Self::new_with_options(
Some(app),
authorization,
chain_id,
head_block,
kafka_debug_logger,
max_wait,
permit,
request,
usd_per_cu,
)
.await
}
pub async fn new_internal<P: JsonRpcParams>(
method: Cow<'static, str>,
params: &P,
head_block: Option<Web3ProxyBlock>,
max_wait: Option<Duration>,
) -> Web3ProxyResult<Arc<Self>> {
let authorization = Arc::new(Authorization::internal().unwrap());
// todo!(we need a real id! increment a counter on the app or websocket-only providers are going to have a problem)
let id = LooseId::Number(1);
// TODO: this seems inefficient
let request = SingleRequest::new(id, method, json!(params)).unwrap();
if let Some(app) = APP.get() {
Self::new_with_app(
app,
authorization,
max_wait,
None,
request.into(),
head_block,
)
.await
} else {
Self::new_with_options(
None,
authorization,
0,
head_block,
None,
max_wait,
None,
request.into(),
Default::default(),
)
.await
}
}
#[inline]
pub fn backend_rpcs_used(&self) -> Vec<Arc<Web3Rpc>> {
self.backend_requests.lock().clone()
}
pub fn cache_key(&self) -> Option<u64> {
match &self.cache_mode {
CacheMode::Never => None,
x => {
let x = JsonRpcQueryCacheKey::new(x, &self.inner).hash();
Some(x)
}
}
}
#[inline]
pub fn cache_jsonrpc_errors(&self) -> bool {
self.cache_mode.cache_jsonrpc_errors()
}
#[inline]
pub fn id(&self) -> Box<RawValue> {
self.inner.id()
}
#[inline]
pub fn max_block_needed(&self) -> Option<U64> {
self.cache_mode.to_block().map(|x| *x.num())
}
pub fn min_block_needed(&self) -> Option<U64> {
if self.archive_request.load(atomic::Ordering::Relaxed) {
Some(U64::zero())
} else {
self.cache_mode.from_block().map(|x| *x.num())
}
}
#[inline]
pub fn connect_timeout_at(&self) -> Instant {
// TODO: get from config
self.start_instant + Duration::from_secs(3)
}
#[inline]
pub fn connect_timeout(&self) -> bool {
self.connect_timeout_at() <= Instant::now()
}
#[inline]
pub fn expire_at(&self) -> Instant {
// TODO: get from config
// erigon's timeout is 5 minutes so we want it shorter than that
self.start_instant + Duration::from_secs(295)
}
#[inline]
pub fn expired(&self) -> bool {
self.expire_at() <= Instant::now()
}
pub fn try_send_stat(mut self) -> Web3ProxyResult<()> {
if let Some(stat_sender) = self.stat_sender.take() {
trace!(?self, "sending stat");
let stat: AppStat = self.into();
if let Err(err) = stat_sender.send(stat) {
error!(?err, "failed sending stat");
// TODO: return it? that seems like it might cause an infinite loop
// TODO: but dropping stats is bad... hmm... i guess better to undercharge customers than overcharge
};
trace!("stat sent successfully");
}
Ok(())
}
pub fn add_response<'a, R: Into<ResponseOrBytes<'a>>>(&'a self, response: R) {
// TODO: fetch? set? should it be None in a Mutex? or a OnceCell?
let response = response.into();
let num_bytes = response.num_bytes() as u64;
self.response_bytes
.fetch_add(num_bytes, atomic::Ordering::Relaxed);
self.response_millis.fetch_add(
self.start_instant.elapsed().as_millis() as u64,
atomic::Ordering::Relaxed,
);
// TODO: record first or last timestamp? really, we need multiple
self.response_timestamp
.store(Utc::now().timestamp(), atomic::Ordering::Relaxed);
// TODO: set user_error_response and error_response here instead of outside this function
if let Some(kafka_debug_logger) = self.kafka_debug_logger.as_ref() {
if let ResponseOrBytes::Response(response) = response {
match response {
jsonrpc::SingleResponse::Parsed(response) => {
kafka_debug_logger.log_debug_response(response);
}
jsonrpc::SingleResponse::Stream(_) => {
warn!("need to handle streaming response debug logging");
}
}
}
}
}
pub fn try_send_arc_stat(self: Arc<Self>) -> Web3ProxyResult<()> {
match Arc::into_inner(self) {
Some(x) => x.try_send_stat(),
None => {
trace!("could not send stat while other arcs are active");
Ok(())
}
}
}
#[inline]
pub fn proxy_mode(&self) -> ProxyMode {
self.authorization.checks.proxy_mode
}
// TODO: helper function to duplicate? needs to clear request_bytes, and all the atomics tho...
}
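// Illustrative sketch (not part of this change) of combining the deadline helpers above
// with tokio's timers: sleep until the connect window closes, then check whether the
// request has also outlived its expire_timeout.
async fn _deadline_sketch(web3_request: &ValidatedRequest) {
    // wait out the connect window
    tokio::time::sleep_until(web3_request.connect_timeout_at()).await;

    // then see whether the request has already expired as well
    if web3_request.expired() {
        warn!("request expired before any backend rpc answered");
    }
}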
impl Drop for ValidatedRequest {
fn drop(&mut self) {
if self.stat_sender.is_some() {
// turn `&mut self` into `self`
let x = mem::take(self);
trace!(?x, "request metadata dropped without stat send");
let _ = x.try_send_stat();
}
}
}
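// Self-contained sketch (not part of this change) of the std behaviour that
// `try_send_arc_stat` above leans on: `Arc::into_inner` only hands the value back once
// every other strong reference has been dropped, so at most one holder gets to send the stat.
fn _arc_into_inner_sketch() {
    use std::sync::Arc;

    let a = Arc::new(1u8);
    let b = Arc::clone(&a);

    // another clone is still alive, so the value is not returned yet
    assert!(Arc::into_inner(a).is_none());

    // the last remaining Arc does give the value back
    assert_eq!(Arc::into_inner(b), Some(1u8));
}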
/*
pub fn foo(&self) {
let payload = payload
.map_err(|e| Web3ProxyError::from(e).into_response_with_id(None))?
.0;
let first_id = payload.first_id();
let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin, proxy_mode)
.await
.map_err(|e| e.into_response_with_id(first_id.clone()))?;
let authorization = Arc::new(authorization);
payload
.tarpit_invalid(&app, &authorization, Duration::from_secs(5))
.await?;
// TODO: calculate payload bytes here (before turning into serde_json::Value). that will save serializing later
// TODO: is first_id the right thing to attach to this error?
let (status_code, response, rpcs) = app
.proxy_web3_rpc(authorization, payload)
.await
.map_err(|e| e.into_response_with_id(first_id))?;
let mut response = (status_code, response).into_response();
// TODO: DRY this up. it is the same code for public and private queries
let response_headers = response.headers_mut();
// TODO: this might be slow. think about this more
// TODO: special string if no rpcs were used (cache hit)?
let mut backup_used = false;
let rpcs: String = rpcs
.into_iter()
.map(|x| {
if x.backup {
backup_used = true;
}
x.name.clone()
})
.join(",");
response_headers.insert(
"X-W3P-BACKEND-RPCS",
rpcs.parse().expect("W3P-BACKEND-RPCS should always parse"),
);
response_headers.insert(
"X-W3P-BACKUP-RPC",
backup_used
.to_string()
.parse()
.expect("W3P-BACKEND-RPCS should always parse"),
);
Ok(response)
}
*/

View File

@ -1,3 +1,7 @@
use super::JsonRpcErrorData;
use crate::errors::{Web3ProxyError, Web3ProxyResult};
use crate::jsonrpc::ValidatedRequest;
use crate::response_cache::ForwardedResponse;
use axum::body::StreamBody;
use axum::response::IntoResponse;
use axum::Json;
@ -10,12 +14,6 @@ use std::fmt;
use std::marker::PhantomData;
use std::sync::Arc;
use crate::errors::{Web3ProxyError, Web3ProxyResult};
use crate::frontend::authorization::Web3Request;
use crate::response_cache::ForwardedResponse;
use super::JsonRpcErrorData;
pub trait JsonRpcParams = fmt::Debug + serde::Serialize + Send + Sync + 'static;
pub trait JsonRpcResultData = serde::Serialize + serde::de::DeserializeOwned + fmt::Debug + Send;
@ -207,7 +205,7 @@ pub struct StreamResponse<T> {
_t: PhantomData<T>,
buffer: Bytes,
response: reqwest::Response,
web3_request: Arc<Web3Request>,
web3_request: Arc<ValidatedRequest>,
}
impl<T> StreamResponse<T> {
@ -257,7 +255,7 @@ where
pub async fn read_if_short(
mut response: reqwest::Response,
nbytes: u64,
web3_request: &Arc<Web3Request>,
web3_request: &Arc<ValidatedRequest>,
) -> Web3ProxyResult<SingleResponse<T>> {
Ok(Self::from_bytes(response.bytes().await?)?)
/*

View File

@ -1,4 +1,4 @@
use crate::app::Web3ProxyApp;
use crate::app::App;
use crate::frontend::authorization::{Authorization, RequestOrMethod};
use core::fmt;
use ethers::types::U64;
@ -33,7 +33,7 @@ type KafkaLogResult = Result<(i32, i64), (rdkafka::error::KafkaError, OwnedMessa
impl KafkaDebugLogger {
pub fn try_new(
app: &Web3ProxyApp,
app: &App,
authorization: Arc<Authorization>,
head_block_num: Option<U64>,
kafka_topic: &str,

View File

@ -8,12 +8,12 @@ use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::info;
use crate::app::Web3ProxyApp;
use crate::app::App;
use crate::errors::Web3ProxyResult;
/// Run a prometheus metrics server on the given port.
pub async fn serve(
app: Arc<Web3ProxyApp>,
app: Arc<App>,
mut shutdown_receiver: broadcast::Receiver<()>,
) -> Web3ProxyResult<()> {
// routes should be ordered most to least common
@ -44,7 +44,7 @@ pub async fn serve(
.map_err(Into::into)
}
async fn root(Extension(app): Extension<Arc<Web3ProxyApp>>) -> Response {
async fn root(Extension(app): Extension<Arc<App>>) -> Response {
let serialized = app.prometheus_metrics().await;
let mut r = serialized.into_response();

View File

@ -3,7 +3,7 @@ use super::many::Web3Rpcs;
use super::one::Web3Rpc;
use super::request::OpenRequestHandle;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::Web3Request;
use crate::jsonrpc::ValidatedRequest;
use crate::rpcs::request::OpenRequestResult;
use async_stream::stream;
use base64::engine::general_purpose;
@ -17,7 +17,7 @@ use hdrhistogram::Histogram;
use itertools::{Itertools, MinMaxResult};
use moka::future::Cache;
use serde::Serialize;
use std::cmp::{min_by_key, Ordering, Reverse};
use std::cmp::{Ordering, Reverse};
use std::sync::{atomic, Arc};
use std::time::Duration;
use tokio::select;
@ -97,7 +97,7 @@ pub struct RankedRpcs {
// TODO: could these be refs? The owning RankedRpcs lifetime might work. `stream!` might make it complicated
pub struct RpcsForRequest {
inner: Vec<Arc<Web3Rpc>>,
request: Arc<Web3Request>,
request: Arc<ValidatedRequest>,
}
impl RankedRpcs {
@ -198,7 +198,7 @@ impl RankedRpcs {
None
}
pub fn for_request(&self, web3_request: &Arc<Web3Request>) -> Option<RpcsForRequest> {
pub fn for_request(&self, web3_request: &Arc<ValidatedRequest>) -> Option<RpcsForRequest> {
if self.num_active_rpcs() == 0 {
return None;
}
@ -846,6 +846,7 @@ impl ConsensusFinder {
}
}
/*
fn best_rpc<'a>(rpc_a: &'a Arc<Web3Rpc>, rpc_b: &'a Arc<Web3Rpc>) -> &'a Arc<Web3Rpc> {
let now = Instant::now();
@ -857,6 +858,7 @@ fn best_rpc<'a>(rpc_a: &'a Arc<Web3Rpc>, rpc_b: &'a Arc<Web3Rpc>) -> &'a Arc<Web
faster
}
*/
impl RpcsForRequest {
pub fn to_stream(self) -> impl Stream<Item = OpenRequestHandle> {
@ -916,8 +918,6 @@ impl RpcsForRequest {
// if we got this far, no inner or outer rpcs are ready. that's surprising since an inner should have been ready. maybe it got rate limited
warn!(?earliest_retry_at, num_waits=%wait_for_sync.len(), "no rpcs ready");
break;
let min_wait_until = Instant::now() + Duration::from_millis(10);
// clear earliest_retry_at if it is too far in the future to help us

View File

@ -2,19 +2,19 @@
use super::blockchain::{BlocksByHashCache, BlocksByNumberCache, Web3ProxyBlock};
use super::consensus::{RankedRpcs, RpcsForRequest};
use super::one::Web3Rpc;
use crate::app::{flatten_handle, Web3ProxyApp, Web3ProxyJoinHandle};
use crate::app::{flatten_handle, App, Web3ProxyJoinHandle};
use crate::config::{average_block_interval, BlockAndRpc, Web3RpcConfig};
use crate::errors::{Web3ProxyError, Web3ProxyResult};
use crate::frontend::authorization::Web3Request;
use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::frontend::status::MokaCacheSerializer;
use crate::jsonrpc::ValidatedRequest;
use crate::jsonrpc::{self, JsonRpcErrorData, JsonRpcParams, JsonRpcResultData};
use deduped_broadcast::DedupedBroadcaster;
use derive_more::From;
use ethers::prelude::{TxHash, U64};
use futures::future::try_join_all;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures::stream::StreamExt;
use futures_util::future::join_all;
use hashbrown::HashMap;
use http::StatusCode;
use moka::future::CacheBuilder;
@ -167,7 +167,7 @@ impl Web3Rpcs {
/// update the rpcs in this group
pub async fn apply_server_configs(
&self,
app: &Web3ProxyApp,
app: &App,
rpc_configs: &HashMap<String, Web3RpcConfig>,
) -> Web3ProxyResult<()> {
// safety checks
@ -203,7 +203,7 @@ impl Web3Rpcs {
let server_id = app.config.unique_id;
// turn configs into connections (in parallel)
let mut spawn_handles: FuturesUnordered<_> = rpc_configs
let spawn_handles: Vec<_> = rpc_configs
.into_iter()
.filter_map(|(server_name, server_config)| {
if server_config.disabled {
@ -226,7 +226,7 @@ impl Web3Rpcs {
names_to_keep.push(server_name.clone());
let handle = tokio::spawn(server_config.clone().spawn(
let handle = server_config.clone().spawn(
server_name.clone(),
vredis_pool,
server_id,
@ -237,15 +237,15 @@ impl Web3Rpcs {
block_and_rpc_sender,
self.pending_txid_firehose.clone(),
self.max_head_block_age,
));
);
Some(handle)
})
.collect();
while let Some(x) = spawn_handles.next().await {
for x in join_all(spawn_handles).await {
match x {
Ok(Ok((new_rpc, _handle))) => {
Ok((new_rpc, _handle)) => {
// web3 connection worked
let old_rpc = self.by_name.read().get(&new_rpc.name).map(Arc::clone);
@ -287,16 +287,12 @@ impl Web3Rpcs {
self.by_name.write().insert(new_rpc.name.clone(), new_rpc);
}
}
Ok(Err(err)) => {
Err(err) => {
// if we got an error here, the app can continue on
// TODO: include context about which connection failed
// TODO: retry automatically
error!("Unable to create connection. err={:?}", err);
}
Err(err) => {
// something actually bad happened. exit with an error
return Err(err.into());
}
}
}
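// Self-contained sketch (not part of this change) of the difference the hunk above leans on:
// `join_all` drives the un-spawned futures concurrently on the current task, and only while
// it is being awaited, whereas the old `tokio::spawn` started each connection as an
// independent runtime task immediately.
async fn _join_all_sketch() {
    use futures_util::future::join_all;

    // stand-in for a call like `Web3RpcConfig::spawn`
    async fn connect(name: &'static str) -> String {
        format!("connected {name}")
    }

    // nothing runs yet; these are just futures sitting in a Vec
    let pending = vec![connect("a"), connect("b"), connect("c")];

    // all three are polled concurrently here, on this task
    for result in join_all(pending).await {
        println!("{result}");
    }
}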
@ -393,10 +389,10 @@ impl Web3Rpcs {
Ok(())
}
/// TODO: i think this RpcsForRequest should be stored on the Web3Request when its made. that way any waiting for sync happens early and we don't need waiting anywhere else in the app
/// TODO: i think this RpcsForRequest should be stored on the ValidatedRequest when it's made. that way any waiting for sync happens early and we don't need waiting anywhere else in the app
pub async fn wait_for_rpcs_for_request(
&self,
web3_request: &Arc<Web3Request>,
web3_request: &Arc<ValidatedRequest>,
) -> Web3ProxyResult<RpcsForRequest> {
let mut watch_consensus_rpcs = self.watch_ranked_rpcs.subscribe();
@ -444,7 +440,7 @@ impl Web3Rpcs {
/// TODO: should this wait for ranked rpcs? maybe only a fraction of web3_request's time?
pub async fn try_rpcs_for_request(
&self,
web3_request: &Arc<Web3Request>,
web3_request: &Arc<ValidatedRequest>,
) -> Web3ProxyResult<RpcsForRequest> {
// TODO: by_name might include things that are on a forked
let ranked_rpcs: Arc<RankedRpcs> =
@ -475,14 +471,14 @@ impl Web3Rpcs {
pub async fn internal_request<P: JsonRpcParams, R: JsonRpcResultData>(
&self,
method: &str,
method: Cow<'static, str>,
params: &P,
max_wait: Option<Duration>,
) -> Web3ProxyResult<R> {
let head_block = self.head_block();
let web3_request =
Web3Request::new_internal(method.into(), params, head_block, max_wait).await?;
ValidatedRequest::new_internal(method, params, head_block, max_wait).await?;
let response = self.request_with_metadata(&web3_request).await?;
@ -503,7 +499,7 @@ impl Web3Rpcs {
/// TODO: should max_tries be on web3_request. maybe as tries_left?
pub async fn request_with_metadata<R: JsonRpcResultData>(
&self,
web3_request: &Arc<Web3Request>,
web3_request: &Arc<ValidatedRequest>,
) -> Web3ProxyResult<jsonrpc::SingleResponse<R>> {
// TODO: collect the most common error. Web3ProxyError isn't Hash + Eq though. And making it so would be a pain
let mut errors = vec![];
@ -603,7 +599,7 @@ impl Web3Rpcs {
#[allow(clippy::too_many_arguments)]
pub async fn try_proxy_connection<R: JsonRpcResultData>(
&self,
web3_request: &Arc<Web3Request>,
web3_request: &Arc<ValidatedRequest>,
) -> Web3ProxyResult<jsonrpc::SingleResponse<R>> {
let proxy_mode = web3_request.proxy_mode();
@ -911,7 +907,7 @@ mod tests {
// // request that requires the head block
// // best_synced_backend_connection, which requires servers to be synced with the head block, should not find any nodes
// let r = Web3Request::new_internal(
// let r = ValidatedRequest::new_internal(
// "eth_getBlockByNumber".to_string(),
// &(head_block.number.unwrap(), false),
// Some(Web3ProxyBlock::try_from(head_block.clone()).unwrap()),
@ -1007,7 +1003,7 @@ mod tests {
// assert!(!lagged_rpc.has_block_data(head_block.number.unwrap()));
// // request on the lagged block should get a handle from either server
// let r = Web3Request::new_internal(
// let r = ValidatedRequest::new_internal(
// "eth_getBlockByNumber".to_string(),
// &(lagged_block.number.unwrap(), false),
// Some(Web3ProxyBlock::try_from(head_block.clone()).unwrap()),
@ -1022,7 +1018,7 @@ mod tests {
// // request on the head block should get a handle
// // TODO: make sure the handle is for the expected rpc
// let r = Web3Request::new_internal(
// let r = ValidatedRequest::new_internal(
// "eth_getBlockByNumber".to_string(),
// &(head_block.number.unwrap(), false),
// Some(Web3ProxyBlock::try_from(head_block.clone()).unwrap()),
@ -1039,7 +1035,7 @@ mod tests {
// // TODO: bring this back. it is failing because there is no global APP and so things default to not needing caching. no cache checks means we don't know this is a future block
// // future block should not get a handle
// let future_block_num = head_block.as_ref().number.unwrap() + U64::from(10);
// let r = Web3Request::new_internal(
// let r = ValidatedRequest::new_internal(
// "eth_getBlockByNumber".to_string(),
// &(future_block_num, false),
// Some(Web3ProxyBlock::try_from(head_block.clone()).unwrap()),
@ -1121,7 +1117,7 @@ mod tests {
// assert_eq!(rpcs.num_synced_rpcs(), 0);
// // best_synced_backend_connection requires servers to be synced with the head block
// let r = Web3Request::new_internal(
// let r = ValidatedRequest::new_internal(
// "eth_getBlockByNumber".to_string(),
// &("latest", false),
// Some(head_block.clone()),
@ -1232,7 +1228,7 @@ mod tests {
// // best_synced_backend_connection requires servers to be synced with the head block
// // TODO: test with and without passing the head_block.number?
// let r = Web3Request::new_internal(
// let r = ValidatedRequest::new_internal(
// "eth_getBlockByNumber".to_string(),
// &(head_block.number(), false),
// Some(head_block.clone()),
@ -1249,7 +1245,7 @@ mod tests {
// OpenRequestResult::Handle(_)
// ));
// let r = Web3Request::new_internal(
// let r = ValidatedRequest::new_internal(
// "eth_getBlockByNumber".to_string(),
// &(head_block.number(), false),
// Some(head_block.clone()),
@ -1262,8 +1258,8 @@ mod tests {
// // assert_eq!(best_available_server, best_available_server_from_none);
// // TODO: actually test a future block. this Web3Request doesn't require block #1
// let r = Web3Request::new_internal(
// // TODO: actually test a future block. this ValidatedRequest doesn't require block #1
// let r = ValidatedRequest::new_internal(
// "eth_getBlockByNumber".to_string(),
// &(head_block.number(), false),
// Some(head_block.clone()),
@ -1415,7 +1411,7 @@ mod tests {
// assert_eq!(rpcs.num_synced_rpcs(), 1);
// // best_synced_backend_connection requires servers to be synced with the head block
// let r = Web3Request::new_internal(
// let r = ValidatedRequest::new_internal(
// "eth_getBlockByNumber".to_string(),
// &(block_2.number(), false),
// Some(block_2.clone()),
@ -1433,7 +1429,7 @@ mod tests {
// );
// // this should give us both servers
// let r = Web3Request::new_internal(
// let r = ValidatedRequest::new_internal(
// "eth_getBlockByNumber".to_string(),
// &(block_1.number(), false),
// Some(block_2.clone()),
@ -1468,7 +1464,7 @@ mod tests {
// // this should give us only the archive server
// // TODO: i think this might have problems because block_1 - 100 isn't a real block and so queries for it will fail. then it falls back to caching with the head block
// // TODO: what if we check if its an archive block while doing cache_mode.
// let r = Web3Request::new_internal(
// let r = ValidatedRequest::new_internal(
// "eth_getBlockByNumber".to_string(),
// &(block_archive.number(), false),
// Some(block_2.clone()),

View File

@ -5,7 +5,7 @@ use super::request::{OpenRequestHandle, OpenRequestResult};
use crate::app::{flatten_handle, Web3ProxyJoinHandle};
use crate::config::{BlockAndRpc, Web3RpcConfig};
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::Web3Request;
use crate::jsonrpc::ValidatedRequest;
use crate::jsonrpc::{self, JsonRpcParams, JsonRpcResultData};
use crate::rpcs::request::RequestErrorHandler;
use anyhow::{anyhow, Context};
@ -22,6 +22,7 @@ use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter};
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::json;
use std::borrow::Cow;
use std::cmp::Reverse;
use std::fmt;
use std::hash::{Hash, Hasher};
@ -29,6 +30,7 @@ use std::sync::atomic::{self, AtomicBool, AtomicU32, AtomicU64, AtomicUsize};
use std::{cmp::Ordering, sync::Arc};
use tokio::select;
use tokio::sync::{mpsc, watch, RwLock as AsyncRwLock};
use tokio::task::yield_now;
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
use tracing::{debug, error, info, trace, warn, Level};
use url::Url;
@ -364,7 +366,7 @@ impl Web3Rpc {
for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] {
let head_block_num = self
.internal_request::<_, U256>(
"eth_blockNumber",
"eth_blockNumber".into(),
&[(); 0],
// error here are expected, so keep the level low
Some(Level::DEBUG.into()),
@ -392,7 +394,7 @@ impl Web3Rpc {
// TODO: what should the request be?
let archive_result: Result<Bytes, _> = self
.internal_request(
"eth_getCode",
"eth_getCode".into(),
&json!((
"0xdead00000000000000000000000000000000beef",
maybe_archive_block,
@ -416,6 +418,8 @@ impl Web3Rpc {
}
limit = Some(block_data_limit);
yield_now().await;
}
if let Some(limit) = limit {
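// Self-contained sketch (not part of this change) of what the `yield_now().await` added
// in the loop above does: it hands control back to the tokio scheduler so other ready
// tasks can run before the next probe, instead of hogging the worker for the whole loop.
async fn _yield_now_sketch() {
    use tokio::task::yield_now;

    for shift in 0..3u32 {
        // stand-in for probing one block_data_limit candidate
        let _candidate: u64 = 1 << shift;

        // let the runtime poll other tasks between probes
        yield_now().await;
    }
}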
@ -491,7 +495,7 @@ impl Web3Rpc {
// trace!("waiting on chain id for {}", self);
let found_chain_id: U64 = self
.internal_request(
"eth_chainId",
"eth_chainId".into(),
&[(); 0],
Some(Level::TRACE.into()),
Some(Duration::from_secs(5)),
@ -617,7 +621,7 @@ impl Web3Rpc {
let to = if let Some(txid) = head_block.transactions().last().cloned() {
let tx = self
.internal_request::<_, Option<Transaction>>(
"eth_getTransactionByHash",
"eth_getTransactionByHash".into(),
&(txid,),
error_handler,
Some(Duration::from_secs(5)),
@ -639,7 +643,7 @@ impl Web3Rpc {
let _code = self
.internal_request::<_, Option<Bytes>>(
"eth_getCode",
"eth_getCode".into(),
&(to, block_number),
error_handler,
Some(Duration::from_secs(5)),
@ -1004,7 +1008,7 @@ impl Web3Rpc {
// TODO: send this request to the ws_provider instead of the http_provider
let latest_block: Result<Option<ArcBlock>, _> = self
.internal_request(
"eth_getBlockByNumber",
"eth_getBlockByNumber".into(),
&("latest", false),
error_handler,
Some(Duration::from_secs(5)),
@ -1039,7 +1043,7 @@ impl Web3Rpc {
loop {
let block_result = self
.internal_request::<_, Option<ArcBlock>>(
"eth_getBlockByNumber",
"eth_getBlockByNumber".into(),
&("latest", false),
error_handler,
Some(Duration::from_secs(5)),
@ -1078,7 +1082,7 @@ impl Web3Rpc {
pub async fn wait_for_request_handle(
self: &Arc<Self>,
web3_request: &Arc<Web3Request>,
web3_request: &Arc<ValidatedRequest>,
error_handler: Option<RequestErrorHandler>,
allow_unhealthy: bool,
) -> Web3ProxyResult<OpenRequestHandle> {
@ -1211,7 +1215,7 @@ impl Web3Rpc {
pub async fn try_request_handle(
self: &Arc<Self>,
web3_request: &Arc<Web3Request>,
web3_request: &Arc<ValidatedRequest>,
error_handler: Option<RequestErrorHandler>,
allow_unhealthy: bool,
) -> Web3ProxyResult<OpenRequestResult> {
@ -1303,7 +1307,7 @@ impl Web3Rpc {
pub async fn internal_request<P: JsonRpcParams, R: JsonRpcResultData>(
self: &Arc<Self>,
method: &str,
method: Cow<'static, str>,
params: &P,
error_handler: Option<RequestErrorHandler>,
max_wait: Option<Duration>,
@ -1312,7 +1316,7 @@ impl Web3Rpc {
let head_block = self.head_block_sender.as_ref().unwrap().borrow().clone();
let web3_request =
Web3Request::new_internal(method.into(), params, head_block, max_wait).await?;
ValidatedRequest::new_internal(method, params, head_block, max_wait).await?;
// TODO: if we are inside the health checks and we aren't healthy yet. we need some sort of flag to force try_handle to not error
@ -1322,7 +1326,7 @@ impl Web3Rpc {
pub async fn authorized_request<R: JsonRpcResultData>(
self: &Arc<Self>,
web3_request: &Arc<Web3Request>,
web3_request: &Arc<ValidatedRequest>,
error_handler: Option<RequestErrorHandler>,
allow_unhealthy: bool,
) -> Web3ProxyResult<R> {

View File

@ -1,8 +1,10 @@
use super::one::Web3Rpc;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::{Authorization, AuthorizationType, Web3Request};
use crate::frontend::authorization::{Authorization, AuthorizationType};
use crate::globals::{global_db_conn, DB_CONN};
use crate::jsonrpc::{self, JsonRpcErrorData, JsonRpcResultData, ResponsePayload};
use crate::jsonrpc::{
self, JsonRpcErrorData, JsonRpcResultData, ResponsePayload, ValidatedRequest,
};
use anyhow::Context;
use chrono::Utc;
use derive_more::From;
@ -37,7 +39,7 @@ pub enum OpenRequestResult {
/// Make RPC requests through this handle and drop it when you are done.
/// Opening this handle checks rate limits. Developers, try to keep opening a handle and using it as close together as possible
pub struct OpenRequestHandle {
web3_request: Arc<Web3Request>,
web3_request: Arc<ValidatedRequest>,
error_handler: RequestErrorHandler,
rpc: Arc<Web3Rpc>,
}
@ -149,7 +151,7 @@ impl Drop for OpenRequestHandle {
impl OpenRequestHandle {
pub async fn new(
web3_request: Arc<Web3Request>,
web3_request: Arc<ValidatedRequest>,
rpc: Arc<Web3Rpc>,
error_handler: Option<RequestErrorHandler>,
) -> Self {

View File

@ -0,0 +1,559 @@
use super::one::Web3Rpc;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::{Authorization, AuthorizationType};
use crate::globals::{global_db_conn, DB_CONN};
use crate::jsonrpc::{
self, JsonRpcErrorData, JsonRpcResultData, ResponsePayload, ValidatedRequest,
};
use anyhow::Context;
use chrono::Utc;
use derive_more::From;
use entities::revert_log;
use entities::sea_orm_active_enums::Method;
use ethers::providers::ProviderError;
use ethers::types::{Address, Bytes};
use futures::Future;
use http::StatusCode;
use migration::sea_orm::{self, ActiveEnum, ActiveModelTrait};
use nanorand::Rng;
use serde_json::json;
use std::pin::Pin;
use std::sync::atomic;
use std::sync::Arc;
use tokio::time::{Duration, Instant};
use tracing::{debug, error, info, trace, warn, Level};
#[derive(From)]
pub enum OpenRequestResult {
Handle(OpenRequestHandle),
/// Unable to start a request. Retry at the given time.
RetryAt(Instant),
/// The rpcs are not synced, but they should be soon.
/// You should wait for the given block number.
/// TODO: should this return an OpenRequestHandle? that might recurse
Lagged(Pin<Box<dyn Future<Output = Web3ProxyResult<Arc<Web3Rpc>>> + Send>>),
/// Unable to start a request because no servers are synced or the necessary data has been pruned
Failed,
}
/// Make RPC requests through this handle and drop it when you are done.
/// Opening this handle checks rate limits. Developers, try to keep opening a handle and using it as close together as possible
pub struct OpenRequestHandle {
web3_request: Arc<ValidatedRequest>,
error_handler: RequestErrorHandler,
rpc: Arc<Web3Rpc>,
}
/// Depending on the context, RPC errors require different handling.
#[derive(Copy, Clone, Debug, Default)]
pub enum RequestErrorHandler {
/// Log at the trace level. Use when errors are expected.
#[default]
TraceLevel,
/// Log at the debug level. Use when errors are expected.
DebugLevel,
/// Log at the info level. Use when errors are expected.
InfoLevel,
/// Log at the error level. Use when errors are bad.
ErrorLevel,
/// Log at the warn level. Use when errors do not cause problems.
WarnLevel,
/// Potentially save the revert. Users can tune how often this happens
Save,
}
// TODO: second param could be skipped since we don't need it here
#[derive(serde::Deserialize, serde::Serialize)]
struct EthCallParams((EthCallFirstParams, Option<serde_json::Value>));
#[derive(serde::Deserialize, serde::Serialize)]
struct EthCallFirstParams {
to: Option<Address>,
data: Option<Bytes>,
}
impl std::fmt::Debug for OpenRequestHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OpenRequestHandle")
.field("method", &self.web3_request.inner.method())
.field("rpc", &self.rpc.name)
.finish_non_exhaustive()
}
}
impl From<Level> for RequestErrorHandler {
fn from(level: Level) -> Self {
match level {
Level::DEBUG => RequestErrorHandler::DebugLevel,
Level::ERROR => RequestErrorHandler::ErrorLevel,
Level::INFO => RequestErrorHandler::InfoLevel,
Level::TRACE => RequestErrorHandler::TraceLevel,
Level::WARN => RequestErrorHandler::WarnLevel,
}
}
}
impl Authorization {
/// Save an RPC call that returned "execution reverted" to the database.
async fn save_revert(
self: Arc<Self>,
method: Method,
params: EthCallFirstParams,
) -> Web3ProxyResult<()> {
let rpc_key_id = match self.checks.rpc_secret_key_id {
Some(rpc_key_id) => rpc_key_id.into(),
None => {
// trace!(?self, "cannot save revert without rpc_key_id");
return Ok(());
}
};
let db_conn = global_db_conn()?;
// TODO: should the database set the timestamp?
// we intentionally use "now" and not the time the request started
// why? because we aggregate stats and setting one in the past could cause confusion
let timestamp = Utc::now();
let to = params.to.unwrap_or_else(Address::zero).as_bytes().to_vec();
let call_data = params.data.map(|x| x.to_string());
let rl = revert_log::ActiveModel {
rpc_key_id: sea_orm::Set(rpc_key_id),
method: sea_orm::Set(method),
to: sea_orm::Set(to),
call_data: sea_orm::Set(call_data),
timestamp: sea_orm::Set(timestamp),
..Default::default()
};
let rl = rl
.save(&db_conn)
.await
.web3_context("Failed saving new revert log")?;
// TODO: what log level and format?
trace!(revert_log=?rl);
// TODO: return something useful
Ok(())
}
}
impl Drop for OpenRequestHandle {
fn drop(&mut self) {
self.rpc
.active_requests
.fetch_sub(1, atomic::Ordering::Relaxed);
}
}
impl OpenRequestHandle {
pub async fn new(
web3_request: Arc<ValidatedRequest>,
rpc: Arc<Web3Rpc>,
error_handler: Option<RequestErrorHandler>,
) -> Self {
// TODO: take request_id as an argument?
// TODO: attach a unique id to this? customer requests have one, but not internal queries
// TODO: what ordering?!
rpc.active_requests
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let error_handler = error_handler.unwrap_or_default();
Self {
web3_request,
error_handler,
rpc,
}
}
pub fn connection_name(&self) -> String {
self.rpc.name.clone()
}
#[inline]
pub fn clone_connection(&self) -> Arc<Web3Rpc> {
self.rpc.clone()
}
pub fn rate_limit_for(&self, duration: Duration) {
if self.rpc.backup {
debug!(?duration, "rate limited on {}!", self.rpc);
} else {
warn!(?duration, "rate limited on {}!", self.rpc);
}
// TODO: use send_if_modified to be sure we only send if our value is greater
self.rpc
.hard_limit_until
.as_ref()
.unwrap()
.send_replace(Instant::now() + duration);
}
/// Just get the response from the provider without any extra handling.
/// This lets us use the try operator which makes it much easier to read
async fn _request<R: JsonRpcResultData + serde::Serialize>(
&self,
) -> Web3ProxyResult<jsonrpc::SingleResponse<R>> {
// TODO: replace ethers-rs providers with our own that supports streaming the responses
// TODO: replace ethers-rs providers with our own that handles "id" being null
if let (Some(url), Some(ref client)) = (self.rpc.http_url.clone(), &self.rpc.http_client) {
// prefer the http provider
let request = self
.web3_request
.inner
.jsonrpc_request()
.context("there should always be a request here")?;
let response = client.post(url).json(request).send().await?;
if response.status() == StatusCode::TOO_MANY_REQUESTS {
// TODO: how much should we actually rate limit?
self.rate_limit_for(Duration::from_secs(1));
}
let response = response.error_for_status()?;
jsonrpc::SingleResponse::read_if_short(response, 1024, &self.web3_request).await
} else if let Some(p) = self.rpc.ws_provider.load().as_ref() {
// use the websocket provider if no http provider is available
let method = self.web3_request.inner.method();
let params = self.web3_request.inner.params();
// some ethers::ProviderError need to be converted to JsonRpcErrorData. the rest to Web3ProxyError
let response = match p.request::<_, R>(method, params).await {
Ok(x) => jsonrpc::ParsedResponse::from_result(x, self.web3_request.id()),
Err(provider_error) => match JsonRpcErrorData::try_from(&provider_error) {
Ok(x) => jsonrpc::ParsedResponse::from_error(x, self.web3_request.id()),
Err(ProviderError::HTTPError(error)) => {
if let Some(status_code) = error.status() {
if status_code == StatusCode::TOO_MANY_REQUESTS {
// TODO: how much should we actually rate limit?
self.rate_limit_for(Duration::from_secs(1));
}
}
return Err(provider_error.into());
}
Err(err) => {
warn!(?err, "error from {}", self.rpc);
return Err(provider_error.into());
}
},
};
Ok(response.into())
} else {
// this must be a test
Err(anyhow::anyhow!("no provider configured!").into())
}
}
pub fn error_handler(&self) -> RequestErrorHandler {
if let RequestErrorHandler::Save = self.error_handler {
let method = self.web3_request.inner.method();
// TODO: should all these be Trace or Debug or a mix?
// TODO: this list should come from config. other methods might be desired
if !["eth_call", "eth_estimateGas"].contains(&method) {
// trace!(%method, "skipping save on revert");
RequestErrorHandler::TraceLevel
} else if DB_CONN.read().is_ok() {
let log_revert_chance = self.web3_request.authorization.checks.log_revert_chance;
if log_revert_chance == 0 {
// trace!(%method, "no chance. skipping save on revert");
RequestErrorHandler::TraceLevel
} else if log_revert_chance == u16::MAX {
// trace!(%method, "gaurenteed chance. SAVING on revert");
self.error_handler
} else if nanorand::tls_rng().generate_range(0u16..u16::MAX) < log_revert_chance {
// trace!(%method, "missed chance. skipping save on revert");
RequestErrorHandler::TraceLevel
} else {
// trace!("Saving on revert");
// TODO: is always logging at debug level fine?
self.error_handler
}
} else {
// trace!(%method, "no database. skipping save on revert");
RequestErrorHandler::TraceLevel
}
} else {
self.error_handler
}
}
/// Send a web3 request
/// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
/// depending on how things are locked, you might need to pass the provider in
/// we take self to ensure this function only runs once
/// This does some inspection of the response to check for non-standard errors and rate limiting to try to give a Web3ProxyError instead of an Ok
pub async fn request<R: JsonRpcResultData + serde::Serialize>(
self,
) -> Web3ProxyResult<jsonrpc::SingleResponse<R>> {
// TODO: use tracing spans
// TODO: including params in this log is way too verbose
// trace!(rpc=%self.rpc, %method, "request");
trace!("requesting from {}", self.rpc);
let authorization = &self.web3_request.authorization;
match &authorization.authorization_type {
AuthorizationType::Frontend => {
self.rpc
.external_requests
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
AuthorizationType::Internal => {
self.rpc
.internal_requests
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
// we used to fetch_add the active_request count here, but sometimes a request is made without going through this function (like with subscriptions)
// we generally don't want to use the try operator. we might need to log errors
let start = Instant::now();
let mut response = self._request().await;
// measure successes and errors
// originally i thought we wouldn't want errors, but I think it's a more accurate number including all requests
let latency = start.elapsed();
// we used to fetch_sub the active_request count here, but sometimes the handle is dropped without request being called!
trace!(
"response from {} for {}: {:?}",
self.rpc,
self.web3_request,
response,
);
// TODO: move this to a helper function?
// true if we got a jsonrpc result. a jsonrpc error or other error is false.
// TODO: counters for errors vs jsonrpc vs success?
let response_is_success = match &response {
Ok(jsonrpc::SingleResponse::Parsed(x)) => {
matches!(&x.payload, ResponsePayload::Success { .. })
}
Ok(jsonrpc::SingleResponse::Stream(..)) => true,
Err(_) => false,
};
if response_is_success {
// only track latency for successful requests
tokio::spawn(async move {
self.rpc.peak_latency.as_ref().unwrap().report(latency);
self.rpc.median_latency.as_ref().unwrap().record(latency);
// TODO: app-wide median and peak latency?
});
} else {
// only save reverts for some types of calls
// TODO: do something special for eth_sendRawTransaction too
// we do **NOT** use self.error_handler here because it might have been modified
let error_handler = self.error_handler();
enum ResponseType {
Error,
Revert,
RateLimited,
}
let response_type: ResponseType = match &response {
Ok(jsonrpc::SingleResponse::Parsed(x)) => match &x.payload {
ResponsePayload::Success { .. } => unreachable!(),
ResponsePayload::Error { error } => {
trace!(?error, "jsonrpc error data");
if error.message.starts_with("execution reverted") {
ResponseType::Revert
} else if error.code == StatusCode::TOO_MANY_REQUESTS.as_u16() as i64 {
ResponseType::RateLimited
} else {
// TODO! THIS HAS TOO MANY FALSE POSITIVES! There's another spot in the code that checks for things.
// if error.message.contains("limit") || error.message.contains("request") {
// self.rate_limit_for(Duration::from_secs(1));
// }
match error.code {
-32000 => {
if error.message.contains("MDBX_PANIC:") {
response = Err(Web3ProxyError::MdbxPanic(
self.connection_name(),
error.message.clone(),
));
} else {
// TODO: regex?
let archive_prefixes = [
"header not found",
"header for hash not found",
"missing trie node",
];
for prefix in archive_prefixes {
if error.message.starts_with(prefix) {
// TODO: what error?
response = Err(Web3ProxyError::ArchiveRequired {
min: self.web3_request.min_block_needed(),
max: self.web3_request.max_block_needed(),
});
break;
}
}
}
ResponseType::Error
}
-32001 => {
if error.message == "Exceeded the quota usage" {
ResponseType::RateLimited
} else {
ResponseType::Error
}
}
-32005 => {
if error.message == "rate limit exceeded" {
ResponseType::RateLimited
} else {
ResponseType::Error
}
}
-32601 => {
let error_msg = error.message.as_ref();
// sometimes a provider does not support all rpc methods
// we check other connections rather than returning the error
// but sometimes the method is something that is actually unsupported,
// so we save the response here to return it later
// some providers look like this
if (error_msg.starts_with("the method")
&& error_msg.ends_with("is not available"))
|| error_msg == "Method not found"
{
let method = self.web3_request.inner.method().to_string();
response =
Err(Web3ProxyError::MethodNotFound(method.into()))
}
ResponseType::Error
}
_ => ResponseType::Error,
}
}
}
},
Ok(jsonrpc::SingleResponse::Stream(..)) => unreachable!(),
Err(_) => ResponseType::Error,
};
if matches!(response_type, ResponseType::RateLimited) {
// TODO: how long?
self.rate_limit_for(Duration::from_secs(1));
}
match error_handler {
RequestErrorHandler::DebugLevel => {
// TODO: think about this revert check more. sometimes we might want reverts logged so this needs a flag
if matches!(response_type, ResponseType::Revert) {
trace!(
rpc=%self.rpc,
%self.web3_request,
?response,
"revert",
);
} else {
debug!(
rpc=%self.rpc,
%self.web3_request,
?response,
"bad response",
);
}
}
RequestErrorHandler::InfoLevel => {
info!(
rpc=%self.rpc,
%self.web3_request,
?response,
"bad response",
);
}
RequestErrorHandler::TraceLevel => {
trace!(
rpc=%self.rpc,
%self.web3_request,
?response,
"bad response",
);
}
RequestErrorHandler::ErrorLevel => {
// TODO: only include params if not running in release mode
error!(
rpc=%self.rpc,
%self.web3_request,
?response,
"bad response",
);
}
RequestErrorHandler::WarnLevel => {
// TODO: only include params if not running in release mode
warn!(
rpc=%self.rpc,
%self.web3_request,
?response,
"bad response",
);
}
RequestErrorHandler::Save => {
trace!(
rpc=%self.rpc,
%self.web3_request,
?response,
"bad response",
);
// TODO: do not unwrap! (doesn't matter much since we check method as a string above)
// TODO: open this up for even more methods
let method: Method =
Method::try_from_value(&self.web3_request.inner.method().to_string())
.unwrap();
// TODO: i don't think this parsing is correct
match serde_json::from_value::<EthCallParams>(json!(self
.web3_request
.inner
.params()))
{
Ok(params) => {
// spawn saving to the database so we don't slow down the request
// TODO: log if this errors
// TODO: aren't the method and params already saved? this should just need the response
let f = authorization.clone().save_revert(method, params.0 .0);
tokio::spawn(f);
}
Err(err) => {
warn!(
%self.web3_request,
?response,
?err,
"failed parsing eth_call params. unable to save revert",
);
}
}
}
}
}
response
}
}

View File

@ -1,5 +1,5 @@
use super::StatType;
use crate::app::Web3ProxyApp;
use crate::app::App;
use crate::errors::{Web3ProxyError, Web3ProxyResponse, Web3ProxyResult};
use crate::globals::{global_db_conn, global_db_replica_conn};
use crate::http_params::{
@ -57,7 +57,7 @@ pub fn filter_query_window_seconds(
}
pub async fn query_user_stats<'a>(
app: &'a Web3ProxyApp,
app: &'a App,
bearer: Option<TypedHeader<Authorization<Bearer>>>,
params: &'a HashMap<String, String>,
stat_response_type: StatType,

View File

@ -2,7 +2,7 @@ use super::StatType;
use crate::errors::Web3ProxyErrorContext;
use crate::globals::global_db_replica_conn;
use crate::{
app::Web3ProxyApp,
app::App,
errors::{Web3ProxyError, Web3ProxyResponse},
http_params::{
get_chain_id_from_params, get_query_start_from_params, get_query_stop_from_params,
@ -27,7 +27,7 @@ use tracing::{error, trace, warn};
use ulid::Ulid;
pub async fn query_user_influx_stats<'a>(
app: &'a Web3ProxyApp,
app: &'a App,
bearer: Option<TypedHeader<Authorization<Bearer>>>,
params: &'a HashMap<String, String>,
stat_response_type: StatType,

View File

@ -9,7 +9,8 @@ use self::stat_buffer::BufferedRpcQueryStats;
use crate::caches::{RpcSecretKeyCache, UserBalanceCache};
use crate::compute_units::ComputeUnit;
use crate::errors::{Web3ProxyError, Web3ProxyResult};
use crate::frontend::authorization::{Authorization, AuthorizationType, Web3Request};
use crate::frontend::authorization::{Authorization, AuthorizationType};
use crate::jsonrpc::ValidatedRequest;
use crate::rpcs::one::Web3Rpc;
use anyhow::{anyhow, Context};
use chrono::{DateTime, Months, TimeZone, Utc};
@ -202,7 +203,7 @@ impl RpcQueryStats {
/// For now there is just one, but I think there might be others later
#[derive(Debug, From)]
pub enum AppStat {
RpcQuery(Web3Request),
RpcQuery(ValidatedRequest),
}
// TODO: move to stat_buffer.rs?
@ -546,7 +547,7 @@ impl BufferedRpcQueryStats {
/// We want this to run when there is **one and only one** copy of this RequestMetadata left
/// There are often multiple copies if a request is being sent to multiple servers in parallel
impl RpcQueryStats {
fn try_from_metadata(metadata: Web3Request) -> Web3ProxyResult<Self> {
fn try_from_metadata(metadata: ValidatedRequest) -> Web3ProxyResult<Self> {
// TODO: do this without a clone
let authorization = metadata.authorization.clone();

View File

@ -2,8 +2,9 @@ use super::{AppStat, FlushedStats, RpcQueryKey};
use crate::app::Web3ProxyJoinHandle;
use crate::caches::{RpcSecretKeyCache, UserBalanceCache};
use crate::errors::Web3ProxyResult;
use crate::frontend::authorization::{AuthorizationType, Web3Request};
use crate::frontend::authorization::AuthorizationType;
use crate::globals::global_db_conn;
use crate::jsonrpc::ValidatedRequest;
use crate::stats::RpcQueryStats;
use derive_more::From;
use futures::stream;
@ -247,7 +248,10 @@ impl StatBuffer {
}
}
async fn _buffer_web3_request(&mut self, web3_request: Web3Request) -> Web3ProxyResult<u64> {
async fn _buffer_web3_request(
&mut self,
web3_request: ValidatedRequest,
) -> Web3ProxyResult<u64> {
// we convert on the buffer's side of the channel so that we don't slow down the request
let stat = RpcQueryStats::try_from_metadata(web3_request)?;
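For context, a minimal sketch of the buffering pattern `_buffer_web3_request` participates in: the hot request path only performs a cheap channel send, and the heavier conversion into a stat happens on the buffer's side of the channel. The types and channel below are illustrative, not the crate's actual API; it assumes tokio with the "rt", "macros", and "sync" features.

// Illustrative sketch only; hypothetical types, not web3-proxy's stat buffer.
use tokio::sync::mpsc;

struct RequestMetadata {
    method: String,
    response_bytes: u64,
}

struct QueryStat {
    method: String,
    response_bytes: u64,
}

fn convert(meta: RequestMetadata) -> QueryStat {
    // stand-in for the heavier work (summing compute units, pricing, etc.)
    QueryStat {
        method: meta.method,
        response_bytes: meta.response_bytes,
    }
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<RequestMetadata>();

    // the buffer task owns the receiver and does the conversion,
    // so the request path only pays for a cheap channel send
    let buffer = tokio::spawn(async move {
        let mut total_bytes = 0u64;
        while let Some(meta) = rx.recv().await {
            let stat = convert(meta);
            total_bytes += stat.response_bytes;
        }
        total_bytes
    });

    // hot path: send the raw metadata and move on
    tx.send(RequestMetadata {
        method: "eth_call".to_string(),
        response_bytes: 512,
    })
    .expect("buffer task is alive");

    drop(tx); // closing the channel lets the buffer task finish
    assert_eq!(buffer.await.unwrap(), 512);
}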

View File

@ -3,7 +3,8 @@ use std::sync::Arc;
use tracing::{error, info};
use web3_proxy::app::BILLING_PERIOD_SECONDS;
use web3_proxy::config::TopConfig;
use web3_proxy::frontend::authorization::{Authorization, RequestOrMethod, Web3Request};
use web3_proxy::frontend::authorization::{Authorization, RequestOrMethod};
use web3_proxy::jsonrpc::ValidatedRequest;
use web3_proxy::prelude::anyhow::{self, Context};
use web3_proxy::prelude::argh::{self, FromArgs};
use web3_proxy::prelude::chrono;
@ -190,8 +191,8 @@ impl MigrateStatsToV2SubCommand {
let request = RequestOrMethod::Method(method, int_request_bytes as usize);
// Create Web3Request
let web3_request = Web3Request {
// Create ValidatedRequest
let web3_request = ValidatedRequest {
archive_request: x.archive_request.into(),
authorization: authorization.clone(),
backend_requests: Mutex::new(backend_rpcs),
@ -213,7 +214,9 @@ impl MigrateStatsToV2SubCommand {
usd_per_cu: top_config.app.usd_per_cu.unwrap_or_default(),
cache_mode: Default::default(),
start_instant: Instant::now(),
..Default::default()
connect_timeout: Default::default(),
expire_timeout: Default::default(),
permit: None,
};
web3_request.try_send_stat()?;

View File

@ -4,7 +4,7 @@ use std::sync::Arc;
use std::time::Duration;
use std::{fs, thread};
use tracing::{error, info, trace, warn};
use web3_proxy::app::{flatten_handle, flatten_handles, Web3ProxyApp};
use web3_proxy::app::{flatten_handle, flatten_handles, App};
use web3_proxy::config::TopConfig;
use web3_proxy::globals::global_db_conn;
use web3_proxy::prelude::anyhow;
@ -86,7 +86,7 @@ impl ProxydSubCommand {
broadcast::channel(1);
// start the main app
let mut spawned_app = Web3ProxyApp::spawn(
let mut spawned_app = App::spawn(
frontend_port,
prometheus_port,
top_config.clone(),