emit our first stats-- cache hits and misses

This commit is contained in:
Bryan Stitt 2022-10-03 20:02:05 +00:00
parent 25d34da98d
commit 875ae457ef
6 changed files with 120 additions and 41 deletions

View File

@ -193,6 +193,7 @@ These are roughly in order of completition
- [ ] somehow the proxy thought latest was hours behind. need internal health check that forces reconnect if this happens - [ ] somehow the proxy thought latest was hours behind. need internal health check that forces reconnect if this happens
- [ ] display logged reverts on an endpoint that requires authentication - [ ] display logged reverts on an endpoint that requires authentication
- [ ] failsafe. if no blocks or transactions in some time, warn and reset the connection - [ ] failsafe. if no blocks or transactions in some time, warn and reset the connection
- [ ] have a log all option? instead of just reverts, log all request/responses? can be very useful for debugging
## V1 ## V1

View File

@ -46,7 +46,7 @@ services:
dev-eth: dev-eth:
extends: extends:
file: docker-compose.common.yml file: docker-compose.common.yml
service: base service: web3-proxy
volumes: volumes:
- ./config/example.toml:/config.toml - ./config/example.toml:/config.toml
ports: ports:

View File

@ -11,8 +11,9 @@ use crate::rpcs::blockchain::{ArcBlock, BlockId};
use crate::rpcs::connections::Web3Connections; use crate::rpcs::connections::Web3Connections;
use crate::rpcs::request::OpenRequestHandleMetrics; use crate::rpcs::request::OpenRequestHandleMetrics;
use crate::rpcs::transactions::TxStatus; use crate::rpcs::transactions::TxStatus;
use crate::stats::StatEmitter; use crate::stats::{ProxyResponseStat, ProxyResponseType, StatEmitter, Web3ProxyStat};
use anyhow::Context; use anyhow::Context;
use atomic::{AtomicBool, Ordering};
use axum::extract::ws::Message; use axum::extract::ws::Message;
use axum::headers::{Referer, UserAgent}; use axum::headers::{Referer, UserAgent};
use deferred_rate_limiter::DeferredRateLimiter; use deferred_rate_limiter::DeferredRateLimiter;
@ -112,6 +113,7 @@ pub struct Web3ProxyApp {
pub user_key_cache: Cache<Ulid, UserKeyData, hashbrown::hash_map::DefaultHashBuilder>, pub user_key_cache: Cache<Ulid, UserKeyData, hashbrown::hash_map::DefaultHashBuilder>,
pub user_key_semaphores: Cache<u64, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>, pub user_key_semaphores: Cache<u64, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>, pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub stat_sender: Option<flume::Sender<Web3ProxyStat>>,
} }
/// flatten a JoinError into an anyhow error /// flatten a JoinError into an anyhow error
@ -444,6 +446,7 @@ impl Web3ProxyApp {
user_key_cache, user_key_cache,
user_key_semaphores, user_key_semaphores,
ip_semaphores, ip_semaphores,
stat_sender,
}; };
let app = Arc::new(app); let app = Arc::new(app);
@ -972,34 +975,56 @@ impl Web3ProxyApp {
request.params.clone().map(|x| x.to_string()), request.params.clone().map(|x| x.to_string()),
); );
// TODO: remove their request id instead of cloning it let cache_hit = Arc::new(AtomicBool::new(true));
// TODO: move this caching outside this match and cache some of the other responses? let mut response = {
// TODO: cache the warp::reply to save us serializing every time? let cache_hit = cache_hit.clone();
let mut response = self
.response_cache
.try_get_with(cache_key, async move {
// TODO: retry some failures automatically!
// TODO: try private_rpcs if all the balanced_rpcs fail!
// TODO: put the hash here instead?
let mut response = self
.balanced_rpcs
.try_send_best_upstream_server(
Some(authorized_request),
request,
Some(&request_block_id.num),
)
.await?;
// discard their id by replacing it with an empty self.response_cache
response.id = Default::default(); .try_get_with(cache_key, async move {
cache_hit.store(false, Ordering::Release);
Ok::<_, anyhow::Error>(response) // TODO: retry some failures automatically!
}) // TODO: try private_rpcs if all the balanced_rpcs fail!
.await // TODO: put the hash here instead?
// TODO: what is the best way to handle an Arc here? let mut response = self
.map_err(|err| anyhow::anyhow!(err)) .balanced_rpcs
.context("caching response")?; .try_send_best_upstream_server(
Some(authorized_request),
request,
Some(&request_block_id.num),
)
.await?;
// discard their id by replacing it with an empty
response.id = Default::default();
Ok::<_, anyhow::Error>(response)
})
.await
// TODO: what is the best way to handle an Arc here?
.map_err(|err| {
// TODO: emit a stat for an error
anyhow::anyhow!(err)
})
.context("caching response")?
};
if let Some(stat_sender) = &self.stat_sender {
let response_type = if cache_hit.load(Ordering::Acquire) {
ProxyResponseType::CacheHit
} else {
ProxyResponseType::CacheMiss
};
let response_stat = ProxyResponseStat::new(
method.to_string(),
response_type,
authorized_request,
);
stat_sender.send_async(response_stat.into()).await?;
}
// since this data came likely out of a cache, the id is not going to match // since this data came likely out of a cache, the id is not going to match
// replace the id with our request's id. // replace the id with our request's id.

