will continue mtr

This commit is contained in:
yenicelik 2023-03-20 22:00:49 +01:00
parent 0a6ccf28b5
commit fbe97c12b2
2 changed files with 97 additions and 25 deletions

@@ -1,12 +1,14 @@
use anyhow::Context;
use argh::FromArgs;
use entities::user;
use entities::{rpc_accounting, rpc_accounting_v2, user};
use ethers::types::Address;
use log::{debug, info};
use hashbrown::HashMap;
use log::{debug, info, warn};
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
QueryFilter,
QueryFilter, QuerySelect
};
use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey};
/// change a user's address.
#[derive(FromArgs, PartialEq, Eq, Debug)]
@@ -16,9 +18,77 @@ pub struct MigrateStatsToV2 {}
impl MigrateStatsToV2 {
pub async fn main(self, db_conn: &DatabaseConnection) -> anyhow::Result<()> {
// (1) Load a batch of rows out of the old table
while true {
// (1) Load a batch of rows out of the old table until no more rows are left
let old_records = rpc_accounting::Entity::find().limit(10000).all(db_conn).await?;
if old_records.len() == 0 {
// Break out of while loop once all records have successfully been migrated ...
warn!("All records seem to have been successfully migrated!");
break;
}
// (2) Create request metadata objects to match the old data
let mut global_timeseries_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
let mut opt_in_timeseries_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
let mut accounting_db_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
// Iterate through all old rows, and put them into the above objects.
for x in old_records {
// For each of the old rows, create a (i) RpcQueryKey and a matching BufferedRpcQueryStats object
let key = RpcQueryKey {
response_timestamp: x.period_datetime.timestamp(),
archive_needed: x.archive_needed,
error_response: x.error_response,
period_datetime: x.period_datetime.timestamp(),
rpc_secret_key_id: x.rpc_key_id,
origin: x.origin,
method: x.method
};
// Create the corresponding BufferedRpcQueryStats object
let val = BufferedRpcQueryStats {
frontend_requests: x.frontend_requests,
backend_requests: x.backend_requests,
backend_retries: x.backend_retries,
no_servers: x.no_servers,
cache_misses: x.cache_misses,
cache_hits: x.cache_hits,
sum_request_bytes: x.sum_request_bytes,
sum_response_bytes: x.sum_response_bytes,
sum_response_millis: x.sum_response_millis
};
// TODO: Create authorization, request metadata, and bytes ... but bytes we don't really keep track of!
// We can generate dummy bytes of the same length though, this may work as well
// TODO: Period datetime is also a question of what it is
// let response_stat = RpcQueryStats::new(
// x.method,
// authorization.clone(),
// request_metadata.clone(),
// response_bytes,
// x.period_datetime
// );
// BufferedRpcQueryStats
}
}
// (2) Create request metadata objects to match the old data
// (3) Update the batch in the old table with the current timestamp

@@ -30,23 +30,24 @@ pub enum StatType {
Detailed,
}
// Pub is needed for migration ... I could also write a second constructor for this if needed
/// TODO: better name?
#[derive(Clone, Debug)]
pub struct RpcQueryStats {
authorization: Arc<Authorization>,
method: String,
archive_request: bool,
error_response: bool,
request_bytes: u64,
pub authorization: Arc<Authorization>,
pub method: String,
pub archive_request: bool,
pub error_response: bool,
pub request_bytes: u64,
/// if backend_requests is 0, there was a cache_hit
backend_requests: u64,
response_bytes: u64,
response_millis: u64,
response_timestamp: i64,
pub backend_requests: u64,
pub response_bytes: u64,
pub response_millis: u64,
pub response_timestamp: i64,
}
#[derive(Clone, From, Hash, PartialEq, Eq)]
struct RpcQueryKey {
pub struct RpcQueryKey {
/// unix epoch time
/// for the time series db, this is (close to) the time that the response was sent
/// for the account database, this is rounded to the week
@@ -167,15 +168,15 @@ impl RpcQueryStats {
/// Aggregated (summed) stats for all queries sharing one `RpcQueryKey`.
/// Fields are `pub` so the stats-migration binary can build these directly
/// from rows of the old accounting table. `Default` gives an all-zero buffer.
#[derive(Default)]
pub struct BufferedRpcQueryStats {
    pub frontend_requests: u64,
    pub backend_requests: u64,
    pub backend_retries: u64,
    pub no_servers: u64,
    pub cache_misses: u64,
    pub cache_hits: u64,
    pub sum_request_bytes: u64,
    pub sum_response_bytes: u64,
    pub sum_response_millis: u64,
}
/// A stat that we aggregate and then store in a database.
@@ -363,7 +364,7 @@ impl RpcQueryStats {
method: String,
authorization: Arc<Authorization>,
metadata: Arc<RequestMetadata>,
response_bytes: usize,
response_bytes: usize
) -> Self {
// TODO: try_unwrap the metadata to be sure that all the stats for this request have been collected
// TODO: otherwise, i think the whole thing should be in a single lock that we can "reset" when a stat is created
@@ -389,6 +390,7 @@ impl RpcQueryStats {
response_timestamp,
}
}
}
impl StatBuffer {