better metrics and spawn

This commit is contained in:
Bryan Stitt 2022-09-08 21:01:36 +00:00
parent cfca16319b
commit cae034afb3
17 changed files with 408 additions and 273 deletions

239
Cargo.lock generated

@ -142,6 +142,28 @@ dependencies = [
"term",
]
[[package]]
name = "aspect"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3927b415bba088539aaaf872d19752c7d00101a25ead1d123fcd7633f9c224d"
dependencies = [
"aspect-weave",
]
[[package]]
name = "aspect-weave"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea4f672ac5290272725e1453014af99a86d2c1712808d647f469bf9427519f41"
dependencies = [
"indexmap",
"proc-macro2",
"quote",
"syn",
"synattra",
]
[[package]]
name = "async-attributes"
version = "1.1.2"
@ -307,6 +329,15 @@ dependencies = [
"num-traits",
]
[[package]]
name = "atomic"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b88d82667eca772c4aa12f0f1348b3ae643424c8876448f3f7bd5787032e234c"
dependencies = [
"autocfg 1.1.0",
]
[[package]]
name = "atomic-waker"
version = "1.0.0"
@ -324,6 +355,40 @@ dependencies = [
"winapi",
]
[[package]]
name = "auto_enums"
version = "0.7.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe0dfe45d75158751e195799f47ea02e81f570aa24bc5ef999cdd9e888c4b5c3"
dependencies = [
"auto_enums_core",
"auto_enums_derive",
]
[[package]]
name = "auto_enums_core"
version = "0.7.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da47c46001293a2c4b744d731958be22cff408a2ab76e2279328f9713b1267b4"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "auto_enums_derive"
version = "0.7.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41aed1da83ecdc799503b7cb94da1b45a34d72b49caf40a61d9cf5b88ec07cfd"
dependencies = [
"autocfg 1.1.0",
"derive_utils",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "auto_impl"
version = "1.0.1"
@ -1213,6 +1278,17 @@ dependencies = [
"syn",
]
[[package]]
name = "derive_utils"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "532b4c15dccee12c7044f1fcad956e98410860b22231e44a3b827464797ca7bf"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "dialoguer"
version = "0.8.0"
@ -1281,6 +1357,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "doc-comment"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
[[package]]
name = "dotenv"
version = "0.15.0"
@ -1293,12 +1375,6 @@ version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56899898ce76aaf4a0f24d914c97ea6ed976d42fec6ad33fcbb0a1103e07b2b0"
[[package]]
name = "dtoa"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6053ff46b5639ceb91756a85a4c8914668393a03170efd79c8884a529d80656"
[[package]]
name = "dunce"
version = "1.0.2"
@ -1823,11 +1899,10 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "form_urlencoded"
version = "1.0.1"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fc25a87fa4fd2094bffb06925852034d90a17f0d1e05197d4956d3555752191"
checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8"
dependencies = [
"matches",
"percent-encoding",
]
@ -1860,6 +1935,12 @@ dependencies = [
"libc",
]
[[package]]
name = "ftoa"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca45aac12b6c561b6289bc68957cb1db3dccf870e1951d590202de5e24f1dd35"
[[package]]
name = "fuchsia-cprng"
version = "0.1.1"
@ -2142,6 +2223,20 @@ dependencies = [
"hashbrown",
]
[[package]]
name = "hdrhistogram"
version = "7.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ea9fe3952d32674a14e0975009a3547af9ea364995b5ec1add2e23c2ae523ab"
dependencies = [
"base64 0.13.0",
"byteorder",
"crossbeam-channel",
"flate2",
"nom",
"num-traits",
]
[[package]]
name = "headers"
version = "0.3.7"
@ -2310,11 +2405,10 @@ dependencies = [
[[package]]
name = "idna"
version = "0.2.3"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8"
checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6"
dependencies = [
"matches",
"unicode-bidi",
"unicode-normalization",
]
@ -2371,6 +2465,7 @@ checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e"
dependencies = [
"autocfg 1.1.0",
"hashbrown",
"serde",
]
[[package]]
@ -2601,12 +2696,6 @@ dependencies = [
"regex-automata",
]
[[package]]
name = "matches"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
[[package]]
name = "matchit"
version = "0.5.0"
@ -2637,6 +2726,36 @@ dependencies = [
"autocfg 1.1.0",
]
[[package]]
name = "metered"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17491527d2ceff20d00d02166bdd18e23056e7ced22b9a8bb0efdfd293f0441a"
dependencies = [
"aspect",
"atomic",
"cfg-if",
"hdrhistogram",
"metered-macro",
"parking_lot 0.12.1",
"serde",
]
[[package]]
name = "metered-macro"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5ef9d33baa693e2d449d069f6ef6eb549762ed0c0179976c45bd98f3aa4a4e1"
dependencies = [
"aspect-weave",
"heck 0.4.0",
"indexmap",
"proc-macro2",
"quote",
"syn",
"synattra",
]
[[package]]
name = "migration"
version = "0.1.0"
@ -3079,9 +3198,9 @@ dependencies = [
[[package]]
name = "percent-encoding"
version = "2.1.0"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
[[package]]
name = "pest"
@ -3399,29 +3518,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "prometheus-client"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c473049631c233933d6286c88bbb7be30e62ec534cf99a9ae0079211f7fa603"
dependencies = [
"dtoa 1.0.3",
"itoa 1.0.2",
"parking_lot 0.12.1",
"prometheus-client-derive-text-encode",
]
[[package]]
name = "prometheus-client-derive-text-encode"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66a455fbcb954c1a7decf3c586e860fd7889cddf4b8e164be736dbac95a953cd"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pulldown-cmark"
version = "0.9.2"
@ -3633,7 +3729,7 @@ dependencies = [
"async-trait",
"bytes",
"combine",
"dtoa 0.4.8",
"dtoa",
"futures-util",
"itoa 0.4.8",
"percent-encoding",
@ -4177,6 +4273,21 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_prometheus"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25fcd6131bac47a32328d1ba1ee15a27f8d91ab2e5920dba71dbe93d2648f6b1"
dependencies = [
"ftoa",
"indexmap",
"itoa 0.4.8",
"lazy_static",
"regex",
"serde",
"snafu",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
@ -4349,6 +4460,27 @@ version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
[[package]]
name = "snafu"
version = "0.6.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eab12d3c261b2308b0d80c26fffb58d17eba81a4be97890101f416b478c79ca7"
dependencies = [
"doc-comment",
"snafu-derive",
]
[[package]]
name = "snafu-derive"
version = "0.6.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1508efa03c362e23817f96cde18abed596a25219a8b2c66e8db33c03543d315b"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "socket2"
version = "0.4.4"
@ -4624,6 +4756,17 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "synattra"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "378cd5695f9ef5a26668bb70e81a464e7de6144bac3f77f42d5fa596c690be63"
dependencies = [
"auto_enums",
"proc-macro2",
"syn",
]
[[package]]
name = "sync_wrapper"
version = "0.1.1"
@ -5167,13 +5310,12 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "url"
version = "2.2.2"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a507c383b2d33b5fc35d1861e77e6b383d158b2da5e14fe51b83dfedf6fd578c"
checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643"
dependencies = [
"form_urlencoded",
"idna",
"matches",
"percent-encoding",
]
@ -5378,6 +5520,7 @@ dependencies = [
"handlebars",
"hashbrown",
"http",
"metered",
"migration",
"moka",
"notify",
@ -5385,7 +5528,6 @@ dependencies = [
"parking_lot 0.12.1",
"petgraph",
"proctitle",
"prometheus-client",
"rand 0.8.5",
"redis-rate-limit",
"regex",
@ -5394,6 +5536,7 @@ dependencies = [
"sea-orm",
"serde",
"serde_json",
"serde_prometheus",
"siwe",
"time 0.3.14",
"tokio",

@ -34,13 +34,13 @@ flume = "0.10.14"
futures = { version = "0.3.24", features = ["thread-pool"] }
hashbrown = { version = "0.12.3", features = ["serde"] }
http = "0.2.8"
metered = { version = "0.9.0", features = ["serialize"] }
moka = { version = "0.9.4", default-features = false, features = ["future"] }
notify = "5.0.0"
num = "0.4.0"
parking_lot = { version = "0.12.1", features = ["arc_lock"] }
petgraph = "0.6.2"
proctitle = "0.1.1"
prometheus-client = "0.18.0"
rand = "0.8.5"
# TODO: regex has several "perf" features that we might want to use
regex = "1.6.0"
@ -51,6 +51,7 @@ siwe = "0.4.2"
sea-orm = { version = "0.9.2", features = ["macros"] }
serde = { version = "1.0.144", features = [] }
serde_json = { version = "1.0.85", default-features = false, features = ["alloc", "raw_value"] }
serde_prometheus = "0.1.6"
# TODO: make sure this time version matches siwe. PR to put this in their prelude
time = "0.3.14"
tokio = { version = "1.21.0", features = ["full", "tracing"] }
@ -64,5 +65,5 @@ tracing = "0.1.36"
# TODO: tracing-subscriber has serde and serde_json features that we might want to use
tracing-subscriber = { version = "0.3.15", features = ["env-filter", "parking_lot"] }
ulid = { version = "1.0.0", features = ["serde"] }
url = "2.2.2"
url = "2.3.1"
uuid = "1.1.2"

@ -0,0 +1,31 @@
use metered::{metered, HitCount, Throughput};
use serde::Serialize;
// Example service being instrumented. `Serialize` is derived so the whole
// struct (including the metrics registry) can be fed to serde_prometheus.
#[derive(Default, Debug, Serialize)]
pub struct Biz {
    // `BizMetrics` is generated by the `#[metered(registry = BizMetrics)]`
    // attribute on the impl block below; it holds the per-method counters.
    metrics: BizMetrics,
}
// The `#[metered]` macro generates the `BizMetrics` registry struct and wires
// every `#[measure(...)]`-annotated method in this impl into it.
#[metered(registry = BizMetrics)]
impl Biz {
    // Simulated unit of work: sleeps a random 0-199 ms so that HitCount and
    // Throughput have varying data to record.
    #[measure([HitCount, Throughput])]
    pub fn biz(&self) {
        let delay = std::time::Duration::from_millis(rand::random::<u64>() % 200);
        std::thread::sleep(delay);
    }
}
/// Exercise the instrumented `Biz` service and print its metrics in the
/// Prometheus exposition format.
fn main() {
    // Call the measured method repeatedly so the registry has data to report.
    let example = Biz::default();
    (0..100).for_each(|_| example.biz());

    // Global labels attached to every exported sample.
    let globals: std::collections::HashMap<&str, &str> =
        [("service", "web3_proxy_prometheus_example")]
            .into_iter()
            .collect();

    let rendered =
        serde_prometheus::to_string(&example.metrics, Some("example"), globals).unwrap();
    println!("{}", rendered);
}

@ -9,7 +9,6 @@ use crate::jsonrpc::JsonRpcRequestEnum;
use crate::rpcs::blockchain::{ArcBlock, BlockHashesMap, BlockId};
use crate::rpcs::connections::Web3Connections;
use crate::rpcs::transactions::TxStatus;
use crate::stats::AppStats;
use anyhow::Context;
use axum::extract::ws::Message;
use derive_more::From;
@ -20,6 +19,8 @@ use futures::future::{join_all, AbortHandle};
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::Future;
use hashbrown::HashMap;
use metered::{metered, ErrorCount, HitCount, InFlight, ResponseTime, Throughput};
use migration::{Migrator, MigratorTrait};
use moka::future::Cache;
use redis_rate_limit::bb8::PooledConnection;
@ -79,13 +80,14 @@ pub struct Web3ProxyApp {
pending_tx_sender: broadcast::Sender<TxStatus>,
pub config: AppConfig,
pub db_conn: Option<sea_orm::DatabaseConnection>,
/// prometheus metrics
metrics: Arc<Web3ProxyAppMetrics>,
/// store pending queries so that we don't send the same request to our backends multiple times
pub total_queries: AtomicU64,
/// store pending transactions that we've seen so that we don't send duplicates to subscribers
pub pending_transactions: Cache<TxHash, TxStatus>,
pub frontend_rate_limiter: Option<RedisRateLimit>,
pub redis_pool: Option<RedisPool>,
pub stats: AppStats,
pub user_cache: Cache<Uuid, UserCacheValue>,
}
@ -115,6 +117,7 @@ pub async fn flatten_handles<T>(
}
/// Connect to the database and run migrations
#[instrument(skip_all)]
pub async fn get_migrated_db(
db_url: String,
min_connections: u32,
@ -141,9 +144,9 @@ pub async fn get_migrated_db(
Ok(db)
}
#[metered(registry = Web3ProxyAppMetrics)]
impl Web3ProxyApp {
pub async fn spawn(
app_stats: AppStats,
top_config: TopConfig,
num_workers: u32,
) -> anyhow::Result<(
@ -321,7 +324,7 @@ impl Web3ProxyApp {
frontend_rate_limiter,
db_conn,
redis_pool,
stats: app_stats,
metrics: Default::default(),
user_cache,
};
@ -332,10 +335,22 @@ impl Web3ProxyApp {
Ok((app, handle))
}
pub async fn eth_subscribe(
self: Arc<Self>,
pub fn prometheus_metrics(&self) -> anyhow::Result<String> {
let globals = HashMap::new();
// TODO: what globals? should this be the hostname or what?
// globals.insert("service", "web3_proxy");
let serialized = serde_prometheus::to_string(&self.metrics, Some("web3_proxy"), globals)?;
Ok(serialized)
}
#[instrument(skip_all)]
#[measure([ErrorCount, HitCount, InFlight, ResponseTime, Throughput])]
pub async fn eth_subscribe<'a>(
self: &'a Arc<Self>,
payload: JsonRpcRequest,
subscription_count: &AtomicUsize,
subscription_count: &'a AtomicUsize,
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now
response_sender: flume::Sender<Message>,
) -> anyhow::Result<(AbortHandle, JsonRpcForwardedResponse)> {
@ -524,7 +539,7 @@ impl Web3ProxyApp {
/// send the request or batch of requests to the appropriate RPCs
#[instrument(skip_all)]
pub async fn proxy_web3_rpc(
&self,
self: &Arc<Self>,
request: JsonRpcRequestEnum,
) -> anyhow::Result<JsonRpcForwardedResponseEnum> {
// TODO: this should probably be trace level
@ -550,14 +565,14 @@ impl Web3ProxyApp {
Ok(response)
}
/// cut up the request and send to potentially different servers
/// TODO: make sure this isn't a problem
#[instrument(skip_all)]
async fn proxy_web3_rpc_requests(
&self,
self: &Arc<Self>,
requests: Vec<JsonRpcRequest>,
) -> anyhow::Result<Vec<JsonRpcForwardedResponse>> {
// TODO: we should probably change ethers-rs to support this directly
// we cut up the request and send to potentially different servers. this could be a problem.
// if the client needs consistent blocks, they should specify instead of assume batches work on the same
// TODO: is spawning here actually slower?
let num_requests = requests.len();
let responses = join_all(
requests
@ -576,6 +591,7 @@ impl Web3ProxyApp {
Ok(collected)
}
#[instrument(skip_all)]
pub async fn redis_conn(&self) -> anyhow::Result<PooledConnection<RedisConnectionManager>> {
match self.redis_pool.as_ref() {
None => Err(anyhow::anyhow!("no redis server configured")),
@ -587,8 +603,10 @@ impl Web3ProxyApp {
}
}
#[measure([ErrorCount, HitCount, InFlight, ResponseTime, Throughput])]
#[instrument(skip_all)]
async fn proxy_web3_rpc_request(
&self,
self: &Arc<Self>,
mut request: JsonRpcRequest,
) -> anyhow::Result<JsonRpcForwardedResponse> {
trace!("Received request: {:?}", request);
@ -609,7 +627,7 @@ impl Web3ProxyApp {
// TODO: don't clone
let partial_response: serde_json::Value = match request.method.clone().as_ref() {
// lots of commands are blocked
"admin_addPeer"
method @ ("admin_addPeer"
| "admin_datadir"
| "admin_startRPC"
| "admin_startWS"
@ -671,20 +689,20 @@ impl Web3ProxyApp {
| "shh_newIdentity"
| "shh_post"
| "shh_uninstallFilter"
| "shh_version" => {
| "shh_version") => {
// TODO: client error stat
// TODO: proper error code
return Err(anyhow::anyhow!("unsupported"));
return Err(anyhow::anyhow!("method unsupported: {}", method));
}
// TODO: implement these commands
"eth_getFilterChanges"
method @ ("eth_getFilterChanges"
| "eth_getFilterLogs"
| "eth_newBlockFilter"
| "eth_newFilter"
| "eth_newPendingTransactionFilter"
| "eth_uninstallFilter" => {
| "eth_uninstallFilter") => {
// TODO: unsupported command stat
return Err(anyhow::anyhow!("not yet implemented"));
return Err(anyhow::anyhow!("not yet implemented: {}", method));
}
// some commands can use local data or caches
"eth_accounts" => {
@ -698,7 +716,9 @@ impl Web3ProxyApp {
}
None => {
// TODO: what does geth do if this happens?
return Err(anyhow::anyhow!("no servers synced"));
return Err(anyhow::anyhow!(
"no servers synced. unknown eth_blockNumber"
));
}
}
}

@ -0,0 +1,7 @@
/// Placeholder for a health-check binary; currently does nothing and exits 0.
fn main() -> anyhow::Result<()> {
    // get curve-api and rpc endpoints from cli flags
    // do simple checks for proxy version, blockNum, a simple balance call
    // in parallel check all the curve api endpoints that we expect to pass. many require multiple chains that we don't yet run locally
    Ok(())
}

@ -18,8 +18,7 @@ use tracing::{debug, info};
use tracing_subscriber::EnvFilter;
use web3_proxy::app::{flatten_handle, Web3ProxyApp};
use web3_proxy::config::{CliConfig, TopConfig};
use web3_proxy::frontend;
use web3_proxy::stats::AppStatsRegistry;
use web3_proxy::{frontend, metrics};
fn run(
shutdown_receiver: flume::Receiver<()>,
@ -69,18 +68,14 @@ fn run(
debug!(?num_workers);
rt.block_on(async {
let app_stats_registry = AppStatsRegistry::new();
let app_stats = app_stats_registry.stats.clone();
let app_frontend_port = cli_config.port;
let app_prometheus_port = cli_config.prometheus_port;
let (app, app_handle) = Web3ProxyApp::spawn(app_stats, top_config, num_workers).await?;
let (app, app_handle) = Web3ProxyApp::spawn(top_config, num_workers).await?;
let frontend_handle = tokio::spawn(frontend::serve(app_frontend_port, app));
let frontend_handle = tokio::spawn(frontend::serve(app_frontend_port, app.clone()));
let prometheus_handle = tokio::spawn(app_stats_registry.serve(app_prometheus_port));
let prometheus_handle = tokio::spawn(metrics::serve(app, app_prometheus_port));
// if everything is working, these should both run forever
// TODO: try_join these instead? use signal_shutdown here?

@ -8,6 +8,7 @@ use hashbrown::HashMap;
use serde::Deserialize;
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::instrument;
pub type BlockAndRpc = (Option<ArcBlock>, Arc<Web3Connection>);
pub type TxHashAndRpc = (TxHash, Arc<Web3Connection>);
@ -113,6 +114,7 @@ impl Web3ConnectionConfig {
/// Create a Web3Connection from config
/// TODO: move this into Web3Connection (just need to make things pub(crate))
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
pub async fn spawn(
self,
name: String,

@ -9,6 +9,7 @@ use redis_rate_limit::{bb8::RunError, RedisError};
use sea_orm::DbErr;
use serde_json::value::RawValue;
use std::error::Error;
use tracing::instrument;
// TODO: take "IntoResult" instead?
pub type FrontendResult = Result<Response, FrontendErrorResponse>;
@ -51,6 +52,7 @@ impl IntoResponse for FrontendErrorResponse {
}
}
#[instrument(skip_all)]
pub async fn handler_404() -> Response {
let err = anyhow::anyhow!("nothing to see here");

@ -3,8 +3,10 @@ use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
use moka::future::ConcurrentCacheExt;
use serde_json::json;
use std::sync::{atomic, Arc};
use tracing::instrument;
/// Health check page for load balancers to use
#[instrument(skip_all)]
pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
if app.balanced_rpcs.synced() {
(StatusCode::OK, "OK")
@ -15,20 +17,21 @@ pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoRe
/// Very basic status page
/// TODO: replace this with proper stats and monitoring
#[instrument(skip_all)]
pub async fn status(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
// TODO: what else should we include? uptime?
app.pending_transactions.sync();
app.user_cache.sync();
let body = json!({
"total_queries": app.total_queries.load(atomic::Ordering::Acquire),
"pending_transactions_count": app.pending_transactions.entry_count(),
"pending_transactions_size": app.pending_transactions.weighted_size(),
"user_cache_count": app.user_cache.entry_count(),
"user_cache_size": app.user_cache.weighted_size(),
"balanced_rpcs": app.balanced_rpcs,
"private_rpcs": app.private_rpcs,
});
Json(body)
// // TODO: what else should we include? uptime?
// app.pending_transactions.sync();
// app.user_cache.sync();
// let body = json!({
// "total_queries": app.total_queries.load(atomic::Ordering::Acquire),
// "pending_transactions_count": app.pending_transactions.entry_count(),
// "pending_transactions_size": app.pending_transactions.weighted_size(),
// "user_cache_count": app.user_cache.entry_count(),
// "user_cache_size": app.user_cache.weighted_size(),
// "balanced_rpcs": app.balanced_rpcs,
// "private_rpcs": app.private_rpcs,
// });
// Json(body)
// TODO: only expose this on a different port
app.prometheus_metrics().unwrap()
}

@ -137,7 +137,7 @@ impl Web3ProxyApp {
}
Ok(ThrottleResult::RetryNever) => {
// TODO: prettier error for the user
return Err(anyhow::anyhow!("ip blocked by rate limiter"));
return Err(anyhow::anyhow!("ip ({}) blocked by rate limiter", ip));
}
Err(err) => {
// internal error, not rate limit being hit
@ -259,7 +259,10 @@ impl Web3ProxyApp {
}
Ok(ThrottleResult::RetryNever) => {
// TODO: prettier error for the user
return Err(anyhow::anyhow!("user blocked by rate limiter"));
return Err(anyhow::anyhow!(
"user #{} blocked by rate limiter",
user_data.user_id
));
}
Err(err) => {
// internal error, not rate limit being hit

@ -1,78 +1,67 @@
use super::errors::FrontendResult;
use super::rate_limit::{rate_limit_by_ip, rate_limit_by_user_key};
use crate::stats::Protocol;
use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum};
use axum::extract::Path;
use axum::extract::{Host, Path};
use axum::headers::{Referer, UserAgent};
use axum::TypedHeader;
use axum::{response::IntoResponse, Extension, Json};
use axum_client_ip::ClientIp;
use std::sync::Arc;
use tracing::{error_span, Instrument};
use tracing::{debug_span, error_span, Instrument};
use uuid::Uuid;
pub async fn public_proxy_web3_rpc(
Json(payload): Json<JsonRpcRequestEnum>,
Extension(app): Extension<Arc<Web3ProxyApp>>,
Host(host): Host,
ClientIp(ip): ClientIp,
Json(payload): Json<JsonRpcRequestEnum>,
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
) -> FrontendResult {
let _ip = rate_limit_by_ip(&app, ip).await?;
let request_span = debug_span!("request", host, ?referer, ?user_agent);
let protocol = Protocol::HTTP;
let user_id = 0;
let ip = rate_limit_by_ip(&app, ip)
.instrument(request_span.clone())
.await?;
let user_span = error_span!("user", user_id, ?protocol);
let user_span = error_span!("ip", %ip);
/*
// TODO: move this to a helper function (or two). have it fetch method, protocol, etc. from tracing?
match &payload {
JsonRpcRequestEnum::Batch(batch) => {
// TODO: use inc_by if possible? need to group them by rpc_method
for single in batch {
let rpc_method = single.method.clone();
let f = tokio::spawn(async move {
app.proxy_web3_rpc(payload)
.instrument(request_span)
.instrument(user_span)
.await
});
let _count = app
.stats
.proxy_requests
.get_or_create(&ProxyRequestLabels {
rpc_method,
protocol: protocol.clone(),
user_id,
})
.inc();
}
}
JsonRpcRequestEnum::Single(single) => {
let rpc_method = single.method.clone();
let _count = app
.stats
.proxy_requests
.get_or_create(&ProxyRequestLabels {
protocol,
rpc_method,
user_id,
})
.inc();
}
};
*/
let response = app.proxy_web3_rpc(payload).instrument(user_span).await?;
let response = f.await.unwrap()?;
Ok(Json(&response).into_response())
}
pub async fn user_proxy_web3_rpc(
Json(payload): Json<JsonRpcRequestEnum>,
Extension(app): Extension<Arc<Web3ProxyApp>>,
Host(host): Host,
Json(payload): Json<JsonRpcRequestEnum>,
user_agent: Option<TypedHeader<UserAgent>>,
Path(user_key): Path<Uuid>,
referer: Option<TypedHeader<Referer>>,
) -> FrontendResult {
let user_id: u64 = rate_limit_by_user_key(&app, user_key).await?;
let request_span = debug_span!("request", host, ?referer, ?user_agent);
let protocol = Protocol::HTTP;
let user_id: u64 = rate_limit_by_user_key(&app, user_key)
.instrument(request_span.clone())
.await?;
let user_span = error_span!("user", user_id, ?protocol);
let user_span = error_span!("user", user_id);
let response = app.proxy_web3_rpc(payload).instrument(user_span).await?;
let f = tokio::spawn(async move {
app.proxy_web3_rpc(payload)
.instrument(request_span)
.instrument(user_span)
.await
});
let response = f.await.unwrap()?;
Ok(Json(&response).into_response())
}

@ -24,7 +24,6 @@ use uuid::Uuid;
use crate::{
app::Web3ProxyApp,
jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest},
stats::Protocol,
};
#[debug_handler]
@ -36,9 +35,8 @@ pub async fn public_websocket_handler(
let _ip = rate_limit_by_ip(&app, ip).await?;
let user_id = 0;
let protocol = Protocol::Websocket;
let user_span = error_span!("user", user_id, ?protocol);
let user_span = error_span!("user", user_id);
match ws_upgrade {
Some(ws) => Ok(ws
@ -59,11 +57,9 @@ pub async fn user_websocket_handler(
) -> FrontendResult {
let user_id: u64 = rate_limit_by_user_key(&app, user_key).await?;
let protocol = Protocol::Websocket;
// log the id, not the address. we don't want to expose the user's address
// TODO: type that wraps Address and have it censor? would protect us from accidentally logging addresses
let user_span = error_span!("user", user_id, ?protocol);
let user_span = error_span!("user", user_id);
match ws_upgrade {
Some(ws_upgrade) => Ok(ws_upgrade
@ -118,7 +114,6 @@ async fn handle_socket_payload(
let span = error_span!("eth_subscribe");
let response = app
.clone()
.eth_subscribe(payload, subscription_count, response_sender.clone())
.instrument(span)
.await;

@ -3,6 +3,6 @@ pub mod block_number;
pub mod config;
pub mod frontend;
pub mod jsonrpc;
pub mod metrics;
pub mod rpcs;
pub mod stats;
pub mod users;

58
web3_proxy/src/metrics.rs Normal file

@ -0,0 +1,58 @@
use axum::headers::HeaderName;
use axum::http::{HeaderValue, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::{routing::get, Extension, Router};
use std::net::SocketAddr;
use std::sync::Arc;
use tracing::{info, instrument};

use crate::app::Web3ProxyApp;
/// Run a prometheus metrics server on the given port.
///
/// Serves a single route (`GET /`) that renders the app's metrics in the
/// Prometheus exposition format. Runs until the server errors or the task
/// is aborted; returns the server error if one occurs.
#[instrument(skip_all)]
pub async fn serve(app: Arc<Web3ProxyApp>, port: u16) -> anyhow::Result<()> {
    // build our application with a route
    // order most to least common
    // TODO: 404 any unhandled routes?
    let app = Router::new().route("/", get(root)).layer(Extension(app));

    // run our app with hyper
    // NOTE(review): binds on all interfaces. TODO: allow only listening on localhost?
    let addr = SocketAddr::from(([0, 0, 0, 0], port));

    info!("prometheus listening on port {}", port);

    // TODO: into_make_service is enough if we always run behind a proxy. make into_make_service_with_connect_info optional?
    /*
    It sequentially looks for an IP in:
      - x-forwarded-for header (de-facto standard)
      - x-real-ip header
      - forwarded header (new standard)
      - axum::extract::ConnectInfo (if not behind proxy)
    So we probably won't need into_make_service_with_connect_info, but it shouldn't hurt
    */
    let service = app.into_make_service_with_connect_info::<SocketAddr>();
    // let service = app.into_make_service();

    // `axum::Server` is a re-export of `hyper::Server`
    axum::Server::bind(&addr)
        // TODO: option to use with_connect_info. we want it in dev, but not when running behind a proxy
        .serve(service)
        .await
        .map_err(Into::into)
}
#[instrument(skip_all)]
async fn root(Extension(app): Extension<Arc<Web3ProxyApp>>) -> Response {
let serialized = app.prometheus_metrics().unwrap();
let mut r = serialized.into_response();
// // TODO: is there an easier way to do this?
r.headers_mut().insert(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/openmetrics-text; version=1.0.0; charset=utf-8"),
);
r
}

@ -14,7 +14,7 @@ use serde::Serialize;
use serde_json::json;
use std::{cmp::Ordering, fmt::Display, sync::Arc};
use tokio::sync::{broadcast, watch};
use tracing::{debug, info, trace, warn};
use tracing::{debug, trace, warn};
// TODO: type for Hydrated Blocks with their full transactions?
pub type ArcBlock = Arc<Block<TxHash>>;
@ -401,7 +401,7 @@ impl Web3Connections {
} else {
// TODO: this is too verbose. move to trace
// i think "conns" is somehow getting dupes
debug!(?heavy_rpcs);
trace!(?heavy_rpcs);
// success! this block has enough soft limit and nodes on it (or on later blocks)
let conns: Vec<Arc<Web3Connection>> = heavy_rpcs

@ -1,10 +1,10 @@
use super::connection::Web3Connection;
use super::provider::Web3Provider;
// use metered::{measure, ErrorCount, HitCount, InFlight, ResponseTime, Throughput};
use std::fmt;
use std::sync::atomic;
use std::sync::Arc;
use tokio::time::{sleep, Duration, Instant};
use tracing::debug;
use tracing::warn;
use tracing::{instrument, trace};
@ -45,6 +45,7 @@ impl OpenRequestHandle {
/// Send a web3 request
/// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
/// By taking self here, we ensure that this is dropped after the request is complete
// #[measure([ErrorCount, HitCount, InFlight, ResponseTime, Throughput])]
#[instrument(skip_all)]
pub async fn request<T, R>(
&self,
@ -58,7 +59,7 @@ impl OpenRequestHandle {
// TODO: use tracing spans properly
// TODO: requests from customers have request ids, but we should add
// TODO: including params in this is way too verbose
debug!("Sending {} to {}", method, self.0);
trace!(rpc=%self.0, %method, "request");
let mut provider = None;
@ -81,8 +82,8 @@ impl OpenRequestHandle {
// TODO: i think ethers already has trace logging (and does it much more fancy)
// TODO: at least instrument this with more useful information
trace!("Reply from {} for {}: {:?}", self.0, method, response);
// trace!("Reply from {}", self.0);
// trace!(rpc=%self.0, %method, ?response);
trace!(rpc=%self.0, %method, "response");
response
}

@ -1,115 +0,0 @@
use axum::headers::HeaderName;
use axum::http::HeaderValue;
use axum::response::{IntoResponse, Response};
use axum::{routing::get, Extension, Router};
use prometheus_client::encoding::text::encode;
use prometheus_client::encoding::text::Encode;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::registry::Registry;
use std::net::SocketAddr;
use std::sync::Arc;
use tracing::info;
#[derive(Clone, Hash, PartialEq, Eq, Encode)]
pub struct ProxyRequestLabels {
/// GET is websocket, POST is http
pub protocol: Protocol,
/// jsonrpc 2.0 method
pub rpc_method: String,
/// anonymous is user 0
pub user_id: u64,
}
#[derive(Clone, Debug, Hash, PartialEq, Eq, Encode)]
pub enum Protocol {
HTTP,
Websocket,
}
pub struct AppStatsRegistry {
pub registry: Registry,
pub stats: AppStats,
}
#[derive(Clone)]
pub struct AppStats {
pub proxy_requests: Family<ProxyRequestLabels, Counter>,
}
impl AppStatsRegistry {
pub fn new() -> Arc<Self> {
// Note the angle brackets to make sure to use the default (dynamic
// dispatched boxed metric) for the generic type parameter.
let mut registry = <Registry>::default();
// stats for GET and POST
let proxy_requests = Family::<ProxyRequestLabels, Counter>::default();
registry.register(
// With the metric name.
"http_requests",
// And the metric help text.
"Number of HTTP requests received",
Box::new(proxy_requests.clone()),
);
let new = Self {
registry,
stats: AppStats { proxy_requests },
};
Arc::new(new)
}
pub async fn serve(self: Arc<Self>, port: u16) -> anyhow::Result<()> {
// build our application with a route
// order most to least common
// TODO: 404 any unhandled routes?
let app = Router::new()
.route("/", get(root))
.layer(Extension(self.clone()));
// run our app with hyper
// TODO: allow only listening on localhost?
let addr = SocketAddr::from(([0, 0, 0, 0], port));
info!("prometheus listening on port {}", port);
// TODO: into_make_service is enough if we always run behind a proxy. make into_make_service_with_connect_info optional?
/*
It sequentially looks for an IP in:
- x-forwarded-for header (de-facto standard)
- x-real-ip header
- forwarded header (new standard)
- axum::extract::ConnectInfo (if not behind proxy)
So we probably won't need into_make_service_with_connect_info, but it shouldn't hurt
*/
let service = app.into_make_service_with_connect_info::<SocketAddr>();
// let service = app.into_make_service();
// `axum::Server` is a re-export of `hyper::Server`
axum::Server::bind(&addr)
// TODO: option to use with_connect_info. we want it in dev, but not when running behind a proxy, but not
.serve(service)
.await
.map_err(Into::into)
}
}
async fn root(Extension(stats_registry): Extension<Arc<AppStatsRegistry>>) -> Response {
let mut buffer = vec![];
encode(&mut buffer, &stats_registry.registry).unwrap();
let s = String::from_utf8(buffer).unwrap();
let mut r = s.into_response();
// // TODO: is there an easier way to do this?
r.headers_mut().insert(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/openmetrics-text; version=1.0.0; charset=utf-8"),
);
r
}