diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 6589e5d6..524ae68e 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -41,7 +41,7 @@ use std::fmt; use std::net::IpAddr; use std::num::NonZeroU64; use std::str::FromStr; -use std::sync::atomic::{self, AtomicU16, Ordering}; +use std::sync::atomic::{AtomicU16, Ordering}; use std::sync::Arc; use std::time::Duration; use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore}; @@ -1326,28 +1326,37 @@ impl App { let (code, response) = match last_response { Ok(response_data) => { - web3_request.error_response.store(false, Ordering::SeqCst); + let user_error_response = response_data.is_jsonrpc_err(); + + let mut response_lock = web3_request.response.lock(); + + // TODO: i really don't like this logic here. it should be inside add_response + response_lock.error_response = false; // TODO: is it true that all jsonrpc errors are user errors? - web3_request - .user_error_response - .store(response_data.is_jsonrpc_err(), Ordering::SeqCst); + response_lock.user_error_response = user_error_response; + + drop(response_lock); (StatusCode::OK, response_data) } Err(err) => { // max tries exceeded. return the error - web3_request.error_response.store(true, Ordering::SeqCst); - web3_request - .user_error_response - .store(false, Ordering::SeqCst); + let mut response_lock = web3_request.response.lock(); + + // TODO: i really don't like this logic here. it should be inside add_error_response + // TODO: what if this is an ethers wrapped error? those should have already been handled, but our error types are too broad + response_lock.error_response = true; + response_lock.user_error_response = false; + + drop(response_lock); err.as_json_response_parts(web3_request.id(), Some(web3_request.as_ref())) } }; - web3_request.add_response(&response); + web3_request.set_response(&response); let rpcs = web3_request.backend_rpcs_used(); @@ -1544,12 +1553,18 @@ impl App { }; if try_archive { - // TODO: only charge for archive if it gave a result - web3_request - .archive_request - .store(true, atomic::Ordering::SeqCst); + { + let mut response_lock = web3_request.response.lock(); + + // TODO: this is a hack. we don't usually want an archive + // we probably just hit a bug where a server said it had a block but it dosn't yet have all the transactions + response_lock + .archive_request + = true; + } + + // TODO: if the transaction wasn't found, set archive_request back to false? - // TODO: we don't actually want try_send_all. we want the first non-null, non-error response self .balanced_rpcs .try_proxy_connection::>( diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index 9ac39ff9..60d4f362 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -137,7 +137,7 @@ impl App { break; }; - subscription_web3_request.add_response(response_bytes); + subscription_web3_request.set_response(response_bytes); } } } @@ -217,7 +217,7 @@ impl App { // we could use ForwardedResponse::num_bytes() here, but since we already have the string, this is easier let response_bytes = response_str.len() as u64; - subscription_web3_request.add_response(response_bytes); + subscription_web3_request.set_response(response_bytes); // TODO: do clients support binary messages? // TODO: can we check a content type header? @@ -260,7 +260,7 @@ impl App { // TODO: better way of passing in ParsedResponse let response = jsonrpc::SingleResponse::Parsed(response); // TODO: this serializes twice - web3_request.add_response(&response); + web3_request.set_response(&response); let response = response.parsed().await.expect("Response already parsed"); // TODO: make a `SubscriptonHandle(AbortHandle, JoinHandle)` struct? diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index be45a0b5..dd619c0d 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -401,7 +401,7 @@ async fn websocket_proxy_web3_rpc( let response = jsonrpc::SingleResponse::Parsed(response); - web3_request.add_response(&response); + web3_request.set_response(&response); let response = response.parsed().await.expect("Response already parsed"); Ok(response.into()) diff --git a/web3_proxy/src/jsonrpc/request.rs b/web3_proxy/src/jsonrpc/request.rs index b2b045fc..4f3f897e 100644 --- a/web3_proxy/src/jsonrpc/request.rs +++ b/web3_proxy/src/jsonrpc/request.rs @@ -11,7 +11,7 @@ use serde_inline_default::serde_inline_default; use serde_json::value::RawValue; use std::borrow::Cow; use std::fmt; -use std::sync::{atomic, Arc}; +use std::sync::Arc; use std::time::Duration; use tokio::time::sleep; @@ -134,13 +134,15 @@ impl JsonRpcRequestEnum { .await .unwrap(); - request - .user_error_response - .store(true, atomic::Ordering::SeqCst); + { + let mut response_lock = request.response.lock(); + + response_lock.user_error_response = true; + } let response = Web3ProxyError::BadRequest("request failed validation".into()); - request.add_response(&response); + request.set_response(&response); let response = response.into_response_with_id(Some(err_id), None::); diff --git a/web3_proxy/src/jsonrpc/request_builder.rs b/web3_proxy/src/jsonrpc/request_builder.rs index 9c2573b6..640abaed 100644 --- a/web3_proxy/src/jsonrpc/request_builder.rs +++ b/web3_proxy/src/jsonrpc/request_builder.rs @@ -11,13 +11,14 @@ use crate::{ response_cache::JsonRpcQueryCacheKey, rpcs::{blockchain::Web3ProxyBlock, one::Web3Rpc}, secrets::RpcSecretKey, - stats::{AppStat, BackendRequests}, + stats::AppStat, }; use anyhow::Context; use axum::headers::{Origin, Referer, UserAgent}; use chrono::Utc; use derivative::Derivative; use ethers::types::U64; +use parking_lot::Mutex; use rust_decimal::Decimal; use serde::{ser::SerializeStruct, Serialize}; use serde_json::{json, value::RawValue}; @@ -26,11 +27,7 @@ use std::{ fmt::{self, Display}, net::IpAddr, }; -use std::{ - mem, - sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64}, - time::Duration, -}; +use std::{mem, time::Duration}; use tokio::{ sync::{mpsc, OwnedSemaphorePermit}, time::Instant, @@ -181,7 +178,9 @@ impl RequestBuilder { if let Ok(x) = &x { if self.archive_request { - x.archive_request.store(true, atomic::Ordering::SeqCst); + let mut response_lock = x.response.lock(); + + response_lock.archive_request = true; } } @@ -191,15 +190,53 @@ impl RequestBuilder { } } +#[derive(Debug, Default)] +/// todo: better name. +/// the inside bits for ValidatedRequest. It's usually in an Arc, so it's not mutable +pub struct ValidatedResponse { + /// TODO: set archive_request during the new instead of after + /// TODO: this is more complex than "requires a block older than X height". different types of data can be pruned differently + pub archive_request: bool, + + /// if this is empty, there was a cache_hit + /// otherwise, it is populated with any rpc servers that were used by this request + pub backend_rpcs: Vec>, + + /// The number of times the request got stuck waiting because no servers were synced + pub no_servers: u64, + + /// If handling the request hit an application error + /// This does not count things like a transcation reverting or a malformed request + /// TODO: this will need more thought once we support other ProxyMode + pub error_response: bool, + + /// Size in bytes of the JSON response. Does not include headers or things like that. + pub response_bytes: u64, + + /// How many milliseconds it took to respond to the request + pub response_millis: u64, + + /// What time the (first) response was proxied. + /// TODO: think about how to store response times for ProxyMode::Versus + pub response_timestamp: i64, + + /// If the request is invalid or received a jsonrpc error response (excluding reverts) + pub user_error_response: bool, +} + +impl ValidatedResponse { + /// True if the response required querying a backup RPC + /// RPC aggregators that query multiple providers to compare response may use this header to ignore our response. + pub fn response_from_backup_rpc(&self) -> bool { + self.backend_rpcs.last().map(|x| x.backup).unwrap_or(false) + } +} + /// TODO: /// TODO: instead of a bunch of atomics, this should probably use a RwLock. need to think more about how parallel requests are going to work though #[derive(Debug, Derivative)] #[derivative(Default)] pub struct ValidatedRequest { - /// TODO: set archive_request during the new instead of after - /// TODO: this is more complex than "requires a block older than X height". different types of data can be pruned differently - pub archive_request: AtomicBool, - pub authorization: Arc, pub cache_mode: CacheMode, @@ -212,6 +249,8 @@ pub struct ValidatedRequest { /// TODO: this should be in a global config. not copied to every single request pub usd_per_cu: Decimal, + pub response: Mutex, + pub inner: RequestOrMethod, /// if the rpc key used for this request is premium (at the start of the request) @@ -222,26 +261,6 @@ pub struct ValidatedRequest { /// We use Instant and not timestamps to avoid problems with leap seconds and similar issues #[derivative(Default(value = "Instant::now()"))] pub start_instant: Instant, - /// if this is empty, there was a cache_hit - /// otherwise, it is populated with any rpc servers that were used by this request - pub backend_requests: BackendRequests, - /// The number of times the request got stuck waiting because no servers were synced - pub no_servers: AtomicU64, - /// If handling the request hit an application error - /// This does not count things like a transcation reverting or a malformed request - pub error_response: AtomicBool, - /// Size in bytes of the JSON response. Does not include headers or things like that. - pub response_bytes: AtomicU64, - /// How many milliseconds it took to respond to the request - pub response_millis: AtomicU64, - /// What time the (first) response was proxied. - /// TODO: think about how to store response times for ProxyMode::Versus - pub response_timestamp: AtomicI64, - /// True if the response required querying a backup RPC - /// RPC aggregators that query multiple providers to compare response may use this header to ignore our response. - pub response_from_backup_rpc: AtomicBool, - /// If the request is invalid or received a jsonrpc error response (excluding reverts) - pub user_error_response: AtomicBool, #[cfg(feature = "rdkafka")] /// ProxyMode::Debug logs requests and responses with Kafka @@ -283,12 +302,7 @@ impl Serialize for ValidatedRequest { where S: serde::Serializer, { - let mut state = serializer.serialize_struct("request", 7)?; - - state.serialize_field( - "archive_request", - &self.archive_request.load(atomic::Ordering::SeqCst), - )?; + let mut state = serializer.serialize_struct("request", 6)?; state.serialize_field("chain_id", &self.chain_id)?; @@ -297,10 +311,13 @@ impl Serialize for ValidatedRequest { state.serialize_field("elapsed", &self.start_instant.elapsed().as_secs_f32())?; - { - let backend_names = self.backend_requests.lock(); + let response_lock = self.response.lock(); - let backend_names = backend_names + state.serialize_field("archive_request", &response_lock.archive_request)?; + + { + let backend_names = response_lock + .backend_rpcs .iter() .map(|x| x.name.as_str()) .collect::>(); @@ -308,10 +325,9 @@ impl Serialize for ValidatedRequest { state.serialize_field("backend_requests", &backend_names)?; } - state.serialize_field( - "response_bytes", - &self.response_bytes.load(atomic::Ordering::SeqCst), - )?; + state.serialize_field("response_bytes", &response_lock.response_bytes)?; + + drop(response_lock); state.end() } @@ -375,28 +391,20 @@ impl ValidatedRequest { .max(connect_timeout); let x = Self { - archive_request: false.into(), + response: Mutex::new(Default::default()), authorization, - backend_requests: Default::default(), cache_mode, chain_id, - error_response: false.into(), connect_timeout, expire_timeout, head_block: head_block.clone(), kafka_debug_logger, - no_servers: 0.into(), inner: request, permit, - response_bytes: 0.into(), - response_from_backup_rpc: false.into(), - response_millis: 0.into(), - response_timestamp: 0.into(), start_instant, started_active_premium, stat_sender, usd_per_cu, - user_error_response: false.into(), request_id, }; @@ -491,7 +499,9 @@ impl ValidatedRequest { #[inline] pub fn backend_rpcs_used(&self) -> Vec> { - self.backend_requests.lock().clone() + let response_lock = self.response.lock(); + + response_lock.backend_rpcs.clone() } pub fn cache_key(&self) -> Option { @@ -528,10 +538,19 @@ impl ValidatedRequest { #[inline] pub fn min_block_needed(&self) -> Option { - if self.archive_request.load(atomic::Ordering::SeqCst) { - Some(U64::zero()) - } else { - self.cache_mode.from_block().map(|x| x.num()) + let min_block_needed = self.cache_mode.from_block().map(|x| x.num()); + + match min_block_needed { + Some(x) => Some(x), + None => { + let response_lock = self.response.lock(); + + if response_lock.archive_request { + Some(U64::zero()) + } else { + None + } + } } } @@ -575,32 +594,39 @@ impl ValidatedRequest { Ok(()) } - pub fn add_error_response(&self, _err: &Web3ProxyError) { - self.error_response.store(true, atomic::Ordering::SeqCst); + pub fn set_error_response(&self, _err: &Web3ProxyError) { + { + let mut response_lock = self.response.lock(); - // TODO: add actual response size - self.add_response(0); + response_lock.error_response = true; + response_lock.user_error_response = false; + } + + // TODO: add the actual response size + self.set_response(0); } - pub fn add_response<'a, R: Into>>(&'a self, response: R) { + pub fn set_response<'a, R: Into>>(&'a self, response: R) { // TODO: fetch? set? should it be None in a Mutex? or a OnceCell? let response = response.into(); let num_bytes = response.num_bytes(); - self.response_bytes - .fetch_add(num_bytes, atomic::Ordering::SeqCst); + let response_millis = self.start_instant.elapsed().as_millis() as u64; - self.response_millis.fetch_add( - self.start_instant.elapsed().as_millis() as u64, - atomic::Ordering::SeqCst, - ); + let now = Utc::now().timestamp(); - // TODO: record first or last timestamp? really, we need multiple - self.response_timestamp - .store(Utc::now().timestamp(), atomic::Ordering::SeqCst); + { + let mut response_lock = self.response.lock(); - // TODO: set user_error_response and error_response here instead of outside this function + // TODO: set user_error_response and error_response here instead of outside this function + + response_lock.response_bytes = num_bytes; + + response_lock.response_millis = response_millis; + + response_lock.response_timestamp = now; + } #[cfg(feature = "rdkafka")] if let Some(kafka_debug_logger) = self.kafka_debug_logger.as_ref() { diff --git a/web3_proxy/src/jsonrpc/response.rs b/web3_proxy/src/jsonrpc/response.rs index 19a69953..d250a6bb 100644 --- a/web3_proxy/src/jsonrpc/response.rs +++ b/web3_proxy/src/jsonrpc/response.rs @@ -232,7 +232,7 @@ impl IntoResponse for StreamResponse { .map_ok(move |x| { let len = x.len() as u64; - self.web3_request.add_response(len); + self.web3_request.set_response(len); x }); diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index e010cee7..7fc29e95 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -443,15 +443,24 @@ impl Web3Rpcs { let web3_request = ValidatedRequest::new_internal(method, params, head_block, max_wait).await?; - let response = self.request_with_metadata(&web3_request).await?; + let response = self.request_with_metadata::(&web3_request).await?; // the response might support streaming. we need to parse it let parsed = response.parsed().await?; match parsed.payload { - jsonrpc::ResponsePayload::Success { result } => Ok(result), + jsonrpc::ResponsePayload::Success { result } => { + // todo: i don't love this length + web3_request.set_response(0); + + Ok(result) + } // TODO: confirm this error type is correct - jsonrpc::ResponsePayload::Error { error } => Err(error.into()), + jsonrpc::ResponsePayload::Error { error } => { + web3_request.set_error_response(&Web3ProxyError::JsonRpcErrorData(error.clone())); + + Err(error.into()) + } } } @@ -478,7 +487,11 @@ impl Web3Rpcs { // TODO: i'd like to get rid of this clone let rpc = active_request_handle.clone_connection(); - web3_request.backend_requests.lock().push(rpc); + { + let mut response_lock = web3_request.response.lock(); + + response_lock.backend_rpcs.push(rpc); + } match active_request_handle.request::().await { Ok(response) => { diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index f6d748cd..e8d3c94c 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -1307,9 +1307,9 @@ impl Web3Rpc { Ok(x) => { // TODO: this is not efficient :( let x = json!(x); - web3_request.add_response(&x) + web3_request.set_response(&x) } - Err(e) => web3_request.add_error_response(e), + Err(e) => web3_request.set_error_response(e), } response diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 1c4912f5..1768e9c7 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -24,9 +24,7 @@ use migration::sea_orm::{ }; use migration::{Expr, LockType, OnConflict}; use num_traits::ToPrimitive; -use parking_lot::Mutex; use std::borrow::Cow; -use std::sync::atomic::Ordering; use std::sync::Arc; use tracing::{error, instrument, trace, warn}; @@ -38,8 +36,6 @@ pub enum StatType { Detailed, } -pub type BackendRequests = Mutex>>; - #[derive(AddAssign, Copy, Clone, Debug, Default)] pub struct FlushedStats { /// the number of rows saved to the relational database. @@ -551,20 +547,23 @@ impl RpcQueryStats { // TODO: do this without a clone let authorization = metadata.authorization.clone(); - let archive_request = metadata.archive_request.load(Ordering::SeqCst); + let request_bytes = metadata.inner.num_bytes() as u64; + + let response_lock = metadata.response.lock(); + + let archive_request = response_lock.archive_request; // TODO: do this without cloning. we can take their vec - let backend_rpcs_used = metadata.backend_rpcs_used(); + let backend_rpcs_used = response_lock.backend_rpcs.clone(); - let request_bytes = metadata.inner.num_bytes() as u64; - let response_bytes = metadata.response_bytes.load(Ordering::SeqCst); + let response_bytes = response_lock.response_bytes; - let mut error_response = metadata.error_response.load(Ordering::SeqCst); - let mut response_millis = metadata.response_millis.load(Ordering::SeqCst); + let mut error_response = response_lock.error_response; + let mut response_millis = response_lock.response_millis; - let user_error_response = metadata.user_error_response.load(Ordering::SeqCst); + let user_error_response = response_lock.user_error_response; - let response_timestamp = match metadata.response_timestamp.load(Ordering::SeqCst) { + let response_timestamp = match response_lock.response_timestamp { 0 => { // no response timestamp! if !error_response { diff --git a/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs b/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs index db971070..16643acb 100644 --- a/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs +++ b/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs @@ -4,6 +4,7 @@ use tracing::{error, info}; use web3_proxy::app::BILLING_PERIOD_SECONDS; use web3_proxy::config::TopConfig; use web3_proxy::frontend::authorization::{Authorization, RequestOrMethod}; +use web3_proxy::jsonrpc::request_builder::ValidatedResponse; use web3_proxy::jsonrpc::ValidatedRequest; use web3_proxy::prelude::anyhow::{self, Context}; use web3_proxy::prelude::argh::{self, FromArgs}; @@ -191,27 +192,29 @@ impl MigrateStatsToV2SubCommand { let request = RequestOrMethod::Method(method, int_request_bytes as usize); + let web3_response = ValidatedResponse { + archive_request: x.archive_request, + backend_rpcs, + error_response: x.error_response, + // This is not relevant in the new version + no_servers: 0, + response_bytes: int_response_bytes, + response_timestamp: x.period_datetime.timestamp(), + response_millis: int_response_millis, + user_error_response: false, + }; + // Create ValidatedRequest let web3_request = ValidatedRequest { - archive_request: x.archive_request.into(), + response: Mutex::new(web3_response), authorization: authorization.clone(), - backend_requests: Mutex::new(backend_rpcs), chain_id, - error_response: x.error_response.into(), head_block: None, // debug data is in kafka, not mysql or influx kafka_debug_logger: None, inner: request, - // This is not relevant in the new version - no_servers: 0.into(), - response_bytes: int_response_bytes.into(), - // We did not initially record this data - response_from_backup_rpc: false.into(), - response_timestamp: x.period_datetime.timestamp().into(), - response_millis: int_response_millis.into(), stat_sender: Some(stat_sender.clone()), started_active_premium: false, - user_error_response: false.into(), usd_per_cu: top_config.app.usd_per_cu.unwrap_or_default(), cache_mode: Default::default(), start_instant: Instant::now(),