more logs

This commit is contained in:
Bryan Stitt 2022-12-19 15:59:01 -08:00
parent 2a19d9791b
commit f27c764a07
7 changed files with 23 additions and 9 deletions

@ -1168,6 +1168,8 @@ impl Web3ProxyApp {
.context("stat_sender sending response stat")?; .context("stat_sender sending response stat")?;
} }
todo!("attach a header here");
Ok(response) Ok(response)
} }
} }

@ -275,7 +275,7 @@ impl ProxyResponseStat {
response_bytes: usize, response_bytes: usize,
) -> Self { ) -> Self {
let archive_request = metadata.archive_request.load(Ordering::Acquire); let archive_request = metadata.archive_request.load(Ordering::Acquire);
let backend_requests = metadata.backend_requests.load(Ordering::Acquire); let backend_requests = metadata.backend_requests.lock().len() as u64;
// let period_seconds = metadata.period_seconds; // let period_seconds = metadata.period_seconds;
// let period_timestamp = // let period_timestamp =
// (metadata.start_datetime.timestamp() as u64) / period_seconds * period_seconds; // (metadata.start_datetime.timestamp() as u64) / period_seconds * period_seconds;

@ -10,6 +10,7 @@ use std::sync::Arc;
use crate::{frontend::authorization::Authorization, rpcs::connections::Web3Connections}; use crate::{frontend::authorization::Authorization, rpcs::connections::Web3Connections};
#[allow(non_snake_case)]
pub fn block_num_to_U64(block_num: BlockNumber, latest_block: U64) -> U64 { pub fn block_num_to_U64(block_num: BlockNumber, latest_block: U64) -> U64 {
match block_num { match block_num {
BlockNumber::Earliest => { BlockNumber::Earliest => {

@ -2,6 +2,7 @@
use super::errors::FrontendErrorResponse; use super::errors::FrontendErrorResponse;
use crate::app::{AuthorizationChecks, Web3ProxyApp, APP_USER_AGENT}; use crate::app::{AuthorizationChecks, Web3ProxyApp, APP_USER_AGENT};
use crate::rpcs::connection::Web3Connection;
use crate::user_token::UserBearerToken; use crate::user_token::UserBearerToken;
use anyhow::Context; use anyhow::Context;
use axum::headers::authorization::Bearer; use axum::headers::authorization::Bearer;
@ -14,6 +15,7 @@ use http::HeaderValue;
use ipnet::IpNet; use ipnet::IpNet;
use log::error; use log::error;
use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use parking_lot::Mutex;
use redis_rate_limiter::RedisRateLimitResult; use redis_rate_limiter::RedisRateLimitResult;
use std::fmt::Display; use std::fmt::Display;
use std::sync::atomic::{AtomicBool, AtomicU64}; use std::sync::atomic::{AtomicBool, AtomicU64};
@ -73,8 +75,8 @@ pub struct RequestMetadata {
// TODO: do we need atomics? seems like we should be able to pass a &mut around // TODO: do we need atomics? seems like we should be able to pass a &mut around
// TODO: "archive" isn't really a boolean. // TODO: "archive" isn't really a boolean.
pub archive_request: AtomicBool, pub archive_request: AtomicBool,
/// if this is 0, there was a cache_hit /// if this is empty, there was a cache_hit
pub backend_requests: AtomicU64, pub backend_requests: Mutex<Vec<Arc<Web3Connection>>>,
pub no_servers: AtomicU64, pub no_servers: AtomicU64,
pub error_response: AtomicBool, pub error_response: AtomicBool,
pub response_bytes: AtomicU64, pub response_bytes: AtomicU64,
@ -92,7 +94,7 @@ impl RequestMetadata {
period_seconds, period_seconds,
request_bytes, request_bytes,
archive_request: false.into(), archive_request: false.into(),
backend_requests: 0.into(), backend_requests: Default::default(),
no_servers: 0.into(), no_servers: 0.into(),
error_response: false.into(), error_response: false.into(),
response_bytes: 0.into(), response_bytes: 0.into(),

@ -1,9 +1,11 @@
use crate::rpcs::connection::Web3Connection;
use derive_more::From; use derive_more::From;
use ethers::prelude::{HttpClientError, ProviderError, WsClientError}; use ethers::prelude::{HttpClientError, ProviderError, WsClientError};
use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor}; use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::value::RawValue; use serde_json::value::RawValue;
use std::fmt; use std::fmt;
use std::sync::Arc;
// this is used by serde // this is used by serde
#[allow(dead_code)] #[allow(dead_code)]

@ -607,9 +607,11 @@ impl Web3Connections {
skip_rpcs.push(active_request_handle.clone_connection()); skip_rpcs.push(active_request_handle.clone_connection());
if let Some(request_metadata) = request_metadata { if let Some(request_metadata) = request_metadata {
// TODO: request_metadata.backend_requests instead of skip_rpcs
request_metadata request_metadata
.backend_requests .backend_requests
.fetch_add(1, Ordering::Acquire); .lock()
.push(active_request_handle.clone_connection());
} }
// TODO: get the log percent from the user data // TODO: get the log percent from the user data
@ -627,7 +629,7 @@ impl Web3Connections {
) { ) {
Ok(response) => { Ok(response) => {
if let Some(error) = &response.error { if let Some(error) = &response.error {
// // trace!(?response, "rpc error"); // trace!(?response, "rpc error");
if let Some(request_metadata) = request_metadata { if let Some(request_metadata) = request_metadata {
request_metadata request_metadata
@ -654,7 +656,7 @@ impl Web3Connections {
} }
} }
} else { } else {
// // trace!(?response, "rpc success"); // trace!(?response, "rpc success");
} }
return Ok(response); return Ok(response);
@ -710,7 +712,7 @@ impl Web3Connections {
.store(true, Ordering::Release); .store(true, Ordering::Release);
} }
warn!("No synced servers! {:?}", self.synced_connections.load()); warn!("No synced servers! {:?}", self);
// TODO: what error code? 502? // TODO: what error code? 502?
Err(anyhow::anyhow!("all {} tries exhausted", skip_rpcs.len())) Err(anyhow::anyhow!("all {} tries exhausted", skip_rpcs.len()))
@ -738,7 +740,8 @@ impl Web3Connections {
if let Some(request_metadata) = request_metadata { if let Some(request_metadata) = request_metadata {
request_metadata request_metadata
.backend_requests .backend_requests
.fetch_add(active_request_handles.len() as u64, Ordering::Release); .lock()
.extend(active_request_handles.iter().map(|x| x.clone_connection()));
} }
let quorum_response = self let quorum_response = self

@ -195,6 +195,10 @@ impl OpenRequestHandle {
} }
} }
pub fn connection_name(&self) -> String {
self.conn.name.clone()
}
#[inline] #[inline]
pub fn clone_connection(&self) -> Arc<Web3Connection> { pub fn clone_connection(&self) -> Arc<Web3Connection> {
self.conn.clone() self.conn.clone()