From 9beddc43a440abde13dffc4a568861e227db0f08 Mon Sep 17 00:00:00 2001
From: yenicelik
Date: Tue, 21 Mar 2023 10:11:35 +0100
Subject: [PATCH] will get and insert some migration data

---
 .../bin/web3_proxy_cli/migrate_stats_to_v2.rs |  83 ++++----
 web3_proxy/src/stats/db_queries.rs            | 199 ------------------
 2 files changed, 38 insertions(+), 244 deletions(-)

diff --git a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs
index 12c70ade..88d19dde 100644
--- a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs
+++ b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs
@@ -36,54 +36,47 @@ impl MigrateStatsToV2 {

         // Iterate through all old rows, and put them into the above objects.
         for x in old_records {
-            // For each of the old rows, create a (i) RpcQueryKey and a matching BufferedRpcQueryStats object
-            let key = RpcQueryKey {
-                response_timestamp: x.period_datetime.timestamp(),
-                archive_needed: x.archive_needed,
-                error_response: x.error_response,
-                period_datetime: x.period_datetime.timestamp(),
-                rpc_secret_key_id: x.rpc_key_id,
-                origin: x.origin,
-                method: x.method
-            };
-
-            // Create the corresponding BufferedRpcQueryStats object
-            let val = BufferedRpcQueryStats {
-                frontend_requests: x.frontend_requests,
-                backend_requests: x.backend_requests,
-                backend_retries: x.backend_retries,
-                no_servers: x.no_servers,
-                cache_misses: x.cache_misses,
-                cache_hits: x.cache_hits,
-                sum_request_bytes: x.sum_request_bytes,
-                sum_response_bytes: x.sum_response_bytes,
-                sum_response_millis: x.sum_response_millis
-            };
-
-            // TODO: Create authorization, request metadata, and bytes ... but bytes we don't really keep track of!
-            // We can generate dummy bytes of the same length though, this may work as well
-
-            // TODO: Period datetime is also a question of what it is
-            // let response_stat = RpcQueryStats::new(
-            //     x.method,
-            //     authorization.clone(),
-            //     request_metadata.clone(),
-            //     response_bytes,
-            //     x.period_datetime
-            // );
-
-            // BufferedRpcQueryStats
-
-
-
+            info!("Preparing for migration: {:?}", x);
+            // // For each of the old rows, create a (i) RpcQueryKey and a matching BufferedRpcQueryStats object
+            // let key = RpcQueryKey {
+            //     response_timestamp: x.period_datetime.timestamp(),
+            //     archive_needed: x.archive_needed,
+            //     error_response: x.error_response,
+            //     period_datetime: x.period_datetime.timestamp(),
+            //     rpc_secret_key_id: x.rpc_key_id,
+            //     origin: x.origin,
+            //     method: x.method
+            // };
+            //
+            // // Create the corresponding BufferedRpcQueryStats object
+            // let val = BufferedRpcQueryStats {
+            //     frontend_requests: x.frontend_requests,
+            //     backend_requests: x.backend_requests,
+            //     backend_retries: x.backend_retries,
+            //     no_servers: x.no_servers,
+            //     cache_misses: x.cache_misses,
+            //     cache_hits: x.cache_hits,
+            //     sum_request_bytes: x.sum_request_bytes,
+            //     sum_response_bytes: x.sum_response_bytes,
+            //     sum_response_millis: x.sum_response_millis
+            // };
+            //
+            // // TODO: Create authorization, request metadata, and bytes ... but bytes we don't really keep track of!
+            // // We can generate dummy bytes of the same length though, this may work as well
+            //
+            // // TODO: Period datetime is also a question of what it is
+            // // let response_stat = RpcQueryStats::new(
+            // //     x.method,
+            // //     authorization.clone(),
+            // //     request_metadata.clone(),
+            // //     response_bytes,
+            // //     x.period_datetime
+            // // );
+            //
+            // // BufferedRpcQueryStats
         }
-
-
-
-
-
     }
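Note: the hunk above parks the v1 -> v2 conversion behind comments and, for now, only logs each old row. The open TODOs are fabricating an authorization and request metadata for historical rows, standing in for request bytes that the old table only stored as sums, and deciding what period_datetime should become. As a sketch of the aggregation step the comments describe, the following compiles on its own: RpcQueryKey, BufferedRpcQueryStats, and OldRecord are trimmed stand-ins for the real types in web3_proxy's stats module (which carry more fields), and merging duplicate keys with entry().or_default() is an assumption about the intended buffering semantics, not code taken from this repo.

use std::collections::HashMap;

// Trimmed stand-in for the real stats key; only fields named in the diff.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
struct RpcQueryKey {
    response_timestamp: i64,
    archive_needed: bool,
    error_response: bool,
    rpc_secret_key_id: u64,
    method: String,
}

// Trimmed stand-in for the buffered per-key accumulator.
#[derive(Clone, Debug, Default)]
struct BufferedRpcQueryStats {
    frontend_requests: u64,
    backend_requests: u64,
    backend_retries: u64,
    no_servers: u64,
    cache_misses: u64,
    cache_hits: u64,
    sum_request_bytes: u64,
    sum_response_bytes: u64,
    sum_response_millis: u64,
}

// Hypothetical shape of one old `rpc_accounting` row, reduced to the
// columns the commented-out code reads.
struct OldRecord {
    period_timestamp: i64,
    archive_needed: bool,
    error_response: bool,
    rpc_key_id: u64,
    method: String,
    frontend_requests: u64,
    backend_requests: u64,
    backend_retries: u64,
    no_servers: u64,
    cache_misses: u64,
    cache_hits: u64,
    sum_request_bytes: u64,
    sum_response_bytes: u64,
    sum_response_millis: u64,
}

// Fold every old row into a map keyed by RpcQueryKey. Rows that land on
// the same key are summed rather than overwritten.
fn aggregate(old_records: Vec<OldRecord>) -> HashMap<RpcQueryKey, BufferedRpcQueryStats> {
    let mut buffered: HashMap<RpcQueryKey, BufferedRpcQueryStats> = HashMap::new();

    for x in old_records {
        let key = RpcQueryKey {
            response_timestamp: x.period_timestamp,
            archive_needed: x.archive_needed,
            error_response: x.error_response,
            rpc_secret_key_id: x.rpc_key_id,
            method: x.method.clone(),
        };

        let val = buffered.entry(key).or_default();
        val.frontend_requests += x.frontend_requests;
        val.backend_requests += x.backend_requests;
        val.backend_retries += x.backend_retries;
        val.no_servers += x.no_servers;
        val.cache_misses += x.cache_misses;
        val.cache_hits += x.cache_hits;
        val.sum_request_bytes += x.sum_request_bytes;
        val.sum_response_bytes += x.sum_response_bytes;
        val.sum_response_millis += x.sum_response_millis;
    }

    buffered
}

// One way to act on the "dummy bytes of the same length" TODO: a
// zero-filled placeholder whose length matches the recorded sum.
fn dummy_request_bytes(len: usize) -> Vec<u8> {
    vec![0u8; len]
}

fn main() {
    let rows = Vec::new(); // imagine these were selected from the old table
    let buffered = aggregate(rows);
    println!("{} aggregated keys", buffered.len());

    let placeholder = dummy_request_bytes(256);
    assert_eq!(placeholder.len(), 256);
}

A zero-filled placeholder keeps downstream byte accounting consistent without storing real payloads, which is all the old table preserved anyway.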
diff --git a/web3_proxy/src/stats/db_queries.rs b/web3_proxy/src/stats/db_queries.rs
index 599b3cff..bd6655de 100644
--- a/web3_proxy/src/stats/db_queries.rs
+++ b/web3_proxy/src/stats/db_queries.rs
@@ -22,206 +22,7 @@ use migration::{Condition, Expr, SimpleExpr};
 use redis_rate_limiter::redis;
 use redis_rate_limiter::redis::AsyncCommands;
 use serde_json::json;
-
-// <<<<<<< HEAD:web3_proxy/src/user_queries.rs
-// /// get the attached address for the given bearer token.
-// /// First checks redis. Then checks the database.
-// /// 0 means all users.
-// /// This authenticates that the bearer is allowed to view this user_id's stats
-// pub async fn get_user_id_from_params(
-//     redis_conn: &mut RedisConnection,
-//     db_conn: &DatabaseConnection,
-//     db_replica: &DatabaseReplica,
-//     // this is a long type. should we strip it down?
-//     bearer: Option<TypedHeader<Authorization<Bearer>>>,
-//     params: &HashMap<String, String>,
-// ) -> Result<u64, FrontendErrorResponse> {
-//     debug!("bearer and params are: {:?} {:?}", bearer, params);
-//     match (bearer, params.get("user_id")) {
-//         (Some(TypedHeader(Authorization(bearer))), Some(user_id)) => {
-//             // check for the bearer cache key
-//             let user_bearer_token = UserBearerToken::try_from(bearer)?;
-//
-//             let user_redis_key = user_bearer_token.redis_key();
-//
-//             let mut save_to_redis = false;
-//
-//             // get the user id that is attached to this bearer token
-//             let bearer_user_id = match redis_conn.get::<_, u64>(&user_redis_key).await {
-//                 Err(_) => {
-//                     // TODO: inspect the redis error? if redis is down we should warn
-//                     // this also means redis being down will not kill our app. Everything will need a db read query though.
-//
-//                     let user_login = login::Entity::find()
-//                         .filter(login::Column::BearerToken.eq(user_bearer_token.uuid()))
-//                         .one(db_replica.conn())
-//                         .await
-//                         .context("database error while querying for user")?
-//                         .ok_or(FrontendErrorResponse::AccessDenied)?;
-//
-//                     // if expired, delete ALL expired logins
-//                     let now = Utc::now();
-//                     if now > user_login.expires_at {
-//                         // this row is expired! do not allow auth!
-//                         // delete ALL expired logins.
-//                         let delete_result = login::Entity::delete_many()
-//                             .filter(login::Column::ExpiresAt.lte(now))
-//                             .exec(db_conn)
-//                             .await?;
-//
-//                         // TODO: emit a stat? if this is high something weird might be happening
-//                         debug!("cleared expired logins: {:?}", delete_result);
-//
-//                         return Err(FrontendErrorResponse::AccessDenied);
-//                     }
-//
-//                     save_to_redis = true;
-//
-//                     user_login.user_id
-//                 }
-//                 Ok(x) => {
-//                     // TODO: push cache ttl further in the future?
-//                     x
-//                 }
-//             };
-//
-//             let user_id: u64 = user_id.parse().context("Parsing user_id param")?;
-//
-//             if bearer_user_id != user_id {
-//                 return Err(FrontendErrorResponse::AccessDenied);
-//             }
-//
-//             if save_to_redis {
-//                 // TODO: how long? we store in database for 4 weeks
-//                 const ONE_DAY: usize = 60 * 60 * 24;
-//
-//                 if let Err(err) = redis_conn
-//                     .set_ex::<_, _, ()>(user_redis_key, user_id, ONE_DAY)
-//                     .await
-//                 {
-//                     warn!("Unable to save user bearer token to redis: {}", err)
-//                 }
-//             }
-//
-//             Ok(bearer_user_id)
-//         }
-//         (_, None) => {
-//             // they have a bearer token. we don't care about it on public pages
-//             // 0 means all
-//             Ok(0)
-//         }
-//         (None, Some(_)) => {
-//             // they do not have a bearer token, but requested a specific id. block
-//             // TODO: proper error code from a useful error code
-//             // TODO: maybe instead of this sharp edged warn, we have a config value?
-//             // TODO: check config for if we should deny or allow this
-//             Err(FrontendErrorResponse::AccessDenied)
-//             // // TODO: make this a flag
-//             // warn!("allowing without auth during development!");
-//             // Ok(x.parse()?)
-//         }
-//     }
-// }
-//
-// /// only allow rpc_key to be set if user_id is also set.
-// /// this will keep people from reading someone else's keys.
-// /// 0 means none.
-//
-// pub fn get_rpc_key_id_from_params(
-//     user_id: u64,
-//     params: &HashMap<String, String>,
-// ) -> anyhow::Result<u64> {
-//     if user_id > 0 {
-//         params.get("rpc_key_id").map_or_else(
-//             || Ok(0),
-//             |c| {
-//                 let c = c.parse()?;
-//
-//                 Ok(c)
-//             },
-//         )
-//     } else {
-//         Ok(0)
-//     }
-// }
-//
-// pub fn get_chain_id_from_params(
-//     app: &Web3ProxyApp,
-//     params: &HashMap<String, String>,
-// ) -> anyhow::Result<u64> {
-//     params.get("chain_id").map_or_else(
-//         || Ok(app.config.chain_id),
-//         |c| {
-//             let c = c.parse()?;
-//
-//             Ok(c)
-//         },
-//     )
-// }
-//
-// pub fn get_query_start_from_params(
-//     params: &HashMap<String, String>,
-// ) -> anyhow::Result<chrono::NaiveDateTime> {
-//     params.get("query_start").map_or_else(
-//         || {
-//             // no timestamp in params. set default
-//             let x = chrono::Utc::now() - chrono::Duration::days(30);
-//
-//             Ok(x.naive_utc())
-//         },
-//         |x: &String| {
-//             // parse the given timestamp
-//             let x = x.parse::<i64>().context("parsing timestamp query param")?;
-//
-//             // TODO: error code 401
-//             let x =
-//                 NaiveDateTime::from_timestamp_opt(x, 0).context("parsing timestamp query param")?;
-//
-//             Ok(x)
-//         },
-//     )
-// }
-//
-// pub fn get_page_from_params(params: &HashMap<String, String>) -> anyhow::Result<u64> {
-//     params.get("page").map_or_else::<anyhow::Result<u64>, _, _>(
-//         || {
-//             // no page in params. set default
-//             Ok(0)
-//         },
-//         |x: &String| {
-//             // parse the given timestamp
-//             // TODO: error code 401
-//             let x = x.parse().context("parsing page query from params")?;
-//
-//             Ok(x)
-//         },
-//     )
-// }
-//
-// pub fn get_query_window_seconds_from_params(
-//     params: &HashMap<String, String>,
-// ) -> Result<u64, FrontendErrorResponse> {
-//     params.get("query_window_seconds").map_or_else(
-//         || {
-//             // no page in params. set default
-//             Ok(0)
-//         },
-//         |query_window_seconds: &String| {
-//             // parse the given timestamp
-//             // TODO: error code 401
-//             query_window_seconds.parse::<u64>().map_err(|e| {
-//                 FrontendErrorResponse::StatusCode(
-//                     StatusCode::BAD_REQUEST,
-//                     "Unable to parse rpc_key_id".to_string(),
-//                     Some(e.into()),
-//                 )
-//             })
-//         },
-//     )
-// }
-// =======
 use super::StatType;
-// >>>>>>> 77df3fa (stats v2):web3_proxy/src/stats/db_queries.rs
 
 pub fn filter_query_window_seconds(
     query_window_seconds: u64,
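Note: the db_queries.rs hunk is pure cleanup. The deleted block was a merge-conflict remnant (note the <<<<<<< HEAD:web3_proxy/src/user_queries.rs and >>>>>>> 77df3fa (stats v2) markers) that had survived fully commented out; the only live line in the region is the use super::StatType; import. For reference, the flow the deleted get_user_id_from_params implemented, check redis first, fall back to the database, and refuse a requested user_id that does not match the bearer token, looks roughly like the sketch below. HashMaps stand in for redis and the login table, every name here is hypothetical, and login expiry plus cache TTL handling are left out.

use std::collections::HashMap;

// Stand-in error; the real code returned FrontendErrorResponse::AccessDenied.
#[derive(Debug)]
struct AccessDenied;

// Hypothetical reduction of get_user_id_from_params: `cache` stands in
// for redis and `db` for the login table.
fn user_id_from_params(
    cache: &mut HashMap<String, u64>,
    db: &HashMap<String, u64>,
    bearer: Option<&str>,
    params: &HashMap<String, String>,
) -> Result<u64, AccessDenied> {
    match (bearer, params.get("user_id")) {
        (Some(token), Some(user_id)) => {
            // cache first, database second; remember a db hit for next time
            let bearer_user_id = match cache.get(token) {
                Some(&id) => id,
                None => {
                    let id = *db.get(token).ok_or(AccessDenied)?;
                    cache.insert(token.to_string(), id);
                    id
                }
            };

            let user_id: u64 = user_id.parse().map_err(|_| AccessDenied)?;

            // the bearer must actually belong to the requested user
            if bearer_user_id != user_id {
                return Err(AccessDenied);
            }

            Ok(bearer_user_id)
        }
        // no explicit user_id: 0 means "all users", fine on public pages
        (_, None) => Ok(0),
        // a specific user_id without a bearer token is always denied
        (None, Some(_)) => Err(AccessDenied),
    }
}

fn main() {
    let mut cache = HashMap::new();
    let mut db = HashMap::new();
    db.insert("token-a".to_string(), 7u64);

    let mut params = HashMap::new();
    params.insert("user_id".to_string(), "7".to_string());

    // the first call misses the cache and falls back to the "database" ...
    assert!(user_id_from_params(&mut cache, &db, Some("token-a"), &params).is_ok());
    // ... and warms the cache for the next caller
    assert!(cache.contains_key("token-a"));
}

The other deleted helpers (get_rpc_key_id_from_params, get_chain_id_from_params, get_query_start_from_params, get_page_from_params, get_query_window_seconds_from_params) all shared one parse-or-default shape over the query params. A generic version of that shape, with a made-up name:

use std::collections::HashMap;
use std::str::FromStr;

// Read an optional query parameter: fall back to `default` when absent,
// surface the parse error when present but malformed.
fn param_or<T: FromStr>(
    params: &HashMap<String, String>,
    key: &str,
    default: T,
) -> Result<T, T::Err> {
    params
        .get(key)
        .map_or_else(|| Ok(default), |raw| raw.parse::<T>())
}

Under that sketch, something like get_page_from_params(params) becomes param_or(params, "page", 0u64), and the chain-id lookup becomes param_or(params, "chain_id", default_chain_id), where default_chain_id plays the role of app.config.chain_id.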