diff --git a/TODO.md b/TODO.md
index 3ff9545c..77e23f19 100644
--- a/TODO.md
+++ b/TODO.md
@@ -193,6 +193,7 @@ These are roughly in order of completion
 - [ ] somehow the proxy thought latest was hours behind. need internal health check that forces reconnect if this happens
 - [ ] display logged reverts on an endpoint that requires authentication
 - [ ] failsafe. if no blocks or transactions in some time, warn and reset the connection
+- [ ] have a log-all option? instead of just reverts, log all requests/responses? can be very useful for debugging

 ## V1

diff --git a/docker-compose.yml b/docker-compose.yml
index 36703fcb..b1576eba 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -46,7 +46,7 @@ services:
   dev-eth:
     extends:
       file: docker-compose.common.yml
-      service: base
+      service: web3-proxy
     volumes:
       - ./config/example.toml:/config.toml
     ports:
diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs
index 76c804f7..ac79b2f8 100644
--- a/web3_proxy/src/app.rs
+++ b/web3_proxy/src/app.rs
@@ -11,8 +11,9 @@ use crate::rpcs::blockchain::{ArcBlock, BlockId};
 use crate::rpcs::connections::Web3Connections;
 use crate::rpcs::request::OpenRequestHandleMetrics;
 use crate::rpcs::transactions::TxStatus;
-use crate::stats::StatEmitter;
+use crate::stats::{ProxyResponseStat, ProxyResponseType, StatEmitter, Web3ProxyStat};
 use anyhow::Context;
+use std::sync::atomic::{AtomicBool, Ordering};
 use axum::extract::ws::Message;
 use axum::headers::{Referer, UserAgent};
 use deferred_rate_limiter::DeferredRateLimiter;
@@ -112,6 +113,7 @@ pub struct Web3ProxyApp {
     pub user_key_cache: Cache<Uuid, UserKeyData, hashbrown::hash_map::DefaultHashBuilder>,
     pub user_key_semaphores: Cache<u64, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
     pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
+    pub stat_sender: Option<flume::Sender<Web3ProxyStat>>,
 }

 /// flatten a JoinError into an anyhow error
@@ -444,6 +446,7 @@ impl Web3ProxyApp {
             user_key_cache,
             user_key_semaphores,
             ip_semaphores,
+            stat_sender,
         };

         let app = Arc::new(app);
@@ -972,34 +975,56 @@ impl Web3ProxyApp {
             request.params.clone().map(|x| x.to_string()),
         );

-        // TODO: remove their request id instead of cloning it
+        let cache_hit = Arc::new(AtomicBool::new(true));

-        // TODO: move this caching outside this match and cache some of the other responses?
-        // TODO: cache the warp::reply to save us serializing every time?
-        let mut response = self
-            .response_cache
-            .try_get_with(cache_key, async move {
-                // TODO: retry some failures automatically!
-                // TODO: try private_rpcs if all the balanced_rpcs fail!
-                // TODO: put the hash here instead?
-                let mut response = self
-                    .balanced_rpcs
-                    .try_send_best_upstream_server(
-                        Some(authorized_request),
-                        request,
-                        Some(&request_block_id.num),
-                    )
-                    .await?;
+        let mut response = {
+            let cache_hit = cache_hit.clone();

-                // discard their id by replacing it with an empty
-                response.id = Default::default();
+            self.response_cache
+                .try_get_with(cache_key, async move {
+                    cache_hit.store(false, Ordering::Release);

-                Ok::<_, anyhow::Error>(response)
-            })
-            .await
-            // TODO: what is the best way to handle an Arc here?
-            .map_err(|err| anyhow::anyhow!(err))
-            .context("caching response")?;
+                    // TODO: retry some failures automatically!
+                    // TODO: try private_rpcs if all the balanced_rpcs fail!
+                    // TODO: put the hash here instead?
+                    let mut response = self
+                        .balanced_rpcs
+                        .try_send_best_upstream_server(
+                            Some(authorized_request),
+                            request,
+                            Some(&request_block_id.num),
+                        )
+                        .await?;
+
+                    // discard their id by replacing it with an empty one
+                    response.id = Default::default();
+
+                    Ok::<_, anyhow::Error>(response)
+                })
+                .await
+                // TODO: what is the best way to handle an Arc here?
+                .map_err(|err| {
+                    // TODO: emit a stat for an error
+                    anyhow::anyhow!(err)
+                })
+                .context("caching response")?
+        };
+
+        if let Some(stat_sender) = &self.stat_sender {
+            let response_type = if cache_hit.load(Ordering::Acquire) {
+                ProxyResponseType::CacheHit
+            } else {
+                ProxyResponseType::CacheMiss
+            };
+
+            let response_stat = ProxyResponseStat::new(
+                method.to_string(),
+                response_type,
+                authorized_request,
+            );
+
+            stat_sender.send_async(response_stat.into()).await?;
+        }

         // since this data likely came out of a cache, the id is not going to match
         // replace the id with our request's id.
diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs
index 1e3665b4..a012b78c 100644
--- a/web3_proxy/src/frontend/authorization.rs
+++ b/web3_proxy/src/frontend/authorization.rs
@@ -201,6 +201,16 @@ impl AuthorizedRequest {
     }
 }

+impl Display for &AuthorizedRequest {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            AuthorizedRequest::Internal => f.write_str("internal"),
+            AuthorizedRequest::Ip(x) => f.write_str(&format!("ip:{}", x)),
+            AuthorizedRequest::User(_, x) => f.write_str(&format!("user_key:{}", x.user_key_id)),
+        }
+    }
+}
+
 pub async fn login_is_authorized(
     app: &Web3ProxyApp,
     ip: IpAddr,
diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs
index 23f2307b..d32f8c84 100644
--- a/web3_proxy/src/rpcs/connection.rs
+++ b/web3_proxy/src/rpcs/connection.rs
@@ -473,13 +473,14 @@ impl Web3Connection {
                     trace!(rpc=%conn, "provider is ready");
                 } else {
                     warn!(rpc=%conn, "provider is NOT ready");
+                    // returning an error will trigger a reconnect
+                    // TODO: what if this check just happens to line up with another restart?
                     return Err(anyhow::anyhow!("provider is not ready"));
                 }
             }

             // TODO: how often?
-            // TODO: should we also check that the head block has changed recently?
-            // TODO: maybe instead we should do a simple subscription and follow that instead
+            // TODO: also check that the head block has changed recently
             sleep(Duration::from_secs(10)).await;
         }
     };
diff --git a/web3_proxy/src/stats.rs b/web3_proxy/src/stats.rs
index 6b39d30e..7b32eb34 100644
--- a/web3_proxy/src/stats.rs
+++ b/web3_proxy/src/stats.rs
@@ -1,26 +1,64 @@
 use chrono::{DateTime, Utc};
+use derive_more::From;
+use influxdb::Client;
 use influxdb::InfluxDbWriteable;
-use influxdb::{Client, Query, ReadQuery, Timestamp};
 use tokio::task::JoinHandle;
-use tracing::{error, info};
+use tracing::{error, info, trace};

-/// TODO: replace this example stat with our own
-#[derive(InfluxDbWriteable)]
-pub struct WeatherReading {
-    time: DateTime<Utc>,
-    humidity: i32,
-    #[influxdb(tag)]
-    wind_direction: String,
+use crate::frontend::authorization::AuthorizedRequest;
+
+#[derive(Debug)]
+pub enum ProxyResponseType {
+    CacheHit,
+    CacheMiss,
+    Error,
 }

+impl From<ProxyResponseType> for influxdb::Type {
+    fn from(x: ProxyResponseType) -> Self {
+        match x {
+            ProxyResponseType::CacheHit => "cache_hit".into(),
+            ProxyResponseType::CacheMiss => "cache_miss".into(),
+            ProxyResponseType::Error => "error".into(),
+        }
+    }
+}
+
+/// TODO: where should this be defined?
+/// TODO: what should be fields and what should be tags? count is always 1, which feels wrong
+#[derive(Debug, InfluxDbWriteable)]
+pub struct ProxyResponseStat {
+    time: DateTime<Utc>,
+    count: u32,
+    #[influxdb(tag)]
+    method: String,
+    #[influxdb(tag)]
+    response_type: ProxyResponseType,
+    #[influxdb(tag)]
+    who: String,
+}
+
+impl ProxyResponseStat {
+    pub fn new(method: String, response_type: ProxyResponseType, who: &AuthorizedRequest) -> Self {
+        Self {
+            time: Utc::now(),
+            count: 1,
+            method,
+            response_type,
+            who: who.to_string(),
+        }
+    }
+}
+
+#[derive(Debug, From)]
 pub enum Web3ProxyStat {
-    WeatherReading(WeatherReading),
+    ProxyResponse(ProxyResponseStat),
 }

 impl Web3ProxyStat {
     fn into_query(self) -> influxdb::WriteQuery {
         match self {
-            Self::WeatherReading(x) => x.into_query("weather"),
+            Self::ProxyResponse(x) => x.into_query("proxy_response"),
         }
     }
 }
@@ -46,7 +84,11 @@ impl StatEmitter {

         let f = async move {
             while let Ok(x) = rx.recv_async().await {
-                if let Err(err) = client.query(x.into_query()).await {
+                let x = x.into_query();
+
+                trace!(?x, "emitting stat");
+
+                if let Err(err) = client.query(x).await {
                     error!(?err, "failed writing stat");
                     // TODO: now what?
                 }
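A note on the cache-hit tracking in the app.rs hunk: `try_get_with` only runs the supplied future on a cache miss, so a flag that starts out `true` and is flipped inside that future cleanly distinguishes hits from misses after the call returns. Below is a minimal, self-contained sketch of the same pattern in synchronous form; the `HashMap`-behind-a-`Mutex` cache and the function name are illustrative stand-ins for the moka `response_cache`, not code from this repo:

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// get-or-compute, returning the value and whether it was a cache hit.
/// mirrors the app.rs change: the flag starts true, and only the miss
/// path (the compute closure) flips it to false.
fn get_with_hit_flag(
    cache: &Mutex<HashMap<String, String>>,
    key: &str,
    compute: impl FnOnce() -> String,
) -> (String, bool) {
    // Arc mirrors the async original, where the future must own its captures
    let cache_hit = Arc::new(AtomicBool::new(true));

    let value = {
        let cache_hit = cache_hit.clone();

        let mut cache = cache.lock().unwrap();
        cache
            .entry(key.to_string())
            .or_insert_with(|| {
                // only reached on a miss, like the future passed to try_get_with
                cache_hit.store(false, Ordering::Release);
                compute()
            })
            .clone()
    };

    (value, cache_hit.load(Ordering::Acquire))
}

fn main() {
    let cache = Mutex::new(HashMap::new());

    let (_, hit) = get_with_hit_flag(&cache, "eth_chainId", || "0x1".into());
    assert!(!hit); // first lookup computes the value, so it is a miss

    let (_, hit) = get_with_hit_flag(&cache, "eth_chainId", || "0x1".into());
    assert!(hit); // second lookup is served from the map
}
```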
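The stats plumbing introduced here is a channel: request handlers `send_async` a `Web3ProxyStat` into `stat_sender`, and the spawned `StatEmitter` task drains the receiver and writes each point to InfluxDB. A rough standalone sketch of that shape, with a simplified stat type and `println!` standing in for the InfluxDB `client.query` call (assumes the `flume` and `tokio` crates):

```rust
use tokio::task::JoinHandle;

#[derive(Debug)]
enum Web3ProxyStat {
    ProxyResponse { method: String, cache_hit: bool },
}

/// spawn a background task that drains the stat channel. in the real
/// StatEmitter, the loop body builds an influxdb WriteQuery and sends
/// it with client.query(...)
fn spawn_emitter(rx: flume::Receiver<Web3ProxyStat>) -> JoinHandle<()> {
    tokio::spawn(async move {
        // recv_async returns Err once every sender is dropped, ending the task
        while let Ok(stat) = rx.recv_async().await {
            println!("emitting stat: {:?}", stat);
        }
    })
}

#[tokio::main]
async fn main() {
    let (tx, rx) = flume::unbounded();
    let emitter = spawn_emitter(rx);

    tx.send_async(Web3ProxyStat::ProxyResponse {
        method: "eth_call".to_string(),
        cache_hit: true,
    })
    .await
    .unwrap();

    // dropping the last sender closes the channel so the emitter exits
    drop(tx);
    emitter.await.unwrap();
}
```

Keeping stats on a channel like this keeps the hot request path cheap: the handler's only cost is an enqueue, and slow InfluxDB writes back up in the channel rather than stalling the proxy.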
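On the `ProxyResponseStat` TODO about fields versus tags: in InfluxDB, tags are indexed key/value strings used for filtering and `GROUP BY`, while fields carry the measured values. `method`, `response_type`, and `who` are natural tags because queries slice by them, and `count` belongs as a field. A constant `count=1` per point is a common idiom rather than a mistake: each point represents one request, so `SUM(count)` grouped by tags yields per-method or per-user request totals. Each stat should serialize to a line roughly like `proxy_response,method=eth_call,response_type=cache_hit,who=internal count=1 <timestamp>` (illustrative values).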