trace everything

This commit is contained in:
Bryan Stitt 2022-10-28 22:52:47 +00:00
parent 21654df966
commit a534eae968
14 changed files with 83 additions and 15 deletions

View File

@ -43,7 +43,7 @@ use tokio::sync::{broadcast, watch, Semaphore};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::time::timeout; use tokio::time::timeout;
use tokio_stream::wrappers::{BroadcastStream, WatchStream}; use tokio_stream::wrappers::{BroadcastStream, WatchStream};
use tracing::{error, info, trace, warn}; use tracing::{error, info, instrument, trace, warn};
use ulid::Ulid; use ulid::Ulid;
// TODO: make this customizable? // TODO: make this customizable?
@ -118,6 +118,7 @@ pub struct Web3ProxyApp {
/// flatten a JoinError into an anyhow error /// flatten a JoinError into an anyhow error
/// Useful when joining multiple futures. /// Useful when joining multiple futures.
#[instrument(skip_all)]
pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T> { pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T> {
match handle.await { match handle.await {
Ok(Ok(result)) => Ok(result), Ok(Ok(result)) => Ok(result),
@ -127,6 +128,7 @@ pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T>
} }
/// return the first error or okay if everything worked /// return the first error or okay if everything worked
#[instrument(skip_all)]
pub async fn flatten_handles<T>( pub async fn flatten_handles<T>(
mut handles: FuturesUnordered<AnyhowJoinHandle<T>>, mut handles: FuturesUnordered<AnyhowJoinHandle<T>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
@ -142,6 +144,7 @@ pub async fn flatten_handles<T>(
} }
/// Connect to the database and run migrations /// Connect to the database and run migrations
#[instrument(level = "trace")]
pub async fn get_migrated_db( pub async fn get_migrated_db(
db_url: String, db_url: String,
min_connections: u32, min_connections: u32,
@ -172,6 +175,7 @@ pub async fn get_migrated_db(
#[metered(registry = Web3ProxyAppMetrics, registry_expr = self.app_metrics, visibility = pub)] #[metered(registry = Web3ProxyAppMetrics, registry_expr = self.app_metrics, visibility = pub)]
impl Web3ProxyApp { impl Web3ProxyApp {
/// The main entrypoint. /// The main entrypoint.
#[instrument(level = "trace")]
pub async fn spawn( pub async fn spawn(
top_config: TopConfig, top_config: TopConfig,
num_workers: usize, num_workers: usize,
@ -492,6 +496,7 @@ impl Web3ProxyApp {
Ok((app, cancellable_handles, important_background_handles)) Ok((app, cancellable_handles, important_background_handles))
} }
#[instrument(level = "trace")]
pub fn prometheus_metrics(&self) -> String { pub fn prometheus_metrics(&self) -> String {
let globals = HashMap::new(); let globals = HashMap::new();
// TODO: what globals? should this be the hostname or what? // TODO: what globals? should this be the hostname or what?
@ -513,6 +518,7 @@ impl Web3ProxyApp {
} }
#[measure([ErrorCount, HitCount, ResponseTime, Throughput])] #[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
#[instrument(level = "trace")]
pub async fn eth_subscribe<'a>( pub async fn eth_subscribe<'a>(
self: &'a Arc<Self>, self: &'a Arc<Self>,
authorized_request: Arc<AuthorizedRequest>, authorized_request: Arc<AuthorizedRequest>,
@ -709,6 +715,7 @@ impl Web3ProxyApp {
} }
/// send the request or batch of requests to the approriate RPCs /// send the request or batch of requests to the approriate RPCs
#[instrument(level = "trace")]
pub async fn proxy_web3_rpc( pub async fn proxy_web3_rpc(
self: &Arc<Self>, self: &Arc<Self>,
authorized_request: Arc<AuthorizedRequest>, authorized_request: Arc<AuthorizedRequest>,
@ -747,6 +754,7 @@ impl Web3ProxyApp {
/// cut up the request and send to potentually different servers /// cut up the request and send to potentually different servers
/// TODO: make sure this isn't a problem /// TODO: make sure this isn't a problem
#[instrument(level = "trace")]
async fn proxy_web3_rpc_requests( async fn proxy_web3_rpc_requests(
self: &Arc<Self>, self: &Arc<Self>,
authorized_request: Arc<AuthorizedRequest>, authorized_request: Arc<AuthorizedRequest>,
@ -779,10 +787,12 @@ impl Web3ProxyApp {
} }
/// TODO: i don't think we want or need this. just use app.db_conn, or maybe app.db_conn.clone() or app.db_conn.as_ref() /// TODO: i don't think we want or need this. just use app.db_conn, or maybe app.db_conn.clone() or app.db_conn.as_ref()
#[instrument(level = "trace")]
pub fn db_conn(&self) -> Option<DatabaseConnection> { pub fn db_conn(&self) -> Option<DatabaseConnection> {
self.db_conn.clone() self.db_conn.clone()
} }
#[instrument(level = "trace")]
pub async fn redis_conn(&self) -> anyhow::Result<redis_rate_limiter::RedisConnection> { pub async fn redis_conn(&self) -> anyhow::Result<redis_rate_limiter::RedisConnection> {
match self.vredis_pool.as_ref() { match self.vredis_pool.as_ref() {
None => Err(anyhow::anyhow!("no redis server configured")), None => Err(anyhow::anyhow!("no redis server configured")),
@ -795,6 +805,7 @@ impl Web3ProxyApp {
} }
#[measure([ErrorCount, HitCount, ResponseTime, Throughput])] #[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
#[instrument(level = "trace")]
async fn proxy_web3_rpc_request( async fn proxy_web3_rpc_request(
self: &Arc<Self>, self: &Arc<Self>,
authorized_request: Arc<AuthorizedRequest>, authorized_request: Arc<AuthorizedRequest>,

View File

@ -4,7 +4,7 @@ use ethers::{
prelude::{BlockNumber, U64}, prelude::{BlockNumber, U64},
types::H256, types::H256,
}; };
use tracing::warn; use tracing::{instrument, warn};
use crate::rpcs::connections::Web3Connections; use crate::rpcs::connections::Web3Connections;
@ -38,6 +38,7 @@ pub fn block_num_to_u64(block_num: BlockNumber, latest_block: U64) -> U64 {
} }
/// modify params to always have a block number and not "latest" /// modify params to always have a block number and not "latest"
#[instrument(level = "trace")]
pub async fn clean_block_number( pub async fn clean_block_number(
params: &mut serde_json::Value, params: &mut serde_json::Value,
block_param_id: usize, block_param_id: usize,
@ -95,6 +96,7 @@ pub async fn clean_block_number(
} }
// TODO: change this to also return the hash needed? // TODO: change this to also return the hash needed?
#[instrument(level = "trace")]
pub async fn block_needed( pub async fn block_needed(
method: &str, method: &str,
params: Option<&mut serde_json::Value>, params: Option<&mut serde_json::Value>,

View File

@ -9,6 +9,7 @@ use hashbrown::HashMap;
use serde::Deserialize; use serde::Deserialize;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tracing::instrument;
pub type BlockAndRpc = (Option<ArcBlock>, Arc<Web3Connection>); pub type BlockAndRpc = (Option<ArcBlock>, Arc<Web3Connection>);
pub type TxHashAndRpc = (TxHash, Arc<Web3Connection>); pub type TxHashAndRpc = (TxHash, Arc<Web3Connection>);
@ -194,6 +195,7 @@ impl Web3ConnectionConfig {
/// Create a Web3Connection from config /// Create a Web3Connection from config
/// TODO: move this into Web3Connection (just need to make things pub(crate)) /// TODO: move this into Web3Connection (just need to make things pub(crate))
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[instrument(level = "trace", skip(redis_pool))]
pub async fn spawn( pub async fn spawn(
self, self,
name: String, name: String,

View File

@ -20,7 +20,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64};
use std::{net::IpAddr, str::FromStr, sync::Arc}; use std::{net::IpAddr, str::FromStr, sync::Arc};
use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::Instant; use tokio::time::Instant;
use tracing::{error, trace}; use tracing::{error, instrument, trace};
use ulid::Ulid; use ulid::Ulid;
use uuid::Uuid; use uuid::Uuid;
@ -322,6 +322,7 @@ pub async fn key_is_authorized(
impl Web3ProxyApp { impl Web3ProxyApp {
/// Limit the number of concurrent requests from the given ip address. /// Limit the number of concurrent requests from the given ip address.
#[instrument(level = "trace")]
pub async fn ip_semaphore(&self, ip: IpAddr) -> anyhow::Result<Option<OwnedSemaphorePermit>> { pub async fn ip_semaphore(&self, ip: IpAddr) -> anyhow::Result<Option<OwnedSemaphorePermit>> {
if let Some(max_concurrent_requests) = self.config.public_max_concurrent_requests { if let Some(max_concurrent_requests) = self.config.public_max_concurrent_requests {
let semaphore = self let semaphore = self
@ -347,6 +348,7 @@ impl Web3ProxyApp {
} }
/// Limit the number of concurrent requests from the given key address. /// Limit the number of concurrent requests from the given key address.
#[instrument(level = "trace")]
pub async fn user_rpc_key_semaphore( pub async fn user_rpc_key_semaphore(
&self, &self,
rpc_key_data: &UserKeyData, rpc_key_data: &UserKeyData,
@ -375,6 +377,7 @@ impl Web3ProxyApp {
/// Verify that the given bearer token and address are allowed to take the specified action. /// Verify that the given bearer token and address are allowed to take the specified action.
/// This includes concurrent request limiting. /// This includes concurrent request limiting.
#[instrument(level = "trace")]
pub async fn bearer_is_authorized( pub async fn bearer_is_authorized(
&self, &self,
bearer: Bearer, bearer: Bearer,
@ -414,6 +417,7 @@ impl Web3ProxyApp {
Ok((user, semaphore_permit)) Ok((user, semaphore_permit))
} }
#[instrument(level = "trace")]
pub async fn rate_limit_login(&self, ip: IpAddr) -> anyhow::Result<RateLimitResult> { pub async fn rate_limit_login(&self, ip: IpAddr) -> anyhow::Result<RateLimitResult> {
// TODO: dry this up with rate_limit_by_key // TODO: dry this up with rate_limit_by_key
// TODO: do we want a semaphore here? // TODO: do we want a semaphore here?
@ -446,6 +450,7 @@ impl Web3ProxyApp {
} }
} }
#[instrument(level = "trace")]
pub async fn rate_limit_by_ip( pub async fn rate_limit_by_ip(
&self, &self,
ip: IpAddr, ip: IpAddr,
@ -495,6 +500,7 @@ impl Web3ProxyApp {
} }
// check the local cache for user data, or query the database // check the local cache for user data, or query the database
#[instrument(level = "trace")]
pub(crate) async fn user_data(&self, rpc_key: RpcApiKey) -> anyhow::Result<UserKeyData> { pub(crate) async fn user_data(&self, rpc_key: RpcApiKey) -> anyhow::Result<UserKeyData> {
let user_data: Result<_, Arc<anyhow::Error>> = self let user_data: Result<_, Arc<anyhow::Error>> = self
.rpc_key_cache .rpc_key_cache
@ -589,6 +595,7 @@ impl Web3ProxyApp {
user_data.map_err(|err| anyhow::anyhow!(err)) user_data.map_err(|err| anyhow::anyhow!(err))
} }
#[instrument(level = "trace")]
pub async fn rate_limit_by_key(&self, rpc_key: RpcApiKey) -> anyhow::Result<RateLimitResult> { pub async fn rate_limit_by_key(&self, rpc_key: RpcApiKey) -> anyhow::Result<RateLimitResult> {
let user_data = self.user_data(rpc_key).await?; let user_data = self.user_data(rpc_key).await?;

View File

@ -15,13 +15,13 @@ use reqwest::header::ToStrError;
use sea_orm::DbErr; use sea_orm::DbErr;
use std::{error::Error, net::IpAddr}; use std::{error::Error, net::IpAddr};
use tokio::time::Instant; use tokio::time::Instant;
use tracing::warn; use tracing::{instrument, warn};
// TODO: take "IntoResponse" instead of Response? // TODO: take "IntoResponse" instead of Response?
pub type FrontendResult = Result<Response, FrontendErrorResponse>; pub type FrontendResult = Result<Response, FrontendErrorResponse>;
// TODO: // TODO:
#[derive(From)] #[derive(Debug, From)]
pub enum FrontendErrorResponse { pub enum FrontendErrorResponse {
Anyhow(anyhow::Error), Anyhow(anyhow::Error),
Box(Box<dyn Error>), Box(Box<dyn Error>),
@ -41,6 +41,7 @@ pub enum FrontendErrorResponse {
} }
impl IntoResponse for FrontendErrorResponse { impl IntoResponse for FrontendErrorResponse {
#[instrument(level = "trace")]
fn into_response(self) -> Response { fn into_response(self) -> Response {
// TODO: include the request id in these so that users can give us something that will point to logs // TODO: include the request id in these so that users can give us something that will point to logs
let (status_code, response) = match self { let (status_code, response) = match self {
@ -202,6 +203,7 @@ impl IntoResponse for FrontendErrorResponse {
} }
} }
#[instrument(level = "trace")]
pub async fn handler_404() -> Response { pub async fn handler_404() -> Response {
FrontendErrorResponse::NotFound.into_response() FrontendErrorResponse::NotFound.into_response()
} }

View File

@ -24,9 +24,10 @@ use tower_http::cors::CorsLayer;
use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer; use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer;
use tower_http::trace::TraceLayer; use tower_http::trace::TraceLayer;
use tower_request_id::{RequestId, RequestIdLayer}; use tower_request_id::{RequestId, RequestIdLayer};
use tracing::{error_span, info}; use tracing::{error_span, info, instrument};
/// Start the frontend server. /// Start the frontend server.
#[instrument(level = "trace")]
pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()> { pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()> {
// create a tracing span for each request with a random request id and the method // create a tracing span for each request with a random request id and the method
// GET: websocket or static pages // GET: websocket or static pages

View File

@ -10,12 +10,13 @@ use axum::{response::IntoResponse, Extension, Json};
use axum_client_ip::ClientIp; use axum_client_ip::ClientIp;
use axum_macros::debug_handler; use axum_macros::debug_handler;
use std::sync::Arc; use std::sync::Arc;
use tracing::{error_span, Instrument}; use tracing::{error_span, instrument, Instrument};
/// POST /rpc -- Public entrypoint for HTTP JSON-RPC requests. Web3 wallets use this. /// POST /rpc -- Public entrypoint for HTTP JSON-RPC requests. Web3 wallets use this.
/// Defaults to rate limiting by IP address, but can also read the Authorization header for a bearer token. /// Defaults to rate limiting by IP address, but can also read the Authorization header for a bearer token.
/// If possible, please use a WebSocket instead. /// If possible, please use a WebSocket instead.
#[debug_handler] #[debug_handler]
#[instrument(level = "trace")]
pub async fn proxy_web3_rpc( pub async fn proxy_web3_rpc(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp, ClientIp(ip): ClientIp,
@ -49,6 +50,7 @@ pub async fn proxy_web3_rpc(
/// Can optionally authorized based on origin, referer, or user agent. /// Can optionally authorized based on origin, referer, or user agent.
/// If possible, please use a WebSocket instead. /// If possible, please use a WebSocket instead.
#[debug_handler] #[debug_handler]
#[instrument(level = "trace")]
pub async fn proxy_web3_rpc_with_key( pub async fn proxy_web3_rpc_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp, ClientIp(ip): ClientIp,

View File

@ -23,7 +23,7 @@ use hashbrown::HashMap;
use serde_json::{json, value::RawValue}; use serde_json::{json, value::RawValue};
use std::sync::Arc; use std::sync::Arc;
use std::{str::from_utf8_mut, sync::atomic::AtomicUsize}; use std::{str::from_utf8_mut, sync::atomic::AtomicUsize};
use tracing::{error, error_span, info, trace, Instrument}; use tracing::{error, error_span, info, instrument, trace, Instrument};
use crate::{ use crate::{
app::Web3ProxyApp, app::Web3ProxyApp,
@ -33,6 +33,7 @@ use crate::{
/// Public entrypoint for WebSocket JSON-RPC requests. /// Public entrypoint for WebSocket JSON-RPC requests.
/// Defaults to rate limiting by IP address, but can also read the Authorization header for a bearer token. /// Defaults to rate limiting by IP address, but can also read the Authorization header for a bearer token.
#[debug_handler] #[debug_handler]
#[instrument(level = "trace")]
pub async fn websocket_handler( pub async fn websocket_handler(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp, ClientIp(ip): ClientIp,
@ -75,6 +76,7 @@ pub async fn websocket_handler(
/// Rate limit and billing based on the api key in the url. /// Rate limit and billing based on the api key in the url.
/// Can optionally authorized based on origin, referer, or user agent. /// Can optionally authorized based on origin, referer, or user agent.
#[debug_handler] #[debug_handler]
#[instrument(level = "trace")]
pub async fn websocket_handler_with_key( pub async fn websocket_handler_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp, ClientIp(ip): ClientIp,
@ -134,6 +136,7 @@ pub async fn websocket_handler_with_key(
} }
} }
#[instrument(level = "trace")]
async fn proxy_web3_socket( async fn proxy_web3_socket(
app: Arc<Web3ProxyApp>, app: Arc<Web3ProxyApp>,
authorized_request: Arc<AuthorizedRequest>, authorized_request: Arc<AuthorizedRequest>,
@ -155,6 +158,7 @@ async fn proxy_web3_socket(
} }
/// websockets support a few more methods than http clients /// websockets support a few more methods than http clients
#[instrument(level = "trace")]
async fn handle_socket_payload( async fn handle_socket_payload(
app: Arc<Web3ProxyApp>, app: Arc<Web3ProxyApp>,
authorized_request: Arc<AuthorizedRequest>, authorized_request: Arc<AuthorizedRequest>,
@ -245,6 +249,7 @@ async fn handle_socket_payload(
Message::Text(response_str) Message::Text(response_str)
} }
#[instrument(level = "trace")]
async fn read_web3_socket( async fn read_web3_socket(
app: Arc<Web3ProxyApp>, app: Arc<Web3ProxyApp>,
authorized_request: Arc<AuthorizedRequest>, authorized_request: Arc<AuthorizedRequest>,
@ -303,6 +308,7 @@ async fn read_web3_socket(
} }
} }
#[instrument(level = "trace")]
async fn write_web3_socket( async fn write_web3_socket(
response_rx: flume::Receiver<Message>, response_rx: flume::Receiver<Message>,
mut ws_tx: SplitSink<WebSocket, Message>, mut ws_tx: SplitSink<WebSocket, Message>,

View File

@ -9,9 +9,11 @@ use axum_macros::debug_handler;
use moka::future::ConcurrentCacheExt; use moka::future::ConcurrentCacheExt;
use serde_json::json; use serde_json::json;
use std::sync::Arc; use std::sync::Arc;
use tracing::instrument;
/// Health check page for load balancers to use. /// Health check page for load balancers to use.
#[debug_handler] #[debug_handler]
#[instrument(level = "trace")]
pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse { pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
// TODO: also check that the head block is not too old // TODO: also check that the head block is not too old
if app.balanced_rpcs.synced() { if app.balanced_rpcs.synced() {
@ -25,6 +27,7 @@ pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoRe
/// ///
/// TODO: when done debugging, remove this and only allow access on a different port /// TODO: when done debugging, remove this and only allow access on a different port
#[debug_handler] #[debug_handler]
#[instrument(level = "trace")]
pub async fn prometheus(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse { pub async fn prometheus(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
app.prometheus_metrics() app.prometheus_metrics()
} }
@ -33,6 +36,7 @@ pub async fn prometheus(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl In
/// ///
/// TODO: replace this with proper stats and monitoring /// TODO: replace this with proper stats and monitoring
#[debug_handler] #[debug_handler]
#[instrument(level = "trace")]
pub async fn status(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse { pub async fn status(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
app.pending_transactions.sync(); app.pending_transactions.sync();
app.rpc_key_cache.sync(); app.rpc_key_cache.sync();

View File

@ -35,7 +35,7 @@ use std::ops::Add;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use time::{Duration, OffsetDateTime}; use time::{Duration, OffsetDateTime};
use tracing::warn; use tracing::{instrument, warn};
use ulid::Ulid; use ulid::Ulid;
use uuid::Uuid; use uuid::Uuid;
@ -57,6 +57,7 @@ use uuid::Uuid;
/// It is a better UX to just click "login with ethereum" and have the account created if it doesn't exist. /// It is a better UX to just click "login with ethereum" and have the account created if it doesn't exist.
/// We can prompt for an email and and payment after they log in. /// We can prompt for an email and and payment after they log in.
#[debug_handler] #[debug_handler]
#[instrument(level = "trace")]
pub async fn user_login_get( pub async fn user_login_get(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp, ClientIp(ip): ClientIp,
@ -143,7 +144,7 @@ pub struct PostLoginQuery {
/// JSON body to our `post_login` handler. /// JSON body to our `post_login` handler.
/// Currently only siwe logins that send an address, msg, and sig are allowed. /// Currently only siwe logins that send an address, msg, and sig are allowed.
/// Email/password and other login methods are planned. /// Email/password and other login methods are planned.
#[derive(Deserialize)] #[derive(Debug, Deserialize)]
pub struct PostLogin { pub struct PostLogin {
sig: String, sig: String,
msg: String, msg: String,
@ -153,6 +154,7 @@ pub struct PostLogin {
/// It is recommended to save the returned bearer token in a cookie. /// It is recommended to save the returned bearer token in a cookie.
/// The bearer token can be used to authenticate other requests, such as getting the user's stats or modifying the user's profile. /// The bearer token can be used to authenticate other requests, such as getting the user's stats or modifying the user's profile.
#[debug_handler] #[debug_handler]
#[instrument(level = "trace")]
pub async fn user_login_post( pub async fn user_login_post(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp, ClientIp(ip): ClientIp,
@ -330,6 +332,7 @@ pub async fn user_login_post(
/// `POST /user/logout` - Forget the bearer token in the `Authentication` header. /// `POST /user/logout` - Forget the bearer token in the `Authentication` header.
#[debug_handler] #[debug_handler]
#[instrument(level = "trace")]
pub async fn user_logout_post( pub async fn user_logout_post(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>, TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
@ -351,6 +354,7 @@ pub async fn user_logout_post(
/// ///
/// TODO: this will change as we add better support for secondary users. /// TODO: this will change as we add better support for secondary users.
#[debug_handler] #[debug_handler]
#[instrument(level = "trace")]
pub async fn user_get( pub async fn user_get(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>, TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>,
@ -361,13 +365,14 @@ pub async fn user_get(
} }
/// the JSON input to the `post_user` handler. /// the JSON input to the `post_user` handler.
#[derive(Deserialize)] #[derive(Debug, Deserialize)]
pub struct UserPost { pub struct UserPost {
email: Option<String>, email: Option<String>,
} }
/// `POST /user` -- modify the account connected to the bearer token in the `Authentication` header. /// `POST /user` -- modify the account connected to the bearer token in the `Authentication` header.
#[debug_handler] #[debug_handler]
#[instrument(level = "trace")]
pub async fn user_post( pub async fn user_post(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>, TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>,
@ -414,6 +419,7 @@ pub async fn user_post(
/// TODO: one key per request? maybe /user/balance/:rpc_key? /// TODO: one key per request? maybe /user/balance/:rpc_key?
/// TODO: this will change as we add better support for secondary users. /// TODO: this will change as we add better support for secondary users.
#[debug_handler] #[debug_handler]
#[instrument(level = "trace")]
pub async fn user_balance_get( pub async fn user_balance_get(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>, TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
@ -431,6 +437,7 @@ pub async fn user_balance_get(
/// TODO: one key per request? maybe /user/balance/:rpc_key? /// TODO: one key per request? maybe /user/balance/:rpc_key?
/// TODO: this will change as we add better support for secondary users. /// TODO: this will change as we add better support for secondary users.
#[debug_handler] #[debug_handler]
#[instrument(level = "trace")]
pub async fn user_balance_post( pub async fn user_balance_post(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>, TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
@ -444,6 +451,7 @@ pub async fn user_balance_post(
/// ///
/// TODO: one key per request? maybe /user/keys/:rpc_key? /// TODO: one key per request? maybe /user/keys/:rpc_key?
#[debug_handler] #[debug_handler]
#[instrument(level = "trace")]
pub async fn rpc_keys_get( pub async fn rpc_keys_get(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>, TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
@ -471,7 +479,7 @@ pub async fn rpc_keys_get(
} }
/// the JSON input to the `rpc_keys_post` handler. /// the JSON input to the `rpc_keys_post` handler.
#[derive(Deserialize)] #[derive(Debug, Deserialize)]
pub struct UserKeysPost { pub struct UserKeysPost {
// TODO: make sure the email address is valid. probably have a "verified" column in the database // TODO: make sure the email address is valid. probably have a "verified" column in the database
existing_key_id: Option<u64>, existing_key_id: Option<u64>,
@ -493,6 +501,7 @@ pub struct UserKeysPost {
/// TODO: read json from the request body /// TODO: read json from the request body
/// TODO: one key per request? maybe /user/keys/:rpc_key? /// TODO: one key per request? maybe /user/keys/:rpc_key?
#[debug_handler] #[debug_handler]
#[instrument(level = "trace")]
pub async fn rpc_keys_post( pub async fn rpc_keys_post(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>, TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
@ -668,6 +677,7 @@ pub async fn rpc_keys_post(
/// `GET /user/revert_logs` -- Use a bearer token to get the user's revert logs. /// `GET /user/revert_logs` -- Use a bearer token to get the user's revert logs.
#[debug_handler] #[debug_handler]
#[instrument(level = "trace")]
pub async fn user_revert_logs_get( pub async fn user_revert_logs_get(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>, TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
@ -731,6 +741,7 @@ pub async fn user_revert_logs_get(
/// ///
/// TODO: this will change as we add better support for secondary users. /// TODO: this will change as we add better support for secondary users.
#[debug_handler] #[debug_handler]
#[instrument(level = "trace")]
pub async fn user_stats_detailed_get( pub async fn user_stats_detailed_get(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
bearer: Option<TypedHeader<Authorization<Bearer>>>, bearer: Option<TypedHeader<Authorization<Bearer>>>,
@ -743,6 +754,7 @@ pub async fn user_stats_detailed_get(
/// `GET /user/stats/aggregate` -- Public endpoint for aggregate stats such as bandwidth used and methods requested. /// `GET /user/stats/aggregate` -- Public endpoint for aggregate stats such as bandwidth used and methods requested.
#[debug_handler] #[debug_handler]
#[instrument(level = "trace")]
pub async fn user_stats_aggregate_get( pub async fn user_stats_aggregate_get(
bearer: Option<TypedHeader<Authorization<Bearer>>>, bearer: Option<TypedHeader<Authorization<Bearer>>>,
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,

View File

@ -4,6 +4,7 @@ use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::value::RawValue; use serde_json::value::RawValue;
use std::fmt; use std::fmt;
use tracing::instrument;
// this is used by serde // this is used by serde
#[allow(dead_code)] #[allow(dead_code)]
@ -193,10 +194,12 @@ impl JsonRpcForwardedResponse {
Self::from_string(message, code, id) Self::from_string(message, code, id)
} }
#[instrument(level = "trace")]
pub fn from_str(message: &str, code: Option<i64>, id: Option<Box<RawValue>>) -> Self { pub fn from_str(message: &str, code: Option<i64>, id: Option<Box<RawValue>>) -> Self {
Self::from_string(message.to_string(), code, id) Self::from_string(message.to_string(), code, id)
} }
#[instrument(level = "trace")]
pub fn from_string(message: String, code: Option<i64>, id: Option<Box<RawValue>>) -> Self { pub fn from_string(message: String, code: Option<i64>, id: Option<Box<RawValue>>) -> Self {
// TODO: this is too verbose. plenty of errors are valid, like users giving an invalid address. no need to log that // TODO: this is too verbose. plenty of errors are valid, like users giving an invalid address. no need to log that
// TODO: can we somehow get the initial request here? if we put that into a tracing span, will things slow down a ton? // TODO: can we somehow get the initial request here? if we put that into a tracing span, will things slow down a ton?
@ -214,6 +217,7 @@ impl JsonRpcForwardedResponse {
} }
} }
#[instrument(level = "trace")]
pub fn from_response(partial_response: Box<RawValue>, id: Box<RawValue>) -> Self { pub fn from_response(partial_response: Box<RawValue>, id: Box<RawValue>) -> Self {
JsonRpcForwardedResponse { JsonRpcForwardedResponse {
jsonrpc: "2.0".to_string(), jsonrpc: "2.0".to_string(),
@ -224,6 +228,7 @@ impl JsonRpcForwardedResponse {
} }
} }
#[instrument(level = "trace")]
pub fn from_value(partial_response: serde_json::Value, id: Box<RawValue>) -> Self { pub fn from_value(partial_response: serde_json::Value, id: Box<RawValue>) -> Self {
let partial_response = let partial_response =
serde_json::to_string(&partial_response).expect("this should always work"); serde_json::to_string(&partial_response).expect("this should always work");
@ -239,6 +244,7 @@ impl JsonRpcForwardedResponse {
} }
} }
#[instrument(level = "trace")]
pub fn from_ethers_error(e: ProviderError, id: Box<RawValue>) -> anyhow::Result<Self> { pub fn from_ethers_error(e: ProviderError, id: Box<RawValue>) -> anyhow::Result<Self> {
// TODO: move turning ClientError into json to a helper function? // TODO: move turning ClientError into json to a helper function?
let code; let code;
@ -298,6 +304,7 @@ impl JsonRpcForwardedResponse {
}) })
} }
#[instrument(level = "trace")]
pub fn try_from_response_result( pub fn try_from_response_result(
result: Result<Box<RawValue>, ProviderError>, result: Result<Box<RawValue>, ProviderError>,
id: Box<RawValue>, id: Box<RawValue>,

View File

@ -4,11 +4,12 @@ use axum::response::{IntoResponse, Response};
use axum::{routing::get, Extension, Router}; use axum::{routing::get, Extension, Router};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use tracing::info; use tracing::{info, instrument};
use crate::app::Web3ProxyApp; use crate::app::Web3ProxyApp;
/// Run a prometheus metrics server on the given port. /// Run a prometheus metrics server on the given port.
#[instrument(level = "trace")]
pub async fn serve(app: Arc<Web3ProxyApp>, port: u16) -> anyhow::Result<()> { pub async fn serve(app: Arc<Web3ProxyApp>, port: u16) -> anyhow::Result<()> {
// build our application with a route // build our application with a route
// order most to least common // order most to least common
@ -41,6 +42,7 @@ pub async fn serve(app: Arc<Web3ProxyApp>, port: u16) -> anyhow::Result<()> {
.map_err(Into::into) .map_err(Into::into)
} }
#[instrument(level = "trace")]
async fn root(Extension(app): Extension<Arc<Web3ProxyApp>>) -> Response { async fn root(Extension(app): Extension<Arc<Web3ProxyApp>>) -> Response {
let serialized = app.prometheus_metrics(); let serialized = app.prometheus_metrics();

View File

@ -16,7 +16,7 @@ use serde_json::json;
use std::{cmp::Ordering, fmt::Display, sync::Arc}; use std::{cmp::Ordering, fmt::Display, sync::Arc};
use tokio::sync::{broadcast, watch}; use tokio::sync::{broadcast, watch};
use tokio::time::Duration; use tokio::time::Duration;
use tracing::{debug, info, trace, warn, Level}; use tracing::{debug, instrument, trace, warn, Level};
// TODO: type for Hydrated Blocks with their full transactions? // TODO: type for Hydrated Blocks with their full transactions?
pub type ArcBlock = Arc<Block<TxHash>>; pub type ArcBlock = Arc<Block<TxHash>>;
@ -38,6 +38,7 @@ impl Display for BlockId {
impl Web3Connections { impl Web3Connections {
/// add a block to our map and it's hash to our graphmap of the blockchain /// add a block to our map and it's hash to our graphmap of the blockchain
#[instrument]
pub async fn save_block(&self, block: &ArcBlock, heaviest_chain: bool) -> anyhow::Result<()> { pub async fn save_block(&self, block: &ArcBlock, heaviest_chain: bool) -> anyhow::Result<()> {
// TODO: i think we can rearrange this function to make it faster on the hot path // TODO: i think we can rearrange this function to make it faster on the hot path
let block_hash = block.hash.as_ref().context("no block hash")?; let block_hash = block.hash.as_ref().context("no block hash")?;
@ -84,6 +85,7 @@ impl Web3Connections {
/// Get a block from caches with fallback. /// Get a block from caches with fallback.
/// Will query a specific node or the best available. /// Will query a specific node or the best available.
#[instrument(level = "trace")]
pub async fn block( pub async fn block(
&self, &self,
authorized_request: Option<&Arc<AuthorizedRequest>>, authorized_request: Option<&Arc<AuthorizedRequest>>,

View File

@ -13,12 +13,13 @@ use sea_orm::{
ColumnTrait, Condition, EntityTrait, JoinType, PaginatorTrait, QueryFilter, QueryOrder, ColumnTrait, Condition, EntityTrait, JoinType, PaginatorTrait, QueryFilter, QueryOrder,
QuerySelect, RelationTrait, QuerySelect, RelationTrait,
}; };
use tracing::trace; use tracing::{instrument, trace};
use crate::app::Web3ProxyApp; use crate::app::Web3ProxyApp;
/// get the attached address from redis for the given auth_token. /// get the attached address from redis for the given auth_token.
/// 0 means all users /// 0 means all users
#[instrument(level = "trace", skip(redis_conn))]
async fn get_user_id_from_params( async fn get_user_id_from_params(
mut redis_conn: RedisConnection, mut redis_conn: RedisConnection,
// this is a long type. should we strip it down? // this is a long type. should we strip it down?
@ -56,6 +57,7 @@ async fn get_user_id_from_params(
/// only allow rpc_key to be set if user_id is also set. /// only allow rpc_key to be set if user_id is also set.
/// this will keep people from reading someone else's keys. /// this will keep people from reading someone else's keys.
/// 0 means none. /// 0 means none.
#[instrument(level = "trace")]
pub fn get_rpc_key_id_from_params( pub fn get_rpc_key_id_from_params(
user_id: u64, user_id: u64,
params: &HashMap<String, String>, params: &HashMap<String, String>,
@ -74,6 +76,7 @@ pub fn get_rpc_key_id_from_params(
} }
} }
#[instrument(level = "trace")]
pub fn get_chain_id_from_params( pub fn get_chain_id_from_params(
app: &Web3ProxyApp, app: &Web3ProxyApp,
params: &HashMap<String, String>, params: &HashMap<String, String>,
@ -88,6 +91,7 @@ pub fn get_chain_id_from_params(
) )
} }
#[instrument(level = "trace")]
pub fn get_query_start_from_params( pub fn get_query_start_from_params(
params: &HashMap<String, String>, params: &HashMap<String, String>,
) -> anyhow::Result<chrono::NaiveDateTime> { ) -> anyhow::Result<chrono::NaiveDateTime> {
@ -111,6 +115,7 @@ pub fn get_query_start_from_params(
) )
} }
#[instrument(level = "trace")]
pub fn get_page_from_params(params: &HashMap<String, String>) -> anyhow::Result<u64> { pub fn get_page_from_params(params: &HashMap<String, String>) -> anyhow::Result<u64> {
params.get("page").map_or_else::<anyhow::Result<u64>, _, _>( params.get("page").map_or_else::<anyhow::Result<u64>, _, _>(
|| { || {
@ -127,6 +132,7 @@ pub fn get_page_from_params(params: &HashMap<String, String>) -> anyhow::Result<
) )
} }
#[instrument(level = "trace")]
pub fn get_query_window_seconds_from_params( pub fn get_query_window_seconds_from_params(
params: &HashMap<String, String>, params: &HashMap<String, String>,
) -> anyhow::Result<u64> { ) -> anyhow::Result<u64> {
@ -148,6 +154,7 @@ pub fn get_query_window_seconds_from_params(
} }
/// stats aggregated across a large time period /// stats aggregated across a large time period
#[instrument(level = "trace")]
pub async fn get_aggregate_rpc_stats_from_params( pub async fn get_aggregate_rpc_stats_from_params(
app: &Web3ProxyApp, app: &Web3ProxyApp,
bearer: Option<TypedHeader<Authorization<Bearer>>>, bearer: Option<TypedHeader<Authorization<Bearer>>>,
@ -294,6 +301,7 @@ pub async fn get_aggregate_rpc_stats_from_params(
} }
/// stats grouped by key_id and error_repsponse and method and key /// stats grouped by key_id and error_repsponse and method and key
#[instrument(level = "trace")]
pub async fn get_detailed_stats( pub async fn get_detailed_stats(
app: &Web3ProxyApp, app: &Web3ProxyApp,
bearer: Option<TypedHeader<Authorization<Bearer>>>, bearer: Option<TypedHeader<Authorization<Bearer>>>,