From 2d5e7f263d5a6fc8d8d3a7ee98f64da0b288c092 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Fri, 12 Aug 2022 21:00:26 +0000
Subject: [PATCH] serve prometheus stats and use unsigned ints for ids

---
 TODO.md                                  |  2 +-
 docker-compose.yml                       |  3 ++-
 entities/src/block_list.rs               |  2 +-
 entities/src/secondary_user.rs           |  4 ++--
 entities/src/user.rs                     |  2 +-
 entities/src/user_keys.rs                |  4 ++--
 .../src/m20220101_000001_create_table.rs | 10 ++++----
 web3_proxy/src/app.rs                    | 22 ++++++++++--------
 web3_proxy/src/bin/web3_proxy.rs         | 23 +++++++++++++++++--
 web3_proxy/src/config.rs                 | 12 ++++++----
 web3_proxy/src/connections.rs            |  3 ---
 web3_proxy/src/frontend/mod.rs           |  4 ++--
 web3_proxy/src/frontend/rate_limit.rs    |  4 ++--
 web3_proxy/src/lib.rs                    |  1 +
 14 files changed, 61 insertions(+), 35 deletions(-)

diff --git a/TODO.md b/TODO.md
index da2ad813..906e571e 100644
--- a/TODO.md
+++ b/TODO.md
@@ -72,7 +72,7 @@
 - [x] HTTP GET to the websocket endpoints should redirect instead of giving an ugly error
 - [x] load the redirected page from config
 - [x] prettier output for create_user command. need the key in hex
-- [ ] basic request method stats
+- [-] basic request method stats
 - [ ] use siwe messages and signatures for sign up and login
 - [ ] fantom_1 | 2022-08-10T22:19:43.522465Z WARN web3_proxy::jsonrpc: forwarding error err=missing field `jsonrpc` at line 1 column 60
   - [ ] i think the server isn't following the spec. we need a context attached to this error so we know which one
diff --git a/docker-compose.yml b/docker-compose.yml
index ff9f889d..758928e3 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -33,4 +33,5 @@ services:
     volumes:
       - ./config/example.toml:/config.toml
     ports:
-      - 8544:8544
+      - 8544:8544 # proxy (should be behind something handling HTTPS)
+      - 8543:8543 # prometheus
diff --git a/entities/src/block_list.rs b/entities/src/block_list.rs
index 32fa5cd1..81047dbb 100644
--- a/entities/src/block_list.rs
+++ b/entities/src/block_list.rs
@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
 #[sea_orm(table_name = "block_list")]
 pub struct Model {
     #[sea_orm(primary_key)]
-    pub id: i64,
+    pub id: u64,
     #[sea_orm(unique)]
     pub address: Vec<u8>,
     pub description: Option<String>,
diff --git a/entities/src/secondary_user.rs b/entities/src/secondary_user.rs
index cb4f3f8f..3c989edb 100644
--- a/entities/src/secondary_user.rs
+++ b/entities/src/secondary_user.rs
@@ -8,8 +8,8 @@ use serde::{Deserialize, Serialize};
 #[sea_orm(table_name = "secondary_user")]
 pub struct Model {
     #[sea_orm(primary_key)]
-    pub id: i64,
-    pub user_id: i64,
+    pub id: u64,
+    pub user_id: u64,
     pub address: Vec<u8>,
     pub description: Option<String>,
     pub email: Option<String>,
diff --git a/entities/src/user.rs b/entities/src/user.rs
index fa6cc96c..6506455c 100644
--- a/entities/src/user.rs
+++ b/entities/src/user.rs
@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
 #[sea_orm(table_name = "user")]
 pub struct Model {
     #[sea_orm(primary_key)]
-    pub id: i64,
+    pub id: u64,
     #[sea_orm(unique)]
     pub address: Vec<u8>,
     pub description: Option<String>,
diff --git a/entities/src/user_keys.rs b/entities/src/user_keys.rs
index 22683865..7456a3da 100644
--- a/entities/src/user_keys.rs
+++ b/entities/src/user_keys.rs
@@ -8,8 +8,8 @@ use serde::{Deserialize, Serialize};
 #[sea_orm(table_name = "user_keys")]
 pub struct Model {
     #[sea_orm(primary_key)]
-    pub id: i64,
-    pub user_id: i64,
+    pub id: u64,
+    pub user_id: u64,
     #[sea_orm(unique)]
     pub api_key: Uuid,
     pub description: Option<String>,
diff --git a/migration/src/m20220101_000001_create_table.rs b/migration/src/m20220101_000001_create_table.rs
index 2f488930..364446a1 100644
--- a/migration/src/m20220101_000001_create_table.rs
+++ b/migration/src/m20220101_000001_create_table.rs
@@ -13,7 +13,7 @@ impl MigrationTrait for Migration {
                     .table(User::Table)
                     .col(
                         ColumnDef::new(User::Id)
-                            .big_integer()
+                            .big_unsigned()
                             .not_null()
                             .auto_increment()
                             .primary_key(),
@@ -37,14 +37,14 @@ impl MigrationTrait for Migration {
                     .table(SecondaryUser::Table)
                     .col(
                         ColumnDef::new(SecondaryUser::Id)
-                            .big_integer()
+                            .big_unsigned()
                             .not_null()
                             .auto_increment()
                             .primary_key(),
                     )
                     .col(
                         ColumnDef::new(SecondaryUser::UserId)
-                            .big_integer()
+                            .big_unsigned()
                             .not_null(),
                     )
                     .col(
@@ -76,12 +76,12 @@ impl MigrationTrait for Migration {
                     .table(UserKeys::Table)
                     .col(
                         ColumnDef::new(UserKeys::Id)
-                            .big_integer()
+                            .big_unsigned()
                             .not_null()
                             .auto_increment()
                             .primary_key(),
                     )
-                    .col(ColumnDef::new(UserKeys::UserId).big_integer().not_null())
+                    .col(ColumnDef::new(UserKeys::UserId).big_unsigned().not_null())
                     .col(
                         ColumnDef::new(UserKeys::ApiKey)
                             .uuid()
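For context (not part of the patch): with the MySQL backend this app connects to, sea-query renders `big_unsigned()` as `BIGINT UNSIGNED`, which is what lets the entity models above switch their ids from `i64` to `u64`. A standalone sketch to see the generated SQL; the `example` and `id` names here are illustrative only:

    use sea_query::{Alias, ColumnDef, MysqlQueryBuilder, Table};

    fn main() {
        // build a column definition shaped like the ones in the migration above
        let create = Table::create()
            .table(Alias::new("example"))
            .col(
                ColumnDef::new(Alias::new("id"))
                    .big_unsigned()
                    .not_null()
                    .auto_increment()
                    .primary_key(),
            )
            .to_owned();

        // should print something like:
        // CREATE TABLE `example` ( `id` bigint UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY )
        println!("{}", create.to_string(MysqlQueryBuilder));
    }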
diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs
index 785171cf..c7bf9cfb 100644
--- a/web3_proxy/src/app.rs
+++ b/web3_proxy/src/app.rs
@@ -40,6 +40,7 @@ use crate::jsonrpc::JsonRpcForwardedResponse;
 use crate::jsonrpc::JsonRpcForwardedResponseEnum;
 use crate::jsonrpc::JsonRpcRequest;
 use crate::jsonrpc::JsonRpcRequestEnum;
+use crate::stats::AppStats;

 // TODO: make this customizable?
 static APP_USER_AGENT: &str = concat!(
@@ -69,7 +70,7 @@ pub enum TxState {

 #[derive(Clone, Copy, From)]
 pub struct UserCacheValue {
     pub expires_at: Instant,
-    pub user_id: i64,
+    pub user_id: u64,
     pub user_count_per_period: u32,
 }
@@ -140,11 +141,12 @@ pub struct Web3ProxyApp {
     head_block_receiver: watch::Receiver<Arc<Block<TxHash>>>,
     pending_tx_sender: broadcast::Sender<TxState>,
     pub config: AppConfig,
-    pub pending_transactions: Arc<DashMap<TxHash, TxState>>,
-    pub user_cache: RwLock<FifoCountMap<Uuid, UserCacheValue>>,
-    pub redis_pool: Option<RedisPool>,
-    pub rate_limiter: Option<RedisCellClient>,
     pub db_conn: Option<DatabaseConnection>,
+    pub pending_transactions: Arc<DashMap<TxHash, TxState>>,
+    pub rate_limiter: Option<RedisCellClient>,
+    pub redis_pool: Option<RedisPool>,
+    pub stats: AppStats,
+    pub user_cache: RwLock<FifoCountMap<Uuid, UserCacheValue>>,
 }

 impl fmt::Debug for Web3ProxyApp {
@@ -157,8 +159,9 @@ impl Web3ProxyApp {
     // TODO: should we just take the rpc config as the only arg instead?
     pub async fn spawn(
+        app_stats: AppStats,
         top_config: TopConfig,
-        num_workers: usize,
+        workers: usize,
     ) -> anyhow::Result<(
         Arc<Web3ProxyApp>,
         Pin<Box<dyn Future<Output = anyhow::Result<()>>>>,
     )> {
@@ -171,7 +174,7 @@

         // first, we connect to mysql and make sure the latest migrations have run
         let db_conn = if let Some(db_url) = &top_config.app.db_url {
-            let max_connections = num_workers.try_into()?;
+            let max_connections = workers.try_into()?;

             let db = get_migrated_db(db_url.clone(), max_connections).await?;

@@ -209,7 +212,7 @@

             let manager = RedisConnectionManager::new(redis_url.as_ref())?;

-            let min_size = num_workers as u32;
+            let min_size = workers as u32;
             let max_size = min_size * 4;
             // TODO: min_idle?
             // TODO: set max_size based on max expected concurrent connections? set based on num_workers?
@@ -304,7 +307,7 @@
             balanced_rpcs,
             private_rpcs,
             active_requests: Default::default(),
-            // TODO: make the share configurable
+            // TODO: make the share configurable. or maybe take a number as bytes?
             response_cache: RwLock::new(FifoSizedMap::new(response_cache_max_bytes, 100)),
             head_block_receiver,
             pending_tx_sender,
@@ -312,6 +315,7 @@
             rate_limiter: frontend_rate_limiter,
             db_conn,
             redis_pool,
+            stats: app_stats,
             // TODO: make the size configurable
             // TODO: why does this need to be async but the other one doesn't?
             user_cache: RwLock::new(FifoCountMap::new(1_000)),
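Note: app.rs imports `crate::stats::AppStats` here and lib.rs gains `pub mod stats;` below, but `web3_proxy/src/stats.rs` itself is not among the 14 files in this diff. A minimal sketch of the shape the rest of the patch expects — a cheaply clonable `AppStats` handle plus a registry that serves the collected metrics over HTTP. The use of the `prometheus` crate, the `proxy_requests` counter name, and the root route are assumptions, not taken from the patch:

    use std::net::SocketAddr;
    use std::sync::Arc;

    use axum::{routing::get, Extension, Router};
    use prometheus::{IntCounter, Registry, TextEncoder};

    /// handles that the rest of the app clones and updates
    #[derive(Clone)]
    pub struct AppStats {
        pub proxy_requests: IntCounter,
    }

    pub struct AppStatsRegistry {
        pub registry: Registry,
        pub stats: AppStats,
    }

    impl AppStatsRegistry {
        pub fn new() -> Arc<Self> {
            let registry = Registry::new();

            // one example metric; the real module would register more
            let proxy_requests =
                IntCounter::new("proxy_requests", "total proxied requests").unwrap();
            registry.register(Box::new(proxy_requests.clone())).unwrap();

            Arc::new(Self {
                registry,
                stats: AppStats { proxy_requests },
            })
        }

        /// serve the metrics on their own port, mirroring frontend::serve
        pub async fn serve(self: Arc<Self>, port: u16) -> anyhow::Result<()> {
            let app = Router::new()
                .route("/", get(prometheus_handler))
                .layer(Extension(self.clone()));

            let addr = SocketAddr::from(([0, 0, 0, 0], port));

            axum::Server::bind(&addr)
                .serve(app.into_make_service())
                .await
                .map_err(Into::into)
        }
    }

    async fn prometheus_handler(
        Extension(registry): Extension<Arc<AppStatsRegistry>>,
    ) -> String {
        // render every registered metric in prometheus' text exposition format
        TextEncoder::new()
            .encode_to_string(&registry.registry.gather())
            .unwrap_or_default()
    }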
diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs
index eac61bff..09758b04 100644
--- a/web3_proxy/src/bin/web3_proxy.rs
+++ b/web3_proxy/src/bin/web3_proxy.rs
@@ -19,6 +19,7 @@ use tracing_subscriber::EnvFilter;
 use web3_proxy::app::{flatten_handle, Web3ProxyApp};
 use web3_proxy::config::{CliConfig, TopConfig};
 use web3_proxy::frontend;
+use web3_proxy::stats::AppStatsRegistry;

 fn run(
     shutdown_receiver: flume::Receiver<()>,
@@ -70,9 +71,18 @@
     debug!(?num_workers);

     rt.block_on(async {
-        let (app, app_handle) = Web3ProxyApp::spawn(top_config, num_workers).await?;
+        let app_stats_registry = AppStatsRegistry::new();

-        let frontend_handle = tokio::spawn(frontend::serve(cli_config.port, app));
+        let app_stats = app_stats_registry.stats.clone();
+
+        let app_frontend_port = cli_config.port;
+        let app_prometheus_port = cli_config.prometheus_port;
+
+        let (app, app_handle) = Web3ProxyApp::spawn(app_stats, top_config, num_workers).await?;
+
+        let frontend_handle = tokio::spawn(frontend::serve(app_frontend_port, app));
+
+        let prometheus_handle = tokio::spawn(app_stats_registry.serve(app_prometheus_port));

         // if everything is working, these should both run forever
         // TODO: try_join these instead? use signal_shutdown here?
@@ -93,6 +103,14 @@
                     }
                 }
             }
+            x = flatten_handle(prometheus_handle) => {
+                match x {
+                    Ok(_) => info!("prometheus exited"),
+                    Err(e) => {
+                        return Err(e);
+                    }
+                }
+            }
             _ = shutdown_receiver.recv_async() => {
                 // TODO: think more about this. we need some way for tests to tell the app to stop
                 info!("received shutdown signal");
@@ -187,6 +205,7 @@ mod tests {
         // make a test CliConfig
         let cli_config = CliConfig {
             port: 0,
+            prometheus_port: 0,
             workers: 4,
             config: "./does/not/exist/test.toml".to_string(),
         };
diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs
index b48ce308..b4602d9d 100644
--- a/web3_proxy/src/config.rs
+++ b/web3_proxy/src/config.rs
@@ -14,17 +14,21 @@ pub type BlockAndRpc = (Arc<Block<TxHash>>, Arc<Web3Connection>);
 #[derive(Debug, FromArgs)]
 /// Web3_proxy is a fast caching and load balancing proxy for web3 (Ethereum or similar) JsonRPC servers.
 pub struct CliConfig {
+    /// path to a toml of rpc servers
+    #[argh(option, default = "\"./config/development.toml\".to_string()")]
+    pub config: String,
+
     /// what port the proxy should listen on
     #[argh(option, default = "8544")]
     pub port: u16,

+    /// what port the proxy should expose prometheus stats on
+    #[argh(option, default = "8543")]
+    pub prometheus_port: u16,
+
     /// number of worker threads. Defaults to the number of logical processors
     #[argh(option, default = "0")]
     pub workers: usize,
-
-    /// path to a toml of rpc servers
-    #[argh(option, default = "\"./config/development.toml\".to_string()")]
-    pub config: String,
 }

 #[derive(Debug, Deserialize)]
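Since argh derives flag names from field names (underscores become hyphens), the new option should be exposed as `--prometheus-port`, with its default of 8543 matching the port published in docker-compose.yml above, e.g.:

    web3_proxy --config ./config/example.toml --port 8544 --prometheus-port 8543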
diff --git a/web3_proxy/src/connections.rs b/web3_proxy/src/connections.rs
index 6111427d..7c57df03 100644
--- a/web3_proxy/src/connections.rs
+++ b/web3_proxy/src/connections.rs
@@ -615,9 +615,6 @@ impl Web3Connections {
             let new_block_hash = if let Some(hash) = new_block.hash {
                 hash
             } else {
-                // TODO: i think this should just be debug, but maybe it is a warning
-                warn!(%rpc, ?new_block, "Block without hash!");
-
                 connection_heads.remove(&rpc.url);

                 continue;
diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs
index 0a3b5a32..37911338 100644
--- a/web3_proxy/src/frontend/mod.rs
+++ b/web3_proxy/src/frontend/mod.rs
@@ -34,7 +34,6 @@ pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()
         .fallback(errors::handler_404.into_service());

     // run our app with hyper
-    // `axum::Server` is a re-export of `hyper::Server`
     // TODO: allow only listening on localhost?
     let addr = SocketAddr::from(([0, 0, 0, 0], port));
     info!("listening on port {}", port);
@@ -52,10 +51,11 @@
     let service = app.into_make_service_with_connect_info::<SocketAddr>();
     // let service = app.into_make_service();

+    // `axum::Server` is a re-export of `hyper::Server`
     axum::Server::bind(&addr)
         // TODO: option to use with_connect_info. we want it in dev, but not when running behind a proxy, but not
         .serve(service)
-        .with_graceful_shutdown(async { signal_shutdown().await })
+        .with_graceful_shutdown(signal_shutdown())
         .await
         .map_err(Into::into)
 }
diff --git a/web3_proxy/src/frontend/rate_limit.rs b/web3_proxy/src/frontend/rate_limit.rs
index 8ffab2e1..5e3b2328 100644
--- a/web3_proxy/src/frontend/rate_limit.rs
+++ b/web3_proxy/src/frontend/rate_limit.rs
@@ -16,9 +16,9 @@ use super::errors::anyhow_error_into_response;

 pub enum RateLimitResult {
     AllowedIp(IpAddr),
-    AllowedUser(i64),
+    AllowedUser(u64),
     IpRateLimitExceeded(IpAddr),
-    UserRateLimitExceeded(i64),
+    UserRateLimitExceeded(u64),
     UnknownKey,
 }
diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs
index 37812ad3..001567f7 100644
--- a/web3_proxy/src/lib.rs
+++ b/web3_proxy/src/lib.rs
@@ -6,4 +6,5 @@ pub mod connection;
 pub mod connections;
 pub mod frontend;
 pub mod jsonrpc;
+pub mod stats;
 pub mod users;
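With the proxy running, the new listener can be scraped like any other target. A sample Prometheus scrape config, assuming the stats server answers on the root path (as in the stats.rs sketch above) rather than the conventional /metrics:

    scrape_configs:
      - job_name: web3_proxy
        metrics_path: /
        static_configs:
          - targets: ["localhost:8543"]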