aggregate stats endpoint

This commit is contained in:
Bryan Stitt 2022-10-19 00:56:57 +00:00
parent 0cd899fd8e
commit 3793bdff67
5 changed files with 80 additions and 11 deletions

@ -197,6 +197,7 @@ These are roughly in order of completition
- [ ] display requests per second per api key (only with authentication!)
- [ ] display concurrent requests per api key (only with authentication!)
- [ ] display distribution of methods per api key (eth_call, eth_getLogs, etc.) (only with authentication!)
- [x] get aggregate stats endpoint
- [ ] POST key endpoint
- allow setting things such as private relay, revert logging, ip/origin/etc checks
- [ ] GET logged reverts on an endpoint that requires authentication.
@ -445,3 +446,4 @@ in another repo: event subscriber
- [ ] document "backend requests is cache_misses + backend_retries"
- [ ] having tons of worker threads can actually make us slower if they keep waking to steal work from eachother. need benchmarks
- [ ] change the wrk data to log requests and errors to a file
- [ ] if redis is not set and login page is visited, users get a 502. should be 501

@ -764,8 +764,9 @@ impl Web3ProxyApp {
Ok(collected)
}
pub fn db_conn(&self) -> Option<&DatabaseConnection> {
self.db_conn.as_ref()
/// TODO: i don't think we want or need this. just use app.db_conn, or maybe app.db_conn.clone() or app.db_conn.as_ref()
pub fn db_conn(&self) -> Option<DatabaseConnection> {
self.db_conn.clone()
}
pub async fn redis_conn(&self) -> anyhow::Result<redis_rate_limiter::RedisConnection> {

@ -291,7 +291,7 @@ pub async fn bearer_is_authorized(
// turn user key id into a user key
let user_key_data = user_keys::Entity::find_by_id(user_key_id)
.one(db_conn)
.one(&db_conn)
.await
.context("fetching user key by id")?
.context("unknown user id")?;
@ -486,7 +486,7 @@ impl Web3ProxyApp {
match user_keys::Entity::find()
.filter(user_keys::Column::ApiKey.eq(user_uuid))
.filter(user_keys::Column::Active.eq(true))
.one(db)
.one(&db)
.await?
{
Some(user_key_model) => {

@ -74,6 +74,10 @@ pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()
.route("/user/profile", get(users::user_profile_get))
.route("/user/profile", post(users::user_profile_post))
.route("/user/stats", get(users::user_stats_get))
.route(
"/user/stats/aggregate",
get(users::user_stats_aggregate_get),
)
.route("/user/logout", post(users::user_logout_post))
.route("/status", get(status::status))
// TODO: make this optional or remove it since it is available on another port

@ -12,12 +12,15 @@ use axum::{
};
use axum_client_ip::ClientIp;
use axum_macros::debug_handler;
use entities::{user, user_keys};
use entities::{rpc_accounting, user, user_keys};
use ethers::{prelude::Address, types::Bytes};
use hashbrown::HashMap;
use http::StatusCode;
use redis_rate_limiter::redis::AsyncCommands;
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, TransactionTrait};
use sea_orm::{
ActiveModelTrait, ColumnTrait, Condition, EntityTrait, QueryFilter, QuerySelect,
TransactionTrait,
};
use serde::{Deserialize, Serialize};
use siwe::{Message, VerificationOpts};
use std::ops::Add;
@ -67,8 +70,10 @@ pub async fn user_login_get(
let user_address: Address = params
.remove("user_address")
// TODO: map_err so this becomes a 500. routing must be bad
.context("impossible")?
.parse()
// TODO: map_err so this becomes a 401
.context("bad input")?;
// TODO: get most of these from the app config
@ -90,7 +95,7 @@ pub async fn user_login_get(
let session_key = format!("pending:{}", nonce);
// TODO: if no redis server, store in local cache?
// TODO: if no redis server, store in local cache? at least give a better error. right now this seems to be a 502
// the address isn't enough. we need to save the actual message so we can read the nonce
// TODO: what message format is the most efficient to store in redis? probably eip191_bytes
// we add 1 to expire_seconds just to be sure redis has the key for the full expiration_time
@ -110,7 +115,7 @@ pub async fn user_login_get(
"eip191_hash" => Bytes::from(&message.eip191_hash().unwrap()).to_string(),
"eip4361" => message.to_string(),
_ => {
// TODO: this needs the correct error code in the response
// TODO: custom error that is handled a 401
return Err(anyhow::anyhow!("invalid message eip given").into());
}
};
@ -129,6 +134,7 @@ pub struct PostLoginQuery {
/// JSON body to our `post_login` handler.
/// Currently only siwe logins that send an address, msg, and sig are allowed.
/// Email/password and other login methods are planned.
#[derive(Deserialize)]
pub struct PostLogin {
pub address: Address,
@ -146,6 +152,7 @@ pub struct PostLoginResponse {
bearer_token: Ulid,
/// Used for authenticating with the RPC endpoints.
api_keys: Vec<UserKey>,
// TODO: what else?
}
/// `POST /user/login` - Register or login by posting a signed "siwe" message.
@ -199,7 +206,7 @@ pub async fn user_login_post(
// TODO: limit columns or load whole user?
let u = user::Entity::find()
.filter(user::Column::Address.eq(our_msg.address.as_ref()))
.one(db)
.one(&db)
.await
.unwrap();
@ -248,7 +255,7 @@ pub async fn user_login_post(
// the user is already registered
let uks = user_keys::Entity::find()
.filter(user_keys::Column::UserId.eq(u.id))
.all(db)
.all(&db)
.await
.context("failed loading user's key")?;
@ -337,7 +344,7 @@ pub async fn user_profile_post(
let db = app.db_conn().context("Getting database connection")?;
user.save(db).await?;
user.save(&db).await?;
todo!("finish post_user");
}
@ -422,6 +429,61 @@ pub async fn user_stats_get(
todo!("user_stats_get");
}
/// `GET /user/stats/aggregate` -- Public endpoint for aggregate stats such as bandwidth used and methods requested.
#[debug_handler]
pub async fn user_stats_aggregate_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Path(mut params): Path<HashMap<String, String>>,
) -> FrontendResult {
// TODO: how is this supposed to be used?
let db = app.db_conn.clone().context("no db")?;
// TODO: read this from params
let timeframe = chrono::Utc::now() - chrono::Duration::days(30);
// TODO: how do we get count reverts compared to other errors? does it matter? what about http errors to our users?
// TODO: how do we count uptime?
let x = rpc_accounting::Entity::find()
.select_only()
.column_as(
rpc_accounting::Column::FrontendRequests.sum(),
"total_requests",
)
.column_as(
rpc_accounting::Column::CacheMisses.sum(),
"total_cache_misses",
)
.column_as(rpc_accounting::Column::CacheHits.sum(), "total_cache_hits")
.column_as(
rpc_accounting::Column::BackendRetries.sum(),
"total_backend_retries",
)
.column_as(
rpc_accounting::Column::SumResponseBytes.sum(),
"total_response_bytes",
)
.column_as(
// TODO: can we sum bools like this?
rpc_accounting::Column::ErrorResponse.sum(),
"total_error_responses",
)
.column_as(
rpc_accounting::Column::SumResponseMillis.sum(),
"total_response_millis",
)
.filter(
Condition::all()
.add(rpc_accounting::Column::ChainId.eq(app.config.chain_id))
.add(rpc_accounting::Column::PeriodDatetime.gte(timeframe)),
)
.into_json()
.one(&db)
.await?
.unwrap();
Ok(Json(x).into_response())
}
/// `GET /user/profile` -- Use a bearer token to get the user's profile such as their optional email address.
/// Handle authorization for a given address and bearer token.
// TODO: what roles should exist?