//! Handle registration, logins, and managing account data. use super::authorization::{login_is_authorized, RpcSecretKey}; use super::errors::FrontendResult; use crate::app::Web3ProxyApp; use crate::frontend::errors::FrontendErrorResponse; use crate::user_queries::{ get_chain_id_from_params, get_query_start_from_params, get_query_window_seconds_from_params, get_user_id_from_params, }; use crate::user_queries::{get_detailed_rpc_stats_for_params, get_page_from_params}; use crate::user_token::UserBearerToken; use anyhow::Context; use axum::headers::{Header, Origin, Referer, UserAgent}; use axum::{ extract::{Path, Query}, headers::{authorization::Bearer, Authorization}, response::IntoResponse, Extension, Json, TypedHeader, }; use axum_client_ip::ClientIp; use axum_macros::debug_handler; use entities::{revert_log, rpc_accounting, rpc_key, user}; use ethers::{prelude::Address, types::Bytes}; use hashbrown::HashMap; use http::{HeaderValue, StatusCode}; use ipnet::IpNet; use itertools::Itertools; use migration::{Condition, Expr, SimpleExpr}; use redis_rate_limiter::redis::AsyncCommands; use sea_orm::{ ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, QuerySelect, TransactionTrait, }; use serde::Deserialize; use serde_json::json; use siwe::{Message, VerificationOpts}; use std::ops::Add; use std::str::FromStr; use std::sync::Arc; use time::{Duration, OffsetDateTime}; use tracing::{instrument, warn}; use ulid::Ulid; /// `GET /user/login/:user_address` or `GET /user/login/:user_address/:message_eip` -- Start the "Sign In with Ethereum" (siwe) login flow. /// /// `message_eip`s accepted: /// - eip191_bytes /// - eip191_hash /// - eip4361 (default) /// /// Coming soon: eip1271 /// /// This is the initial entrypoint for logging in. Take the response from this endpoint and give it to your user's wallet for singing. POST the response to `/user/login`. /// /// Rate limited by IP address. /// /// At first i thought about checking that user_address is in our db, /// But theres no need to separate the registration and login flows. /// It is a better UX to just click "login with ethereum" and have the account created if it doesn't exist. /// We can prompt for an email and and payment after they log in. #[debug_handler] #[instrument(level = "trace")] pub async fn user_login_get( Extension(app): Extension>, ClientIp(ip): ClientIp, // TODO: what does axum's error handling look like if the path fails to parse? Path(mut params): Path>, ) -> FrontendResult { login_is_authorized(&app, ip).await?; // create a message and save it in redis // TODO: how many seconds? get from config? let expire_seconds: usize = 20 * 60; let nonce = Ulid::new(); let issued_at = OffsetDateTime::now_utc(); let expiration_time = issued_at.add(Duration::new(expire_seconds as i64, 0)); // TODO: allow ENS names here? let user_address: Address = params .remove("user_address") // TODO: map_err so this becomes a 500. routing must be bad .context("impossible")? .parse() // TODO: map_err so this becomes a 401 .context("bad input")?; // TODO: get most of these from the app config let message = Message { // TODO: should domain be llamanodes, or llamarpc, or the subdomain of llamarpc? domain: "staging.llamanodes.com".parse().unwrap(), address: user_address.to_fixed_bytes(), statement: Some("🦙🦙🦙🦙🦙".to_string()), uri: "https://staging.llamanodes.com/".parse().unwrap(), version: siwe::Version::V1, chain_id: 1, expiration_time: Some(expiration_time.into()), issued_at: issued_at.into(), nonce: nonce.to_string(), not_before: None, request_id: None, resources: vec![], }; // TODO: if no redis server, store in local cache? at least give a better error. right now this seems to be a 502 // the address isn't enough. we need to save the actual message so we can read the nonce // TODO: what message format is the most efficient to store in redis? probably eip191_bytes // we add 1 to expire_seconds just to be sure redis has the key for the full expiration_time // TODO: store a maximum number of attempted logins? anyone can request so we don't want to allow DOS attacks let session_key = format!("login_nonce:{}", nonce); app.redis_conn() .await? .set_ex(session_key, message.to_string(), expire_seconds + 1) .await?; // there are multiple ways to sign messages and not all wallets support them // TODO: default message eip from config? let message_eip = params .remove("message_eip") .unwrap_or_else(|| "eip4361".to_string()); let message: String = match message_eip.as_str() { "eip191_bytes" => Bytes::from(message.eip191_bytes().unwrap()).to_string(), "eip191_hash" => Bytes::from(&message.eip191_hash().unwrap()).to_string(), "eip4361" => message.to_string(), _ => { // TODO: custom error that is handled a 401 return Err(anyhow::anyhow!("invalid message eip given").into()); } }; Ok(message.into_response()) } /// Query params for our `post_login` handler. #[derive(Debug, Deserialize)] pub struct PostLoginQuery { /// While we are in alpha/beta, we require users to supply an invite code. /// The invite code (if any) is set in the application's config. /// This may eventually provide some sort of referral bonus. pub invite_code: Option, } /// JSON body to our `post_login` handler. /// Currently only siwe logins that send an address, msg, and sig are allowed. /// Email/password and other login methods are planned. #[derive(Debug, Deserialize)] pub struct PostLogin { sig: String, msg: String, } /// `POST /user/login` - Register or login by posting a signed "siwe" message. /// It is recommended to save the returned bearer token in a cookie. /// The bearer token can be used to authenticate other requests, such as getting the user's stats or modifying the user's profile. #[debug_handler] #[instrument(level = "trace")] pub async fn user_login_post( Extension(app): Extension>, ClientIp(ip): ClientIp, Json(payload): Json, Query(query): Query, ) -> FrontendResult { login_is_authorized(&app, ip).await?; // TODO: this seems too verbose. how can we simply convert a String into a [u8; 65] let their_sig_bytes = Bytes::from_str(&payload.sig).context("parsing sig")?; if their_sig_bytes.len() != 65 { return Err(anyhow::anyhow!("checking signature length").into()); } let mut their_sig: [u8; 65] = [0; 65]; for x in 0..65 { their_sig[x] = their_sig_bytes[x] } // we can't trust that they didn't tamper with the message in some way. like some clients return it hex encoded // TODO: checking 0x seems fragile, but I think it will be fine. siwe message text shouldn't ever start with 0x let their_msg: Message = if payload.msg.starts_with("0x") { let their_msg_bytes = Bytes::from_str(&payload.msg).context("parsing payload message")?; // TODO: lossy or no? String::from_utf8_lossy(their_msg_bytes.as_ref()) .parse::() .context("parsing hex string message")? } else { payload .msg .parse::() .context("parsing string message")? }; // the only part of the message we will trust is their nonce // TODO: this is fragile. have a helper function/struct for redis keys let login_nonce_key = format!("login_nonce:{}", &their_msg.nonce); // fetch the message we gave them from our redis let mut redis_conn = app.redis_conn().await?; let our_msg: Option = redis_conn.get(&login_nonce_key).await?; let our_msg: String = our_msg.context("login nonce not found")?; let our_msg: siwe::Message = our_msg.parse().context("parsing siwe message")?; // default options are fine. the message includes timestamp and domain and nonce let verify_config = VerificationOpts::default(); // Check with both verify and verify_eip191 if let Err(err_1) = our_msg .verify(&their_sig, &verify_config) .await .context("verifying signature against our local message") { // verification method 1 failed. try eip191 if let Err(err_191) = our_msg .verify_eip191(&their_sig) .context("verifying eip191 signature against our local message") { return Err(anyhow::anyhow!( "both the primary and eip191 verification failed: {:#?}; {:#?}", err_1, err_191 ) .into()); } } let db_conn = app.db_conn().context("Getting database connection")?; // TODO: limit columns or load whole user? let u = user::Entity::find() .filter(user::Column::Address.eq(our_msg.address.as_ref())) .one(&db_conn) .await .unwrap(); let (u, uks, status_code) = match u { None => { // user does not exist yet // check the invite code // TODO: more advanced invite codes that set different request/minute and concurrency limits if let Some(invite_code) = &app.config.invite_code { if query.invite_code.as_ref() != Some(invite_code) { return Err(anyhow::anyhow!("checking invite_code").into()); } } let txn = db_conn.begin().await?; // the only thing we need from them is an address // everything else is optional // TODO: different invite codes should allow different levels // TODO: maybe decrement a count on the invite code? let u = user::ActiveModel { address: sea_orm::Set(our_msg.address.into()), ..Default::default() }; let u = u.insert(&txn).await?; // create the user's first api key // TODO: rename to UserApiKey? RpcApiKey? let rpc_secret_key = RpcSecretKey::new(); // TODO: variable requests per minute depending on the invite code let uk = rpc_key::ActiveModel { user_id: sea_orm::Set(u.id), secret_key: sea_orm::Set(rpc_secret_key.into()), description: sea_orm::Set(Some("first".to_string())), ..Default::default() }; let uk = uk .insert(&txn) .await .context("Failed saving new user key")?; let uks = vec![uk]; // save the user and key to the database txn.commit().await?; (u, uks, StatusCode::CREATED) } Some(u) => { // the user is already registered let uks = rpc_key::Entity::find() .filter(rpc_key::Column::UserId.eq(u.id)) .all(&db_conn) .await .context("failed loading user's key")?; (u, uks, StatusCode::OK) } }; // create a bearer token for the user. let bearer_token = Ulid::new(); // json response with everything in it // we could return just the bearer token, but I think they will always request api keys and the user profile let response_json = json!({ "rpc_keys": uks .into_iter() .map(|uk| (uk.id, uk)) .collect::>(), "bearer_token": bearer_token, "user": u, }); let response = (status_code, Json(response_json)).into_response(); // add bearer to redis // TODO: use a helper function/struct for this let bearer_redis_key = UserBearerToken(bearer_token).to_string(); // expire in 4 weeks // TODO: get expiration time from app config redis_conn .set_ex(bearer_redis_key, u.id.to_string(), 2_419_200) .await?; if let Err(err) = redis_conn.del::<_, u64>(&login_nonce_key).await { warn!( "Failed to delete login_nonce_key {}: {}", login_nonce_key, err ); } Ok(response) } /// `POST /user/logout` - Forget the bearer token in the `Authentication` header. #[debug_handler] #[instrument(level = "trace")] pub async fn user_logout_post( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, ) -> FrontendResult { let mut redis_conn = app.redis_conn().await?; // TODO: i don't like this. move this to a helper function so it is less fragile let bearer_cache_key = UserBearerToken::try_from(bearer)?.to_string(); redis_conn.del(bearer_cache_key).await?; // TODO: what should the response be? probably json something Ok("goodbye".into_response()) } /// `GET /user` -- Use a bearer token to get the user's profile. /// /// - the email address of a user if they opted in to get contacted via email /// /// TODO: this will change as we add better support for secondary users. #[debug_handler] #[instrument(level = "trace")] pub async fn user_get( Extension(app): Extension>, TypedHeader(Authorization(bearer_token)): TypedHeader>, ) -> FrontendResult { let (user, _semaphore) = app.bearer_is_authorized(bearer_token).await?; Ok(Json(user).into_response()) } /// the JSON input to the `post_user` handler. #[derive(Debug, Deserialize)] pub struct UserPost { email: Option, } /// `POST /user` -- modify the account connected to the bearer token in the `Authentication` header. #[debug_handler] #[instrument(level = "trace")] pub async fn user_post( Extension(app): Extension>, TypedHeader(Authorization(bearer_token)): TypedHeader>, Json(payload): Json, ) -> FrontendResult { let (user, _semaphore) = app.bearer_is_authorized(bearer_token).await?; let mut user: user::ActiveModel = user.into(); // update the email address if let Some(x) = payload.email { // TODO: only Set if no change if x.is_empty() { user.email = sea_orm::Set(None); } else { // TODO: do some basic validation // TODO: don't set immediatly, send a confirmation email first // TODO: compare first? or is sea orm smart enough to do that for us? user.email = sea_orm::Set(Some(x)); } } // TODO: what else can we update here? password hash? subscription to newsletter? let user = if user.is_changed() { let db_conn = app.db_conn().context("Getting database connection")?; user.save(&db_conn).await? } else { // no changes. no need to touch the database user }; let user: user::Model = user.try_into().context("Returning updated user")?; Ok(Json(user).into_response()) } /// `GET /user/balance` -- Use a bearer token to get the user's balance and spend. /// /// - show balance in USD /// - show deposits history (currency, amounts, transaction id) /// /// TODO: one key per request? maybe /user/balance/:rpc_key? /// TODO: this will change as we add better support for secondary users. #[debug_handler] #[instrument(level = "trace")] pub async fn user_balance_get( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, ) -> FrontendResult { let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; todo!("user_balance_get"); } /// `POST /user/balance/:txhash` -- Manually process a confirmed txid to update a user's balance. /// /// We will subscribe to events to watch for any user deposits, but sometimes events can be missed. /// /// TODO: rate limit by user /// TODO: one key per request? maybe /user/balance/:rpc_key? /// TODO: this will change as we add better support for secondary users. #[debug_handler] #[instrument(level = "trace")] pub async fn user_balance_post( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, ) -> FrontendResult { let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; todo!("user_balance_post"); } /// `GET /user/keys` -- Use a bearer token to get the user's api keys and their settings. /// /// TODO: one key per request? maybe /user/keys/:rpc_key? #[debug_handler] #[instrument(level = "trace")] pub async fn rpc_keys_get( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, ) -> FrontendResult { let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; let db_conn = app.db_conn().context("getting db to fetch user's keys")?; let uks = rpc_key::Entity::find() .filter(rpc_key::Column::UserId.eq(user.id)) .all(&db_conn) .await .context("failed loading user's key")?; // TODO: stricter type on this? let response_json = json!({ "user_id": user.id, "user_rpc_keys": uks .into_iter() .map(|uk| (uk.id, uk)) .collect::>(), }); Ok(Json(response_json).into_response()) } /// `DELETE /user/keys` -- Use a bearer token to delete an existing key. #[debug_handler] #[instrument(level = "trace")] pub async fn rpc_keys_delete( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, ) -> FrontendResult { let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; // TODO: think about how cascading deletes and billing should work Err(anyhow::anyhow!("work in progress").into()) } /// the JSON input to the `rpc_keys_management` handler. /// If `key_id` is set, it updates an existing key. /// If `key_id` is not set, it creates a new key. #[derive(Debug, Deserialize)] pub struct UserKeyManagement { key_id: Option, description: Option, private_txs: Option, active: Option, // TODO: enable log_revert_trace: Option, allowed_ips: Option, allowed_origins: Option, allowed_referers: Option, allowed_user_agents: Option, // do not allow! `user_tier: Option,` } /// `POST /user/keys` or `PUT /user/keys` -- Use a bearer token to create or update an existing key. #[debug_handler] #[instrument(level = "trace")] pub async fn rpc_keys_management( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, Json(payload): Json, ) -> FrontendResult { // TODO: is there a way we can know if this is a PUT or POST? right now we can modify or create keys with either. though that probably doesn't matter let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; let db_conn = app.db_conn().context("getting db for user's keys")?; let mut uk = if let Some(existing_key_id) = payload.key_id { // get the key and make sure it belongs to the user let uk = rpc_key::Entity::find() .filter(rpc_key::Column::UserId.eq(user.id)) .filter(rpc_key::Column::Id.eq(existing_key_id)) .one(&db_conn) .await .context("failed loading user's key")? .context("key does not exist or is not controlled by this bearer token")?; uk.try_into().unwrap() } else { // make a new key // TODO: limit to 10 keys? let secret_key = RpcSecretKey::new(); rpc_key::ActiveModel { user_id: sea_orm::Set(user.id), secret_key: sea_orm::Set(secret_key.into()), ..Default::default() } }; // TODO: do we need null descriptions? default to empty string should be fine, right? if let Some(description) = payload.description { if description.is_empty() { uk.description = sea_orm::Set(None); } else { uk.description = sea_orm::Set(Some(description)); } } if let Some(private_txs) = payload.private_txs { uk.private_txs = sea_orm::Set(private_txs); } if let Some(active) = payload.active { uk.active = sea_orm::Set(active); } if let Some(allowed_ips) = payload.allowed_ips { if allowed_ips.is_empty() { uk.allowed_ips = sea_orm::Set(None); } else { // split allowed ips on ',' and try to parse them all. error on invalid input let allowed_ips = allowed_ips .split(',') .map(|x| x.parse::()) .collect::, _>>()? // parse worked. convert back to Strings .into_iter() .map(|x| x.to_string()); // and join them back together let allowed_ips: String = Itertools::intersperse(allowed_ips, ", ".to_string()).collect(); uk.allowed_ips = sea_orm::Set(Some(allowed_ips)); } } // TODO: this should actually be bytes if let Some(allowed_origins) = payload.allowed_origins { if allowed_origins.is_empty() { uk.allowed_origins = sea_orm::Set(None); } else { // split allowed_origins on ',' and try to parse them all. error on invalid input let allowed_origins = allowed_origins .split(',') .map(HeaderValue::from_str) .collect::, _>>()? .into_iter() .map(|x| Origin::decode(&mut [x].iter())) .collect::, _>>()? // parse worked. convert back to String and join them back together .into_iter() .map(|x| x.to_string()); let allowed_origins: String = Itertools::intersperse(allowed_origins, ", ".to_string()).collect(); uk.allowed_origins = sea_orm::Set(Some(allowed_origins)); } } // TODO: this should actually be bytes if let Some(allowed_referers) = payload.allowed_referers { if allowed_referers.is_empty() { uk.allowed_referers = sea_orm::Set(None); } else { // split allowed ips on ',' and try to parse them all. error on invalid input let allowed_referers = allowed_referers .split(',') .map(HeaderValue::from_str) .collect::, _>>()? .into_iter() .map(|x| Referer::decode(&mut [x].iter())) .collect::, _>>()?; // parse worked. now we can put it back together. // but we can't go directly to String. // so we convert to HeaderValues first let mut header_map = vec![]; for x in allowed_referers { x.encode(&mut header_map); } // convert HeaderValues to Strings // since we got these from strings, this should always work (unless we figure out using bytes) let allowed_referers = header_map .into_iter() .map(|x| x.to_str().map(|x| x.to_string())) .collect::, _>>()?; // join strings together with commas let allowed_referers: String = Itertools::intersperse(allowed_referers.into_iter(), ", ".to_string()).collect(); uk.allowed_referers = sea_orm::Set(Some(allowed_referers)); } } if let Some(allowed_user_agents) = payload.allowed_user_agents { if allowed_user_agents.is_empty() { uk.allowed_user_agents = sea_orm::Set(None); } else { // split allowed_user_agents on ',' and try to parse them all. error on invalid input let allowed_user_agents = allowed_user_agents .split(',') .filter_map(|x| x.parse::().ok()) // parse worked. convert back to String .map(|x| x.to_string()); // join the strings together let allowed_user_agents: String = Itertools::intersperse(allowed_user_agents, ", ".to_string()).collect(); uk.allowed_user_agents = sea_orm::Set(Some(allowed_user_agents)); } } let uk = if uk.is_changed() { uk.save(&db_conn).await.context("Failed saving user key")? } else { uk }; let uk: rpc_key::Model = uk.try_into()?; Ok(Json(uk).into_response()) } /// `GET /user/revert_logs` -- Use a bearer token to get the user's revert logs. #[debug_handler] #[instrument(level = "trace")] pub async fn user_revert_logs_get( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, Query(params): Query>, ) -> FrontendResult { let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; let chain_id = get_chain_id_from_params(app.as_ref(), ¶ms)?; let query_start = get_query_start_from_params(¶ms)?; let page = get_page_from_params(¶ms)?; // TODO: page size from config let page_size = 200; let mut response = HashMap::new(); response.insert("page", json!(page)); response.insert("page_size", json!(page_size)); response.insert("chain_id", json!(chain_id)); response.insert("query_start", json!(query_start.timestamp() as u64)); let db_conn = app.db_conn().context("getting db for user's revert logs")?; let uks = rpc_key::Entity::find() .filter(rpc_key::Column::UserId.eq(user.id)) .all(&db_conn) .await .context("failed loading user's key")?; // TODO: only select the ids let uks: Vec<_> = uks.into_iter().map(|x| x.id).collect(); // get paginated logs let q = revert_log::Entity::find() .filter(revert_log::Column::Timestamp.gte(query_start)) .filter(revert_log::Column::RpcKeyId.is_in(uks)) .order_by_asc(revert_log::Column::Timestamp); let q = if chain_id == 0 { // don't do anything q } else { // filter on chain id q.filter(revert_log::Column::ChainId.eq(chain_id)) }; let revert_logs = q.paginate(&db_conn, page_size).fetch_page(page).await?; response.insert("revert_logs", json!(revert_logs)); Ok(Json(response).into_response()) } /// `GET /user/stats/detailed` -- Use a bearer token to get the user's key stats such as bandwidth used and methods requested. /// /// If no bearer is provided, detailed stats for all users will be shown. /// View a single user with `?user_id=$x`. /// View a single chain with `?chain_id=$x`. /// /// Set `$x` to zero to see all. /// /// TODO: this will change as we add better support for secondary users. #[debug_handler] #[instrument(level = "trace")] pub async fn user_stats_detailed_get( Extension(app): Extension>, bearer: Option>>, Query(params): Query>, ) -> FrontendResult { let x = get_detailed_rpc_stats_for_params(&app, bearer, params).await?; Ok(Json(x).into_response()) } /// `GET /user/stats/aggregate` -- Public endpoint for aggregate stats such as bandwidth used and methods requested. #[debug_handler] #[instrument(level = "trace")] pub async fn user_stats_aggregate_get( bearer: Option>>, Extension(app): Extension>, Query(params): Query>, ) -> FrontendResult { let db_conn = app.db_conn().context("connecting to db")?; let redis_conn = app.redis_conn().await.context("connecting to redis")?; let mut response = HashMap::new(); let q = rpc_accounting::Entity::find() .select_only() .column_as( rpc_accounting::Column::FrontendRequests.sum(), "total_requests", ) .column_as( rpc_accounting::Column::BackendRequests.sum(), "total_backend_retries", ) .column_as( rpc_accounting::Column::CacheMisses.sum(), "total_cache_misses", ) .column_as(rpc_accounting::Column::CacheHits.sum(), "total_cache_hits") .column_as( rpc_accounting::Column::SumResponseBytes.sum(), "total_response_bytes", ) .column_as( // TODO: can we sum bools like this? rpc_accounting::Column::ErrorResponse.sum(), "total_error_responses", ) .column_as( rpc_accounting::Column::SumResponseMillis.sum(), "total_response_millis", ); let condition = Condition::all(); // TODO: DRYer! move this onto query_window_seconds_from_params? let query_window_seconds = get_query_window_seconds_from_params(¶ms)?; let q = if query_window_seconds == 0 { // TODO: order by more than this? // query_window_seconds is not set so we aggregate all records // TODO: i am pretty sure we need to filter by something q } else { // TODO: is there a better way to do this? how can we get "period_datetime" into this with types? // TODO: how can we get the first window to start at query_start_timestamp let expr = Expr::cust_with_values( "FLOOR(UNIX_TIMESTAMP(rpc_accounting.period_datetime) / ?) * ?", [query_window_seconds, query_window_seconds], ); response.insert( "query_window_seconds", serde_json::Value::Number(query_window_seconds.into()), ); q.column_as(expr, "query_window") .group_by(Expr::cust("query_window")) // TODO: is there a simpler way to order_by? .order_by_asc(SimpleExpr::Custom("query_window".to_string())) }; // aggregate stats after query_start // TODO: minimum query_start of 90 days? let query_start = get_query_start_from_params(¶ms)?; // TODO: if no query_start, don't add to response or condition response.insert( "query_start", serde_json::Value::Number(query_start.timestamp().into()), ); let condition = condition.add(rpc_accounting::Column::PeriodDatetime.gte(query_start)); // filter on chain_id let chain_id = get_chain_id_from_params(&app, ¶ms)?; let (condition, q) = if chain_id == 0 { // fetch all the chains. don't filter or aggregate (condition, q) } else { let condition = condition.add(rpc_accounting::Column::ChainId.eq(chain_id)); response.insert("chain_id", serde_json::Value::Number(chain_id.into())); (condition, q) }; // filter on rpc_key_id // TODO: move getting the param and checking the bearer token into a helper function let (condition, q) = if let Some(rpc_key_id) = params.get("rpc_key_id") { let rpc_key_id = rpc_key_id.parse::().map_err(|e| { FrontendErrorResponse::StatusCode( StatusCode::BAD_REQUEST, "Unable to parse rpc_key_id".to_string(), e.into(), ) })?; if rpc_key_id == 0 { (condition, q) } else { // TODO: make sure that the bearer token is allowed to view this rpc_key_id let q = q.group_by(rpc_accounting::Column::RpcKeyId); let condition = condition.add(rpc_accounting::Column::RpcKeyId.eq(rpc_key_id)); response.insert("rpc_key_id", serde_json::Value::Number(rpc_key_id.into())); (condition, q) } } else { (condition, q) }; // get_user_id_from_params checks that the bearer is connected to this user_id let user_id = get_user_id_from_params(redis_conn, bearer, ¶ms).await?; let (condition, q) = if user_id == 0 { // 0 means everyone. don't filter on user (condition, q) } else { let q = q.left_join(rpc_key::Entity); let condition = condition.add(rpc_key::Column::UserId.eq(user_id)); response.insert("user_id", serde_json::Value::Number(user_id.into())); (condition, q) }; // now that all the conditions are set up. add them to the query let q = q.filter(condition); // TODO: trace log query here? i think sea orm has a useful log level for this // set up pagination let page = get_page_from_params(¶ms)?; response.insert("page", serde_json::to_value(page).expect("can't fail")); // TODO: page size from param with a max from the config let page_size = 200; response.insert( "page_size", serde_json::to_value(page_size).expect("can't fail"), ); // query the database let aggregate = q .into_json() .paginate(&db_conn, page_size) .fetch_page(page) // TODO: timeouts here? or are they already set up on the connection .await?; // add the query response to the response response.insert("aggregate", serde_json::Value::Array(aggregate)); Ok(Json(response).into_response()) }