add stat emitter

Bryan Stitt 2022-10-03 18:08:01 +00:00
parent a7b5f25467
commit 25d34da98d
10 changed files with 157 additions and 30 deletions

Cargo.lock (generated)

@@ -2479,6 +2479,35 @@ dependencies = [
"regex",
]
+[[package]]
+name = "influxdb"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "39023407f0546c3b30607950f8b600c7db4ef7621fbaa0159de733d73e68b23f"
+dependencies = [
+"chrono",
+"futures-util",
+"http",
+"influxdb_derive",
+"lazy_static",
+"regex",
+"reqwest",
+"serde",
+"serde_json",
+"thiserror",
+]
+
+[[package]]
+name = "influxdb_derive"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5d81efbf97cec06c647f05a8b5edcbc52434cdf980d8d4ace68e1028c90241d3"
+dependencies = [
+"proc-macro2",
+"quote",
+"syn",
+]
+
[[package]]
name = "inotify"
version = "0.9.6"
@@ -4053,9 +4082,9 @@ dependencies = [
[[package]]
name = "sea-orm"
-version = "0.9.2"
+version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "089dcca5d2c8393f5f21c7f7b0d84062839c3264ba62dcc0043eb207862e13a1"
+checksum = "84c7282fc3d7f79f6c5bd57e603319862fc778bf74118c0ce2a0dc82d9141dee"
dependencies = [
"async-stream",
"async-trait",
@@ -4080,9 +4109,9 @@ dependencies = [
[[package]]
name = "sea-orm-cli"
-version = "0.9.2"
+version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8cefd2d8878bd7e8b7313f036725fa3d08585d101fb1bf3adca7fc13f553f906"
+checksum = "de07b4a0fc83b1b7ef8a2fe5d42c2662b1c60315446e6ebb4151a301c35fe484"
dependencies = [
"async-std",
"chrono",
@@ -4097,9 +4126,9 @@ dependencies = [
[[package]]
name = "sea-orm-macros"
-version = "0.9.2"
+version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "08d9708f945febe8625ccb0334654b97d1a1a0bffa0473d86e6108ad13f79bd5"
+checksum = "f96b8a479d25d8110751a0511265556dd9139bc11e342357a98e60910fbb07e3"
dependencies = [
"bae",
"heck 0.3.3",
@@ -4110,9 +4139,9 @@ dependencies = [
[[package]]
name = "sea-orm-migration"
-version = "0.9.2"
+version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "70a28587780fbae5c414a62bf0b32405f9da2e000d94f426abf214b2b2e68631"
+checksum = "f80e3ebbc654c1915686898de119d33a449a9512567009df0a3e95b1afe4e36c"
dependencies = [
"async-trait",
"clap",
@@ -5525,6 +5554,7 @@ dependencies = [
"handlebars",
"hashbrown",
"http",
+"influxdb",
"ipnet",
"metered",
"migration",

@@ -174,8 +174,8 @@ These are roughly in order of completion
- [x] GET to /, when not serving a websocket, should have a simple welcome page. maybe with a button to update your wallet.
- [x] instead of giving a rate limit error code, delay the connection's response at the start. reject if the incoming request rate is super high?
- [x] did this by checking a key/ip-specific semaphore before checking rate limits
-- [ ] collect active requests per second per api key
-- [ ] collect parallel requests per api key
+- [ ] collect requests per second per api key
+- [ ] collect concurrent requests per api key
- [ ] collect distribution of methods per api key (eth_call, eth_getLogs, etc.)
- [ ] display key stats on an endpoint that requires authentication
- [-] let users choose a % to log (or maybe x/second). someone like curve logging all reverts will make a BIG database very quickly
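The stat-collection items above map naturally onto the influxdb derive pattern this commit introduces in web3_proxy/src/stats.rs. A hypothetical sketch of a per-key stat (the struct and field names are illustrative, not part of this commit):

use chrono::{DateTime, Utc};
use influxdb::InfluxDbWriteable;

// hypothetical stat; this commit only ships the WeatherReading example
#[derive(InfluxDbWriteable)]
pub struct KeyRequestStat {
    // the derive macro expects a field named `time` for the timestamp
    time: DateTime<Utc>,
    // requests served for this key in the measurement window
    requests_per_second: u64,
    // requests in flight for this key when the sample was taken
    concurrent_requests: u64,
    // tags are indexed by influxdb, so per-key filtering stays cheap
    #[influxdb(tag)]
    user_key_id: String,
}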

@@ -10,6 +10,6 @@ path = "src/mod.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-sea-orm = "0.9.2"
+sea-orm = "0.9.3"
serde = "1.0.145"
uuid = "1.1.2"

@@ -12,7 +12,7 @@ path = "src/lib.rs"
async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
[dependencies.sea-orm-migration]
-version = "0.9.2"
+version = "0.9.3"
features = [
# Enable at least one `ASYNC_RUNTIME` and `DATABASE_DRIVER` feature if you want to run migration via CLI.
# View the list of supported features at https://www.sea-ql.org/SeaORM/docs/install-and-config/database-and-async-runtime.

@@ -36,6 +36,7 @@ flume = "0.10.14"
futures = { version = "0.3.24", features = ["thread-pool"] }
hashbrown = { version = "0.12.3", features = ["serde"] }
http = "0.2.8"
+influxdb = { version = "0.5.2", features = ["derive"] }
ipnet = "2.5.0"
metered = { version = "0.9.0", features = ["serialize"] }
moka = { version = "0.9.4", default-features = false, features = ["future"] }
@@ -53,7 +54,7 @@ reqwest = { version = "0.11.12", default-features = false, features = ["json", "
handlebars = "4.3.4"
rustc-hash = "1.1.0"
siwe = "0.4.2"
-sea-orm = { version = "0.9.2", features = ["macros"] }
+sea-orm = { version = "0.9.3", features = ["macros"] }
serde = { version = "1.0.145", features = [] }
serde_json = { version = "1.0.85", default-features = false, features = ["alloc", "raw_value"] }
serde_prometheus = "0.1.6"

@@ -11,6 +11,7 @@ use crate::rpcs::blockchain::{ArcBlock, BlockId};
use crate::rpcs::connections::Web3Connections;
use crate::rpcs::request::OpenRequestHandleMetrics;
use crate::rpcs::transactions::TxStatus;
+use crate::stats::StatEmitter;
use anyhow::Context;
use axum::extract::ws::Message;
use axum::headers::{Referer, UserAgent};
@@ -168,6 +169,7 @@
#[metered(registry = Web3ProxyAppMetrics, registry_expr = self.app_metrics, visibility = pub)]
impl Web3ProxyApp {
/// The main entrypoint.
pub async fn spawn(
top_config: TopConfig,
num_workers: usize,
@@ -181,10 +183,11 @@
"redirect user url must contain \"{{user_id}}\""
);
+// setup metrics
let app_metrics = Default::default();
let open_request_handle_metrics: Arc<OpenRequestHandleMetrics> = Default::default();
-// first, we connect to mysql and make sure the latest migrations have run
+// connect to mysql and make sure the latest migrations have run
let db_conn = if let Some(db_url) = top_config.app.db_url.clone() {
let db_min_connections = top_config
.app
@@ -192,12 +195,12 @@
.unwrap_or(num_workers as u32);
// TODO: what default multiple?
-let redis_max_connections = top_config
+let db_max_connections = top_config
.app
.db_max_connections
.unwrap_or(db_min_connections * 2);
-let db = get_migrated_db(db_url, db_min_connections, redis_max_connections).await?;
+let db = get_migrated_db(db_url, db_min_connections, db_max_connections).await?;
Some(db)
} else {
@@ -206,19 +209,14 @@
};
let balanced_rpcs = top_config.balanced_rpcs;
-let private_rpcs = if let Some(private_rpcs) = top_config.private_rpcs {
-private_rpcs
-} else {
-Default::default()
-};
+let private_rpcs = top_config.private_rpcs.unwrap_or_default();
// TODO: try_join_all instead?
let handles = FuturesUnordered::new();
// make a http shared client
// TODO: can we configure the connection pool? should we?
// TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
// TODO: timeouts from config. defaults are hopefully good
let http_client = Some(
reqwest::ClientBuilder::new()
.connect_timeout(Duration::from_secs(5))
@@ -227,6 +225,8 @@
.build()?,
);
+// create a connection pool for redis
+// a failure to connect does NOT block the application from starting
let redis_pool = match top_config.app.redis_url.as_ref() {
Some(redis_url) => {
// TODO: scrub credentials and then include the redis_url in logs
@@ -261,13 +261,36 @@
}
};
+// setup a channel here for receiving influxdb stats
+// we do this in a channel so we don't slow down our response to the users
+// TODO: make influxdb optional
+let stat_sender = if let Some(influxdb_url) = top_config.app.influxdb_url.clone() {
+let influxdb_name = top_config
+.app
+.influxdb_name
+.clone()
+.context("connecting to influxdb")?;
+// TODO: sender and receiver here are a little confusing, because the thing that reads the receiver is what actually submits the stats
+let (stat_sender, stat_handle) =
+StatEmitter::spawn(influxdb_url, influxdb_name, http_client.clone());
+handles.push(stat_handle);
+Some(stat_sender)
+} else {
+warn!("no influxdb connection");
+None
+};
// TODO: i don't like doing Block::default here! Change this to "None"?
let (head_block_sender, head_block_receiver) = watch::channel(Arc::new(Block::default()));
// TODO: will one receiver lagging be okay? how big should this be?
let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(256);
// TODO: use this? it could listen for confirmed transactions and then clear pending_transactions, but the head_block_sender is doing that
-// TODO: don't drop the pending_tx_receiver. instead, read it to mark transactions as "seen". once seen, we won't re-send them
+// TODO: don't drop the pending_tx_receiver. instead, read it to mark transactions as "seen". once seen, we won't re-send them?
// TODO: once a transaction is "Confirmed" we remove it from the map. this should prevent major memory leaks.
// TODO: we should still have some sort of expiration or maximum size limit for the map
drop(pending_tx_receiver);
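For context on the stat channel added above: anything that wants to record a stat clones the flume sender and performs a cheap channel send, while the actual influxdb write happens on the spawned StatEmitter task. A minimal sketch of a call site, assuming a handle to the sender is kept around (an assumption; this commit only creates the channel):

// hypothetical helper; storing `stat_sender` on the app is an assumption
fn emit_stat(stat_sender: &Option<flume::Sender<Web3ProxyStat>>, stat: Web3ProxyStat) {
    if let Some(stat_sender) = stat_sender {
        // sending on an unbounded flume channel never blocks, so the
        // request/response path is not slowed down by influxdb writes
        if stat_sender.send(stat).is_err() {
            // the emitter task exited and dropped the receiver; the stat
            // is lost, which matches the "TODO: now what?" in stats.rs
        }
    }
}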
@@ -287,6 +310,7 @@
.weigher(|_k, v| size_of_val(v) as u32)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());
+// connect to the load balanced rpcs
let (balanced_rpcs, balanced_handle) = Web3Connections::spawn(
top_config.app.chain_id,
balanced_rpcs,
@@ -301,16 +325,18 @@
open_request_handle_metrics.clone(),
)
.await
.context("balanced rpcs")?;
.context("spawning balanced rpcs")?;
// save the handle to catch any errors
handles.push(balanced_handle);
// connect to the private rpcs
// only some chains have this, so this is optional
let private_rpcs = if private_rpcs.is_empty() {
// TODO: do None instead of clone?
warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
None
} else {
// TODO: attach context to this error
let (private_rpcs, private_handle) = Web3Connections::spawn(
top_config.app.chain_id,
private_rpcs,
@@ -328,15 +354,16 @@
open_request_handle_metrics.clone(),
)
.await
.context("private_rpcs")?;
.context("spawning private_rpcs")?;
// save the handle to catch any errors
handles.push(private_handle);
Some(private_rpcs)
};
-// TODO: setup a channel here for receiving influxdb stats
// create rate limiters
// these are optional. they require redis
let mut frontend_ip_rate_limiter = None;
let mut frontend_key_rate_limiter = None;
let mut login_rate_limiter = None;
@@ -382,6 +409,7 @@
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());
// all the users are the same size, so no need for a weigher
// if there is no database of users, there will be no keys and so this will be empty
// TODO: max_capacity from config
// TODO: ttl from config
let user_key_cache = Cache::builder()
@@ -389,6 +417,7 @@
.time_to_live(Duration::from_secs(60))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());
+// create semaphores for concurrent connection limits
// TODO: what should tti be for semaphores?
let user_key_semaphores = Cache::builder()
.time_to_idle(Duration::from_secs(120))
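The semaphore cache above pairs with the README item about checking a key-specific semaphore before rate limits: each user key lazily gets its own Semaphore, and the cache's time_to_idle evicts entries for keys that go quiet. A rough sketch of that pattern (simplified names and types, not the exact code from this commit):

use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

// returns a permit that limits how many requests a key may run at once;
// holding the permit for the life of the request enforces the limit
async fn concurrency_permit(
    semaphores: &moka::future::Cache<u64, Arc<Semaphore>>,
    user_key_id: u64,
    max_concurrent: usize,
) -> anyhow::Result<OwnedSemaphorePermit> {
    let semaphore = semaphores
        .get_with(user_key_id, async move {
            Arc::new(Semaphore::new(max_concurrent))
        })
        .await;

    // waits (instead of erroring) while the key is at its limit
    let permit = semaphore.acquire_owned().await?;

    Ok(permit)
}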

@@ -78,7 +78,7 @@ fn run(
let prometheus_handle = tokio::spawn(metrics_frontend::serve(app, app_prometheus_port));
// if everything is working, these should both run forever
-// TODO: try_join these instead? use signal_shutdown here?
+// TODO: try_join these instead?
tokio::select! {
x = app_handle => {
match x {

@@ -45,6 +45,7 @@
}
/// shared configuration between Web3Connections
// TODO: no String, only &str
#[derive(Debug, Default, Deserialize)]
pub struct AppConfig {
// TODO: better type for chain_id? max of `u64::MAX / 2 - 36` https://github.com/ethereum/EIPs/issues/2294
@@ -59,6 +60,7 @@ pub struct AppConfig {
/// If none, the minimum * 2 is used
pub db_max_connections: Option<u32>,
pub influxdb_url: Option<String>,
+pub influxdb_name: Option<String>,
pub default_requests_per_minute: Option<u64>,
pub invite_code: Option<String>,
#[serde(default = "default_min_sum_soft_limit")]
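With the two config fields above, turning the emitter on from the TOML config might look like this (a sketch: the section name [app] is assumed from top_config.app, the keys follow the struct fields, and the values are placeholders):

[app]
influxdb_url = "http://localhost:8086"
influxdb_name = "web3_proxy"

Leaving influxdb_url unset keeps the previous behavior: spawn logs "no influxdb connection" and stats are skipped.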

@@ -6,3 +6,4 @@ pub mod jsonrpc;
pub mod metered;
pub mod metrics_frontend;
pub mod rpcs;
+pub mod stats;

web3_proxy/src/stats.rs (new file)

@@ -0,0 +1,64 @@
use chrono::{DateTime, Utc};
use influxdb::InfluxDbWriteable;
use influxdb::{Client, Query, ReadQuery, Timestamp};
use tokio::task::JoinHandle;
use tracing::{error, info};

/// TODO: replace this example stat with our own
#[derive(InfluxDbWriteable)]
pub struct WeatherReading {
    time: DateTime<Utc>,
    humidity: i32,
    #[influxdb(tag)]
    wind_direction: String,
}

pub enum Web3ProxyStat {
    WeatherReading(WeatherReading),
}

impl Web3ProxyStat {
    fn into_query(self) -> influxdb::WriteQuery {
        match self {
            Self::WeatherReading(x) => x.into_query("weather"),
        }
    }
}

pub struct StatEmitter;

impl StatEmitter {
    pub fn spawn(
        influxdb_url: String,
        influxdb_name: String,
        http_client: Option<reqwest::Client>,
    ) -> (flume::Sender<Web3ProxyStat>, JoinHandle<anyhow::Result<()>>) {
        let (tx, rx) = flume::unbounded::<Web3ProxyStat>();

        let client = Client::new(influxdb_url, influxdb_name);

        // use an existing http client
        let client = if let Some(http_client) = http_client {
            client.with_http_client(http_client)
        } else {
            client
        };

        let f = async move {
            while let Ok(x) = rx.recv_async().await {
                if let Err(err) = client.query(x.into_query()).await {
                    error!(?err, "failed writing stat");
                    // TODO: now what?
                }
            }

            info!("stat emitter exited");

            Ok(())
        };

        let handle = tokio::spawn(f);

        (tx, handle)
    }
}
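Taken together, a minimal usage sketch of the new module from inside an async (tokio) context. The url and database name are placeholders, and WeatherReading's fields would need to be pub (or a constructor) to build it outside stats.rs:

// spawn the emitter; passing None makes it build its own http client
let (stat_sender, stat_handle) = StatEmitter::spawn(
    "http://localhost:8086".to_string(),
    "web3_proxy".to_string(),
    None,
);

stat_sender.send(Web3ProxyStat::WeatherReading(WeatherReading {
    time: Utc::now(),
    humidity: 30,
    wind_direction: "north".to_string(),
}))?;

// dropping the last sender closes the channel; recv_async then returns
// Err, the loop ends, and the task logs "stat emitter exited"
drop(stat_sender);
stat_handle.await??;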