query params and grouping for user stats

Bryan Stitt 2022-10-19 18:38:00 +00:00
parent 3793bdff67
commit e9abcf883f
6 changed files with 115 additions and 64 deletions

@@ -202,6 +202,9 @@ These are roughly in order of completion
- allow setting things such as private relay, revert logging, ip/origin/etc checks
- [ ] GET logged reverts on an endpoint that requires authentication.
- [ ] endpoint for creating/modifying api keys and their advanced security features
- [ ] graceful shutdown. stop taking new requests and don't stop until all outstanding queries are handled
- https://github.com/tokio-rs/mini-redis/blob/master/src/shutdown.rs
- we need this because we need to be sure all the queries are saved in the db. maybe put stuff in Drop (a sketch of the linked mini-redis shutdown pattern follows this hunk)
- [ ] include if archive query or not in the stats
- this is already partially done, but we need to double check it works. preferably with tests
- [ ] WARN http_request:request: web3_proxy::block_number: could not get block from params err=unexpected params length id=01GF4HTRKM4JV6NX52XSF9AYMW method=POST authorized_request=User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 })
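A minimal sketch of the graceful-shutdown item above, following the pattern in the linked mini-redis `shutdown.rs` and assuming tokio. The `Shutdown` struct and worker loop are illustrative stand-ins, not web3_proxy's actual types:

```rust
use tokio::sync::{broadcast, mpsc};

/// Listens for a broadcast shutdown signal (the mini-redis pattern).
struct Shutdown {
    is_shutdown: bool,
    notify: broadcast::Receiver<()>,
}

impl Shutdown {
    async fn recv(&mut self) {
        if self.is_shutdown {
            return;
        }
        // an error only means every sender was dropped, which also means "shut down"
        let _ = self.notify.recv().await;
        self.is_shutdown = true;
    }
}

#[tokio::main]
async fn main() {
    let (notify_shutdown, _) = broadcast::channel::<()>(1);
    // each worker holds a clone of this sender; when the last clone is dropped,
    // recv() on the receiver returns None and we know all outstanding work is done
    let (shutdown_complete_tx, mut shutdown_complete_rx) = mpsc::channel::<()>(1);

    for i in 0..4u32 {
        let mut shutdown = Shutdown {
            is_shutdown: false,
            notify: notify_shutdown.subscribe(),
        };
        let shutdown_complete = shutdown_complete_tx.clone();
        tokio::spawn(async move {
            // stop taking new requests once shutdown is signalled, but finish
            // (and save to the db) any query that is already in flight
            shutdown.recv().await;
            println!("worker {i}: outstanding queries flushed");
            drop(shutdown_complete);
        });
    }

    // a ctrl-c / SIGTERM handler would trigger this broadcast in a real server
    drop(notify_shutdown);

    // wait for every worker to drop its completion handle before exiting
    drop(shutdown_complete_tx);
    let _ = shutdown_complete_rx.recv().await;
}
```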
@@ -347,8 +350,6 @@ in another repo: event subscriber
- [ ] have a low-latency option that always tries at least two servers in parallel and then returns the first success?
- this doubles our request load though. maybe only if the first one doesn't respond very quickly? (a hedged-request sketch follows this hunk)
- [ ] zero downtime deploys
- [ ] graceful shutdown. stop taking new requests and don't stop until all outstanding queries are handled
- https://github.com/tokio-rs/mini-redis/blob/master/src/shutdown.rs
- [ ] are we using Acquire/Release/AcqRel properly? or do we need other modes?
- [ ] use https://github.com/ledgerwatch/interfaces to talk to erigon directly instead of through erigon's rpcdaemon (possible example code which uses ledgerwatch/interfaces: https://github.com/akula-bft/akula/tree/master)
- [ ] subscribe to pending transactions and build an intelligent gas estimator
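A rough sketch of the low-latency / hedged-request idea a few items up, assuming tokio. `query_server` is a purely illustrative stand-in for a real backend RPC call:

```rust
use std::time::Duration;
use tokio::time::sleep;

// stand-in for a real RPC call to one backend server
async fn query_server(name: &'static str, latency_ms: u64) -> anyhow::Result<String> {
    sleep(Duration::from_millis(latency_ms)).await;
    Ok(format!("response from {name}"))
}

async fn hedged_query() -> anyhow::Result<String> {
    let first = query_server("primary", 200);
    tokio::pin!(first);

    // give the first server a short head start so we don't always double the load
    tokio::select! {
        res = &mut first => return res,
        _ = sleep(Duration::from_millis(50)) => {}
    }

    // the first server is slow: race it against a second server and
    // return whichever responds first
    tokio::select! {
        res = &mut first => res,
        res = query_server("backup", 80) => res,
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    println!("{}", hedged_query().await?);
    Ok(())
}
```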

@@ -1,5 +1,6 @@
// TODO: this file is way too big now. move things into other modules
use crate::app_stats::{ProxyResponseStat, StatEmitter, Web3ProxyStat};
use crate::block_number::block_needed;
use crate::config::{AppConfig, TopConfig};
use crate::frontend::authorization::{AuthorizedRequest, RequestMetadata};
@@ -11,7 +12,6 @@ use crate::rpcs::blockchain::{ArcBlock, BlockId};
use crate::rpcs::connections::Web3Connections;
use crate::rpcs::request::OpenRequestHandleMetrics;
use crate::rpcs::transactions::TxStatus;
use crate::stats::{ProxyResponseStat, StatEmitter, Web3ProxyStat};
use anyhow::Context;
use axum::extract::ws::Message;
use axum::headers::{Referer, UserAgent};

@@ -3,6 +3,7 @@
use super::authorization::{login_is_authorized, UserKey};
use super::errors::FrontendResult;
use crate::app::Web3ProxyApp;
use crate::user_stats::get_aggregate_stats;
use anyhow::Context;
use axum::{
extract::{Path, Query},
@@ -12,20 +13,19 @@ use axum::{
};
use axum_client_ip::ClientIp;
use axum_macros::debug_handler;
use entities::{rpc_accounting, user, user_keys};
use chrono::NaiveDateTime;
use entities::{user, user_keys};
use ethers::{prelude::Address, types::Bytes};
use hashbrown::HashMap;
use http::StatusCode;
use redis_rate_limiter::redis::AsyncCommands;
use sea_orm::{
ActiveModelTrait, ColumnTrait, Condition, EntityTrait, QueryFilter, QuerySelect,
TransactionTrait,
};
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, TransactionTrait};
use serde::{Deserialize, Serialize};
use siwe::{Message, VerificationOpts};
use std::ops::Add;
use std::sync::Arc;
use time::{Duration, OffsetDateTime};
use tracing::debug;
use ulid::Ulid;
/// `GET /user/login/:user_address` or `GET /user/login/:user_address/:message_eip` -- Start the "Sign In with Ethereum" (siwe) login flow.
@@ -46,7 +46,8 @@ pub async fn user_login_get(
ClientIp(ip): ClientIp,
// TODO: what does axum's error handling look like if the path fails to parse?
// TODO: allow ENS names here?
Path(mut params): Path<HashMap<String, String>>,
Path((user_address, message_eip)): Path<(Address, Option<String>)>,
Query(query): Query<HashMap<String, String>>,
) -> FrontendResult {
// give these named variables so that we drop them at the very end of this function
let (_, _semaphore) = login_is_authorized(&app, ip).await?;
@@ -68,17 +69,9 @@ pub async fn user_login_get(
let expiration_time = issued_at.add(Duration::new(expire_seconds as i64, 0));
let user_address: Address = params
.remove("user_address")
// TODO: map_err so this becomes a 500. routing must be bad
.context("impossible")?
.parse()
// TODO: map_err so this becomes a 401
.context("bad input")?;
// TODO: get most of these from the app config
let message = Message {
// TODO: should domain be llamanodes, or llamarpc?
// TODO: should domain be llamanodes, or llamarpc, or the subdomain of llamarpc?
domain: "staging.llamanodes.com".parse().unwrap(),
address: user_address.to_fixed_bytes(),
statement: Some("🦙🦙🦙🦙🦙".to_string()),
@@ -99,6 +92,7 @@ pub async fn user_login_get(
// the address isn't enough. we need to save the actual message so we can read the nonce
// TODO: what message format is the most efficient to store in redis? probably eip191_bytes
// we add 1 to expire_seconds just to be sure redis has the key for the full expiration_time
// TODO: store a maximum number of attempted logins? anyone can request so we don't want to allow DOS attacks
app.redis_conn()
.await?
.set_ex(session_key, message.to_string(), expire_seconds + 1)
@@ -106,9 +100,7 @@ pub async fn user_login_get(
// there are multiple ways to sign messages and not all wallets support them
// TODO: default message eip from config?
let message_eip = params
.remove("message_eip")
.unwrap_or_else(|| "eip4361".to_string());
let message_eip = message_eip.unwrap_or_else(|| "eip4361".to_string());
let message: String = match message_eip.as_str() {
"eip191_bytes" => Bytes::from(message.eip191_bytes().unwrap()).to_string(),
@@ -433,53 +425,45 @@ pub async fn user_stats_get(
#[debug_handler]
pub async fn user_stats_aggregate_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Path(mut params): Path<HashMap<String, String>>,
Query(params): Query<HashMap<String, String>>,
) -> FrontendResult {
// TODO: how is this supposed to be used?
let db = app.db_conn.clone().context("no db")?;
// TODO: read this from params
let timeframe = chrono::Utc::now() - chrono::Duration::days(30);
let chain_id = params
.get("chain_id")
.map_or_else::<anyhow::Result<u64>, _, _>(
|| Ok(app.config.chain_id),
|c| {
let c = c.parse()?;
// TODO: how do we get count reverts compared to other errors? does it matter? what about http errors to our users?
// TODO: how do we count uptime?
let x = rpc_accounting::Entity::find()
.select_only()
.column_as(
rpc_accounting::Column::FrontendRequests.sum(),
"total_requests",
)
.column_as(
rpc_accounting::Column::CacheMisses.sum(),
"total_cache_misses",
)
.column_as(rpc_accounting::Column::CacheHits.sum(), "total_cache_hits")
.column_as(
rpc_accounting::Column::BackendRetries.sum(),
"total_backend_retries",
)
.column_as(
rpc_accounting::Column::SumResponseBytes.sum(),
"total_response_bytes",
)
.column_as(
// TODO: can we sum bools like this?
rpc_accounting::Column::ErrorResponse.sum(),
"total_error_responses",
)
.column_as(
rpc_accounting::Column::SumResponseMillis.sum(),
"total_response_millis",
)
.filter(
Condition::all()
.add(rpc_accounting::Column::ChainId.eq(app.config.chain_id))
.add(rpc_accounting::Column::PeriodDatetime.gte(timeframe)),
)
.into_json()
.one(&db)
.await?
.unwrap();
Ok(c)
},
)?;
let query_start = params
.get("start_timestamp")
.map_or_else::<anyhow::Result<NaiveDateTime>, _, _>(
|| {
// no timestamp in params. set default
let x = chrono::Utc::now() - chrono::Duration::days(30);
Ok(x.naive_utc())
},
|x: &String| {
// parse the given timestamp
let x = x.parse::<i64>().context("parsing timestamp query param")?;
// TODO: error code 401
let x = NaiveDateTime::from_timestamp_opt(x, 0)
.context("parsing timestamp query param")?;
Ok(x)
},
)?;
// TODO: optionally no chain id?
let x = get_aggregate_stats(chain_id, &db, query_start).await?;
Ok(Json(x).into_response())
}

@@ -1,4 +1,5 @@
pub mod app;
pub mod app_stats;
pub mod block_number;
pub mod config;
pub mod frontend;
@@ -6,4 +7,4 @@ pub mod jsonrpc;
pub mod metered;
pub mod metrics_frontend;
pub mod rpcs;
pub mod stats;
pub mod user_stats;

@@ -0,0 +1,65 @@
use entities::rpc_accounting;
use num::Zero;
use sea_orm::{ColumnTrait, Condition, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect};
pub async fn get_aggregate_stats(
chain_id: u64,
db: &DatabaseConnection,
query_start: chrono::NaiveDateTime,
) -> anyhow::Result<Vec<serde_json::Value>> {
// TODO: how do we get count reverts compared to other errors? does it matter? what about http errors to our users?
// TODO: how do we count uptime?
let q = rpc_accounting::Entity::find()
.select_only()
.column_as(
rpc_accounting::Column::FrontendRequests.sum(),
"total_requests",
)
.column_as(
rpc_accounting::Column::CacheMisses.sum(),
"total_cache_misses",
)
.column_as(rpc_accounting::Column::CacheHits.sum(), "total_cache_hits")
.column_as(
rpc_accounting::Column::BackendRetries.sum(),
"total_backend_retries",
)
.column_as(
rpc_accounting::Column::SumResponseBytes.sum(),
"total_response_bytes",
)
.column_as(
// TODO: can we sum bools like this?
rpc_accounting::Column::ErrorResponse.sum(),
"total_error_responses",
)
.column_as(
rpc_accounting::Column::SumResponseMillis.sum(),
"total_response_millis",
);
let condition = Condition::all().add(rpc_accounting::Column::PeriodDatetime.gte(query_start));
let (q, condition) = if chain_id.is_zero() {
// fetch all the chains. don't filter
let q = q
.column(rpc_accounting::Column::ChainId)
.group_by(rpc_accounting::Column::ChainId);
(q, condition)
} else {
let condition = condition.add(rpc_accounting::Column::ChainId.eq(chain_id));
(q, condition)
};
let q = q.filter(condition);
// TODO: if user key id is set, use that
// TODO: if user id is set, use that
// TODO: handle secondary users, too
let r = q.into_json().all(db).await?;
Ok(r)
}
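A hypothetical usage sketch (not part of this commit) showing how the `chain_id` argument drives the grouping in `get_aggregate_stats`: 0 means "all chains, one aggregated row per chain", while a non-zero value filters to a single chain and returns one aggregated row:

```rust
use chrono::{Duration, Utc};
use sea_orm::DatabaseConnection;

async fn example(db: &DatabaseConnection) -> anyhow::Result<()> {
    // same default window the handler uses when no start_timestamp query param is given
    let query_start = (Utc::now() - Duration::days(30)).naive_utc();

    // chain_id == 0: no chain filter; the query groups by chain_id,
    // so we get one aggregated JSON row per chain
    let per_chain = get_aggregate_stats(0, db, query_start).await?;

    // non-zero chain_id: filter to that chain, single aggregated row
    let one_chain = get_aggregate_stats(1, db, query_start).await?;

    println!("{} chains, {} row(s) for chain 1", per_chain.len(), one_chain.len());
    Ok(())
}
```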