View File

@ -201,6 +201,16 @@ impl AuthorizedRequest {
} }
} }
impl Display for &AuthorizedRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AuthorizedRequest::Internal => f.write_str("internal"),
AuthorizedRequest::Ip(x) => f.write_str(&format!("ip:{}", x)),
AuthorizedRequest::User(_, x) => f.write_str(&format!("user_key:{}", x.user_key_id)),
}
}
}
pub async fn login_is_authorized( pub async fn login_is_authorized(
app: &Web3ProxyApp, app: &Web3ProxyApp,
ip: IpAddr, ip: IpAddr,

View File

@ -473,13 +473,14 @@ impl Web3Connection {
trace!(rpc=%conn, "provider is ready"); trace!(rpc=%conn, "provider is ready");
} else { } else {
warn!(rpc=%conn, "provider is NOT ready"); warn!(rpc=%conn, "provider is NOT ready");
// returning error will trigger a reconnect
// TODO: what if we just happened to have this check line up with another restart?
return Err(anyhow::anyhow!("provider is not ready")); return Err(anyhow::anyhow!("provider is not ready"));
} }
} }
// TODO: how often? // TODO: how often?
// TODO: should we also check that the head block has changed recently? // TODO: also check that the head block has changed recently
// TODO: maybe instead we should do a simple subscription and follow that instead
sleep(Duration::from_secs(10)).await; sleep(Duration::from_secs(10)).await;
} }
}; };

View File

@ -1,26 +1,64 @@
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use derive_more::From;
use influxdb::Client;
use influxdb::InfluxDbWriteable; use influxdb::InfluxDbWriteable;
use influxdb::{Client, Query, ReadQuery, Timestamp};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tracing::{error, info}; use tracing::{error, info, trace};
/// TODO: replace this example stat with our own use crate::frontend::authorization::AuthorizedRequest;
#[derive(InfluxDbWriteable)]
pub struct WeatherReading { #[derive(Debug)]
time: DateTime<Utc>, pub enum ProxyResponseType {
humidity: i32, CacheHit,
#[influxdb(tag)] CacheMiss,
wind_direction: String, Error,
} }
impl From<ProxyResponseType> for influxdb::Type {
fn from(x: ProxyResponseType) -> Self {
match x {
ProxyResponseType::CacheHit => "cache_hit".into(),
ProxyResponseType::CacheMiss => "cache_miss".into(),
ProxyResponseType::Error => "error".into(),
}
}
}
/// TODO: where should this be defined?
/// TODO: what should be fields and what should be tags. count is always 1 which feels wrong
#[derive(Debug, InfluxDbWriteable)]
pub struct ProxyResponseStat {
time: DateTime<Utc>,
count: u32,
#[influxdb(tag)]
method: String,
#[influxdb(tag)]
response_type: ProxyResponseType,
#[influxdb(tag)]
who: String,
}
impl ProxyResponseStat {
pub fn new(method: String, response_type: ProxyResponseType, who: &AuthorizedRequest) -> Self {
Self {
time: Utc::now(),
count: 1,
method,
response_type,
who: who.to_string(),
}
}
}
#[derive(Debug, From)]
pub enum Web3ProxyStat { pub enum Web3ProxyStat {
WeatherReading(WeatherReading), ProxyResponse(ProxyResponseStat),
} }
impl Web3ProxyStat { impl Web3ProxyStat {
fn into_query(self) -> influxdb::WriteQuery { fn into_query(self) -> influxdb::WriteQuery {
match self { match self {
Self::WeatherReading(x) => x.into_query("weather"), Self::ProxyResponse(x) => x.into_query("proxy_response"),
} }
} }
} }
@ -46,7 +84,11 @@ impl StatEmitter {
let f = async move { let f = async move {
while let Ok(x) = rx.recv_async().await { while let Ok(x) = rx.recv_async().await {
if let Err(err) = client.query(x.into_query()).await { let x = x.into_query();
trace!(?x, "emitting stat");
if let Err(err) = client.query(x).await {
error!(?err, "failed writing stat"); error!(?err, "failed writing stat");
// TODO: now what? // TODO: now what?
} }