From f27c764a070dbd107726342f6d69652733042d59 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 19 Dec 2022 15:59:01 -0800 Subject: [PATCH] more logs --- web3_proxy/src/app/mod.rs | 2 ++ web3_proxy/src/app_stats.rs | 2 +- web3_proxy/src/block_number.rs | 1 + web3_proxy/src/frontend/authorization.rs | 8 +++++--- web3_proxy/src/jsonrpc.rs | 2 ++ web3_proxy/src/rpcs/connections.rs | 13 ++++++++----- web3_proxy/src/rpcs/request.rs | 4 ++++ 7 files changed, 23 insertions(+), 9 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 388dd5d2..cc2e7c4e 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1168,6 +1168,8 @@ impl Web3ProxyApp { .context("stat_sender sending response stat")?; } + todo!("attach a header here"); + Ok(response) } } diff --git a/web3_proxy/src/app_stats.rs b/web3_proxy/src/app_stats.rs index 5adddbd6..9028db07 100644 --- a/web3_proxy/src/app_stats.rs +++ b/web3_proxy/src/app_stats.rs @@ -275,7 +275,7 @@ impl ProxyResponseStat { response_bytes: usize, ) -> Self { let archive_request = metadata.archive_request.load(Ordering::Acquire); - let backend_requests = metadata.backend_requests.load(Ordering::Acquire); + let backend_requests = metadata.backend_requests.lock().len() as u64; // let period_seconds = metadata.period_seconds; // let period_timestamp = // (metadata.start_datetime.timestamp() as u64) / period_seconds * period_seconds; diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index d2fa716d..eb0fda87 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -10,6 +10,7 @@ use std::sync::Arc; use crate::{frontend::authorization::Authorization, rpcs::connections::Web3Connections}; +#[allow(non_snake_case)] pub fn block_num_to_U64(block_num: BlockNumber, latest_block: U64) -> U64 { match block_num { BlockNumber::Earliest => { diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index ddb9c987..e3efb332 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -2,6 +2,7 @@ use super::errors::FrontendErrorResponse; use crate::app::{AuthorizationChecks, Web3ProxyApp, APP_USER_AGENT}; +use crate::rpcs::connection::Web3Connection; use crate::user_token::UserBearerToken; use anyhow::Context; use axum::headers::authorization::Bearer; @@ -14,6 +15,7 @@ use http::HeaderValue; use ipnet::IpNet; use log::error; use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; +use parking_lot::Mutex; use redis_rate_limiter::RedisRateLimitResult; use std::fmt::Display; use std::sync::atomic::{AtomicBool, AtomicU64}; @@ -73,8 +75,8 @@ pub struct RequestMetadata { // TODO: do we need atomics? seems like we should be able to pass a &mut around // TODO: "archive" isn't really a boolean. pub archive_request: AtomicBool, - /// if this is 0, there was a cache_hit - pub backend_requests: AtomicU64, + /// if this is empty, there was a cache_hit + pub backend_requests: Mutex>>, pub no_servers: AtomicU64, pub error_response: AtomicBool, pub response_bytes: AtomicU64, @@ -92,7 +94,7 @@ impl RequestMetadata { period_seconds, request_bytes, archive_request: false.into(), - backend_requests: 0.into(), + backend_requests: Default::default(), no_servers: 0.into(), error_response: false.into(), response_bytes: 0.into(), diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index 6395a208..fc423efe 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -1,9 +1,11 @@ +use crate::rpcs::connection::Web3Connection; use derive_more::From; use ethers::prelude::{HttpClientError, ProviderError, WsClientError}; use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor}; use serde::{Deserialize, Serialize}; use serde_json::value::RawValue; use std::fmt; +use std::sync::Arc; // this is used by serde #[allow(dead_code)] diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 346d5936..f33362d2 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -607,9 +607,11 @@ impl Web3Connections { skip_rpcs.push(active_request_handle.clone_connection()); if let Some(request_metadata) = request_metadata { + // TODO: request_metadata.backend_requests instead of skip_rpcs request_metadata .backend_requests - .fetch_add(1, Ordering::Acquire); + .lock() + .push(active_request_handle.clone_connection()); } // TODO: get the log percent from the user data @@ -627,7 +629,7 @@ impl Web3Connections { ) { Ok(response) => { if let Some(error) = &response.error { - // // trace!(?response, "rpc error"); + // trace!(?response, "rpc error"); if let Some(request_metadata) = request_metadata { request_metadata @@ -654,7 +656,7 @@ impl Web3Connections { } } } else { - // // trace!(?response, "rpc success"); + // trace!(?response, "rpc success"); } return Ok(response); @@ -710,7 +712,7 @@ impl Web3Connections { .store(true, Ordering::Release); } - warn!("No synced servers! {:?}", self.synced_connections.load()); + warn!("No synced servers! {:?}", self); // TODO: what error code? 502? Err(anyhow::anyhow!("all {} tries exhausted", skip_rpcs.len())) @@ -738,7 +740,8 @@ impl Web3Connections { if let Some(request_metadata) = request_metadata { request_metadata .backend_requests - .fetch_add(active_request_handles.len() as u64, Ordering::Release); + .lock() + .extend(active_request_handles.iter().map(|x| x.clone_connection())); } let quorum_response = self diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 1ba3d6be..7358982c 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -195,6 +195,10 @@ impl OpenRequestHandle { } } + pub fn connection_name(&self) -> String { + self.conn.name.clone() + } + #[inline] pub fn clone_connection(&self) -> Arc { self.conn.clone()