instead of multiple atomics, do one mutex

This commit is contained in:
Bryan Stitt 2023-11-22 23:54:44 -04:00
parent c5591786e9
commit 503a58f47a
10 changed files with 185 additions and 127 deletions

@ -41,7 +41,7 @@ use std::fmt;
use std::net::IpAddr;
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::atomic::{self, AtomicU16, Ordering};
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore};
@ -1326,28 +1326,37 @@ impl App {
let (code, response) = match last_response {
Ok(response_data) => {
web3_request.error_response.store(false, Ordering::SeqCst);
let user_error_response = response_data.is_jsonrpc_err();
let mut response_lock = web3_request.response.lock();
// TODO: i really don't like this logic here. it should be inside add_response
response_lock.error_response = false;
// TODO: is it true that all jsonrpc errors are user errors?
web3_request
.user_error_response
.store(response_data.is_jsonrpc_err(), Ordering::SeqCst);
response_lock.user_error_response = user_error_response;
drop(response_lock);
(StatusCode::OK, response_data)
}
Err(err) => {
// max tries exceeded. return the error
web3_request.error_response.store(true, Ordering::SeqCst);
web3_request
.user_error_response
.store(false, Ordering::SeqCst);
let mut response_lock = web3_request.response.lock();
// TODO: i really don't like this logic here. it should be inside add_error_response
// TODO: what if this is an ethers wrapped error? those should have already been handled, but our error types are too broad
response_lock.error_response = true;
response_lock.user_error_response = false;
drop(response_lock);
err.as_json_response_parts(web3_request.id(), Some(web3_request.as_ref()))
}
};
web3_request.add_response(&response);
web3_request.set_response(&response);
let rpcs = web3_request.backend_rpcs_used();
@ -1544,12 +1553,18 @@ impl App {
};
if try_archive {
// TODO: only charge for archive if it gave a result
web3_request
.archive_request
.store(true, atomic::Ordering::SeqCst);
{
let mut response_lock = web3_request.response.lock();
// TODO: this is a hack. we don't usually want an archive
// we probably just hit a bug where a server said it had a block but it dosn't yet have all the transactions
response_lock
.archive_request
= true;
}
// TODO: if the transaction wasn't found, set archive_request back to false?
// TODO: we don't actually want try_send_all. we want the first non-null, non-error response
self
.balanced_rpcs
.try_proxy_connection::<Arc<RawValue>>(

@ -137,7 +137,7 @@ impl App {
break;
};
subscription_web3_request.add_response(response_bytes);
subscription_web3_request.set_response(response_bytes);
}
}
}
@ -217,7 +217,7 @@ impl App {
// we could use ForwardedResponse::num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len() as u64;
subscription_web3_request.add_response(response_bytes);
subscription_web3_request.set_response(response_bytes);
// TODO: do clients support binary messages?
// TODO: can we check a content type header?
@ -260,7 +260,7 @@ impl App {
// TODO: better way of passing in ParsedResponse
let response = jsonrpc::SingleResponse::Parsed(response);
// TODO: this serializes twice
web3_request.add_response(&response);
web3_request.set_response(&response);
let response = response.parsed().await.expect("Response already parsed");
// TODO: make a `SubscriptonHandle(AbortHandle, JoinHandle)` struct?

@ -401,7 +401,7 @@ async fn websocket_proxy_web3_rpc(
let response = jsonrpc::SingleResponse::Parsed(response);
web3_request.add_response(&response);
web3_request.set_response(&response);
let response = response.parsed().await.expect("Response already parsed");
Ok(response.into())

@ -11,7 +11,7 @@ use serde_inline_default::serde_inline_default;
use serde_json::value::RawValue;
use std::borrow::Cow;
use std::fmt;
use std::sync::{atomic, Arc};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
@ -134,13 +134,15 @@ impl JsonRpcRequestEnum {
.await
.unwrap();
request
.user_error_response
.store(true, atomic::Ordering::SeqCst);
{
let mut response_lock = request.response.lock();
response_lock.user_error_response = true;
}
let response = Web3ProxyError::BadRequest("request failed validation".into());
request.add_response(&response);
request.set_response(&response);
let response = response.into_response_with_id(Some(err_id), None::<RequestForError>);

@ -11,13 +11,14 @@ use crate::{
response_cache::JsonRpcQueryCacheKey,
rpcs::{blockchain::Web3ProxyBlock, one::Web3Rpc},
secrets::RpcSecretKey,
stats::{AppStat, BackendRequests},
stats::AppStat,
};
use anyhow::Context;
use axum::headers::{Origin, Referer, UserAgent};
use chrono::Utc;
use derivative::Derivative;
use ethers::types::U64;
use parking_lot::Mutex;
use rust_decimal::Decimal;
use serde::{ser::SerializeStruct, Serialize};
use serde_json::{json, value::RawValue};
@ -26,11 +27,7 @@ use std::{
fmt::{self, Display},
net::IpAddr,
};
use std::{
mem,
sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64},
time::Duration,
};
use std::{mem, time::Duration};
use tokio::{
sync::{mpsc, OwnedSemaphorePermit},
time::Instant,
@ -181,7 +178,9 @@ impl RequestBuilder {
if let Ok(x) = &x {
if self.archive_request {
x.archive_request.store(true, atomic::Ordering::SeqCst);
let mut response_lock = x.response.lock();
response_lock.archive_request = true;
}
}
@ -191,15 +190,53 @@ impl RequestBuilder {
}
}
#[derive(Debug, Default)]
/// todo: better name.
/// the inside bits for ValidatedRequest. It's usually in an Arc, so it's not mutable
pub struct ValidatedResponse {
/// TODO: set archive_request during the new instead of after
/// TODO: this is more complex than "requires a block older than X height". different types of data can be pruned differently
pub archive_request: bool,
/// if this is empty, there was a cache_hit
/// otherwise, it is populated with any rpc servers that were used by this request
pub backend_rpcs: Vec<Arc<Web3Rpc>>,
/// The number of times the request got stuck waiting because no servers were synced
pub no_servers: u64,
/// If handling the request hit an application error
/// This does not count things like a transcation reverting or a malformed request
/// TODO: this will need more thought once we support other ProxyMode
pub error_response: bool,
/// Size in bytes of the JSON response. Does not include headers or things like that.
pub response_bytes: u64,
/// How many milliseconds it took to respond to the request
pub response_millis: u64,
/// What time the (first) response was proxied.
/// TODO: think about how to store response times for ProxyMode::Versus
pub response_timestamp: i64,
/// If the request is invalid or received a jsonrpc error response (excluding reverts)
pub user_error_response: bool,
}
impl ValidatedResponse {
/// True if the response required querying a backup RPC
/// RPC aggregators that query multiple providers to compare response may use this header to ignore our response.
pub fn response_from_backup_rpc(&self) -> bool {
self.backend_rpcs.last().map(|x| x.backup).unwrap_or(false)
}
}
/// TODO:
/// TODO: instead of a bunch of atomics, this should probably use a RwLock. need to think more about how parallel requests are going to work though
#[derive(Debug, Derivative)]
#[derivative(Default)]
pub struct ValidatedRequest {
/// TODO: set archive_request during the new instead of after
/// TODO: this is more complex than "requires a block older than X height". different types of data can be pruned differently
pub archive_request: AtomicBool,
pub authorization: Arc<Authorization>,
pub cache_mode: CacheMode,
@ -212,6 +249,8 @@ pub struct ValidatedRequest {
/// TODO: this should be in a global config. not copied to every single request
pub usd_per_cu: Decimal,
pub response: Mutex<ValidatedResponse>,
pub inner: RequestOrMethod,
/// if the rpc key used for this request is premium (at the start of the request)
@ -222,26 +261,6 @@ pub struct ValidatedRequest {
/// We use Instant and not timestamps to avoid problems with leap seconds and similar issues
#[derivative(Default(value = "Instant::now()"))]
pub start_instant: Instant,
/// if this is empty, there was a cache_hit
/// otherwise, it is populated with any rpc servers that were used by this request
pub backend_requests: BackendRequests,
/// The number of times the request got stuck waiting because no servers were synced
pub no_servers: AtomicU64,
/// If handling the request hit an application error
/// This does not count things like a transcation reverting or a malformed request
pub error_response: AtomicBool,
/// Size in bytes of the JSON response. Does not include headers or things like that.
pub response_bytes: AtomicU64,
/// How many milliseconds it took to respond to the request
pub response_millis: AtomicU64,
/// What time the (first) response was proxied.
/// TODO: think about how to store response times for ProxyMode::Versus
pub response_timestamp: AtomicI64,
/// True if the response required querying a backup RPC
/// RPC aggregators that query multiple providers to compare response may use this header to ignore our response.
pub response_from_backup_rpc: AtomicBool,
/// If the request is invalid or received a jsonrpc error response (excluding reverts)
pub user_error_response: AtomicBool,
#[cfg(feature = "rdkafka")]
/// ProxyMode::Debug logs requests and responses with Kafka
@ -283,12 +302,7 @@ impl Serialize for ValidatedRequest {
where
S: serde::Serializer,
{
let mut state = serializer.serialize_struct("request", 7)?;
state.serialize_field(
"archive_request",
&self.archive_request.load(atomic::Ordering::SeqCst),
)?;
let mut state = serializer.serialize_struct("request", 6)?;
state.serialize_field("chain_id", &self.chain_id)?;
@ -297,10 +311,13 @@ impl Serialize for ValidatedRequest {
state.serialize_field("elapsed", &self.start_instant.elapsed().as_secs_f32())?;
{
let backend_names = self.backend_requests.lock();
let response_lock = self.response.lock();
let backend_names = backend_names
state.serialize_field("archive_request", &response_lock.archive_request)?;
{
let backend_names = response_lock
.backend_rpcs
.iter()
.map(|x| x.name.as_str())
.collect::<Vec<_>>();
@ -308,10 +325,9 @@ impl Serialize for ValidatedRequest {
state.serialize_field("backend_requests", &backend_names)?;
}
state.serialize_field(
"response_bytes",
&self.response_bytes.load(atomic::Ordering::SeqCst),
)?;
state.serialize_field("response_bytes", &response_lock.response_bytes)?;
drop(response_lock);
state.end()
}
@ -375,28 +391,20 @@ impl ValidatedRequest {
.max(connect_timeout);
let x = Self {
archive_request: false.into(),
response: Mutex::new(Default::default()),
authorization,
backend_requests: Default::default(),
cache_mode,
chain_id,
error_response: false.into(),
connect_timeout,
expire_timeout,
head_block: head_block.clone(),
kafka_debug_logger,
no_servers: 0.into(),
inner: request,
permit,
response_bytes: 0.into(),
response_from_backup_rpc: false.into(),
response_millis: 0.into(),
response_timestamp: 0.into(),
start_instant,
started_active_premium,
stat_sender,
usd_per_cu,
user_error_response: false.into(),
request_id,
};
@ -491,7 +499,9 @@ impl ValidatedRequest {
#[inline]
pub fn backend_rpcs_used(&self) -> Vec<Arc<Web3Rpc>> {
self.backend_requests.lock().clone()
let response_lock = self.response.lock();
response_lock.backend_rpcs.clone()
}
pub fn cache_key(&self) -> Option<u64> {
@ -528,10 +538,19 @@ impl ValidatedRequest {
#[inline]
pub fn min_block_needed(&self) -> Option<U64> {
if self.archive_request.load(atomic::Ordering::SeqCst) {
Some(U64::zero())
} else {
self.cache_mode.from_block().map(|x| x.num())
let min_block_needed = self.cache_mode.from_block().map(|x| x.num());
match min_block_needed {
Some(x) => Some(x),
None => {
let response_lock = self.response.lock();
if response_lock.archive_request {
Some(U64::zero())
} else {
None
}
}
}
}
@ -575,32 +594,39 @@ impl ValidatedRequest {
Ok(())
}
pub fn add_error_response(&self, _err: &Web3ProxyError) {
self.error_response.store(true, atomic::Ordering::SeqCst);
pub fn set_error_response(&self, _err: &Web3ProxyError) {
{
let mut response_lock = self.response.lock();
// TODO: add actual response size
self.add_response(0);
response_lock.error_response = true;
response_lock.user_error_response = false;
}
// TODO: add the actual response size
self.set_response(0);
}
pub fn add_response<'a, R: Into<ResponseOrBytes<'a>>>(&'a self, response: R) {
pub fn set_response<'a, R: Into<ResponseOrBytes<'a>>>(&'a self, response: R) {
// TODO: fetch? set? should it be None in a Mutex? or a OnceCell?
let response = response.into();
let num_bytes = response.num_bytes();
self.response_bytes
.fetch_add(num_bytes, atomic::Ordering::SeqCst);
let response_millis = self.start_instant.elapsed().as_millis() as u64;
self.response_millis.fetch_add(
self.start_instant.elapsed().as_millis() as u64,
atomic::Ordering::SeqCst,
);
let now = Utc::now().timestamp();
// TODO: record first or last timestamp? really, we need multiple
self.response_timestamp
.store(Utc::now().timestamp(), atomic::Ordering::SeqCst);
{
let mut response_lock = self.response.lock();
// TODO: set user_error_response and error_response here instead of outside this function
// TODO: set user_error_response and error_response here instead of outside this function
response_lock.response_bytes = num_bytes;
response_lock.response_millis = response_millis;
response_lock.response_timestamp = now;
}
#[cfg(feature = "rdkafka")]
if let Some(kafka_debug_logger) = self.kafka_debug_logger.as_ref() {

@ -232,7 +232,7 @@ impl<T> IntoResponse for StreamResponse<T> {
.map_ok(move |x| {
let len = x.len() as u64;
self.web3_request.add_response(len);
self.web3_request.set_response(len);
x
});

@ -443,15 +443,24 @@ impl Web3Rpcs {
let web3_request =
ValidatedRequest::new_internal(method, params, head_block, max_wait).await?;
let response = self.request_with_metadata(&web3_request).await?;
let response = self.request_with_metadata::<R>(&web3_request).await?;
// the response might support streaming. we need to parse it
let parsed = response.parsed().await?;
match parsed.payload {
jsonrpc::ResponsePayload::Success { result } => Ok(result),
jsonrpc::ResponsePayload::Success { result } => {
// todo: i don't love this length
web3_request.set_response(0);
Ok(result)
}
// TODO: confirm this error type is correct
jsonrpc::ResponsePayload::Error { error } => Err(error.into()),
jsonrpc::ResponsePayload::Error { error } => {
web3_request.set_error_response(&Web3ProxyError::JsonRpcErrorData(error.clone()));
Err(error.into())
}
}
}
@ -478,7 +487,11 @@ impl Web3Rpcs {
// TODO: i'd like to get rid of this clone
let rpc = active_request_handle.clone_connection();
web3_request.backend_requests.lock().push(rpc);
{
let mut response_lock = web3_request.response.lock();
response_lock.backend_rpcs.push(rpc);
}
match active_request_handle.request::<R>().await {
Ok(response) => {

@ -1307,9 +1307,9 @@ impl Web3Rpc {
Ok(x) => {
// TODO: this is not efficient :(
let x = json!(x);
web3_request.add_response(&x)
web3_request.set_response(&x)
}
Err(e) => web3_request.add_error_response(e),
Err(e) => web3_request.set_error_response(e),
}
response

@ -24,9 +24,7 @@ use migration::sea_orm::{
};
use migration::{Expr, LockType, OnConflict};
use num_traits::ToPrimitive;
use parking_lot::Mutex;
use std::borrow::Cow;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tracing::{error, instrument, trace, warn};
@ -38,8 +36,6 @@ pub enum StatType {
Detailed,
}
pub type BackendRequests = Mutex<Vec<Arc<Web3Rpc>>>;
#[derive(AddAssign, Copy, Clone, Debug, Default)]
pub struct FlushedStats {
/// the number of rows saved to the relational database.
@ -551,20 +547,23 @@ impl RpcQueryStats {
// TODO: do this without a clone
let authorization = metadata.authorization.clone();
let archive_request = metadata.archive_request.load(Ordering::SeqCst);
let request_bytes = metadata.inner.num_bytes() as u64;
let response_lock = metadata.response.lock();
let archive_request = response_lock.archive_request;
// TODO: do this without cloning. we can take their vec
let backend_rpcs_used = metadata.backend_rpcs_used();
let backend_rpcs_used = response_lock.backend_rpcs.clone();
let request_bytes = metadata.inner.num_bytes() as u64;
let response_bytes = metadata.response_bytes.load(Ordering::SeqCst);
let response_bytes = response_lock.response_bytes;
let mut error_response = metadata.error_response.load(Ordering::SeqCst);
let mut response_millis = metadata.response_millis.load(Ordering::SeqCst);
let mut error_response = response_lock.error_response;
let mut response_millis = response_lock.response_millis;
let user_error_response = metadata.user_error_response.load(Ordering::SeqCst);
let user_error_response = response_lock.user_error_response;
let response_timestamp = match metadata.response_timestamp.load(Ordering::SeqCst) {
let response_timestamp = match response_lock.response_timestamp {
0 => {
// no response timestamp!
if !error_response {

@ -4,6 +4,7 @@ use tracing::{error, info};
use web3_proxy::app::BILLING_PERIOD_SECONDS;
use web3_proxy::config::TopConfig;
use web3_proxy::frontend::authorization::{Authorization, RequestOrMethod};
use web3_proxy::jsonrpc::request_builder::ValidatedResponse;
use web3_proxy::jsonrpc::ValidatedRequest;
use web3_proxy::prelude::anyhow::{self, Context};
use web3_proxy::prelude::argh::{self, FromArgs};
@ -191,27 +192,29 @@ impl MigrateStatsToV2SubCommand {
let request = RequestOrMethod::Method(method, int_request_bytes as usize);
let web3_response = ValidatedResponse {
archive_request: x.archive_request,
backend_rpcs,
error_response: x.error_response,
// This is not relevant in the new version
no_servers: 0,
response_bytes: int_response_bytes,
response_timestamp: x.period_datetime.timestamp(),
response_millis: int_response_millis,
user_error_response: false,
};
// Create ValidatedRequest
let web3_request = ValidatedRequest {
archive_request: x.archive_request.into(),
response: Mutex::new(web3_response),
authorization: authorization.clone(),
backend_requests: Mutex::new(backend_rpcs),
chain_id,
error_response: x.error_response.into(),
head_block: None,
// debug data is in kafka, not mysql or influx
kafka_debug_logger: None,
inner: request,
// This is not relevant in the new version
no_servers: 0.into(),
response_bytes: int_response_bytes.into(),
// We did not initially record this data
response_from_backup_rpc: false.into(),
response_timestamp: x.period_datetime.timestamp().into(),
response_millis: int_response_millis.into(),
stat_sender: Some(stat_sender.clone()),
started_active_premium: false,
user_error_response: false.into(),
usd_per_cu: top_config.app.usd_per_cu.unwrap_or_default(),
cache_mode: Default::default(),
start_instant: Instant::now(),