seems to wait until everything is flushed. I will have to debug some stuff that isnt saved properly in the mysql (and then check influx)

This commit is contained in:
yenicelik 2023-03-25 17:56:45 +01:00
parent 1d72a3cd44
commit 53c7541fed
5 changed files with 336 additions and 148 deletions

@ -67,7 +67,7 @@ pub static APP_USER_AGENT: &str = concat!(
// aggregate across 1 week
const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7;
pub const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7;
#[derive(Debug, From)]
struct ResponseCacheKey {
@ -575,7 +575,12 @@ impl Web3ProxyApp {
// stats can be saved in mysql, influxdb, both, or none
let stat_sender = if let Some(emitter_spawn) = StatBuffer::try_spawn(,"No influxdb bucket was provided")?.to_owned(),
.context("No influxdb bucket was provided")?
@ -830,8 +835,9 @@ impl Web3ProxyApp {
pub async fn apply_top_config(&self, new_top_config: TopConfig) -> anyhow::Result<()> {

@ -374,12 +374,17 @@ fn main() -> anyhow::Result<()> {
SubCommand::MigrateStatsToV2(x) => {
let top_config = top_config.expect("--config is required to run the migration from stats-mysql to stats-influx");
// let top_config_path =
// top_config_path.expect("path must be set if top_config exists");
let db_url = cli_config
.expect("'--config' (with a db) or '--db-url' is required to run the migration from stats-mysql to stats-influx");
let db_conn = get_db(db_url, 1, 1).await?;
x.main(top_config, &db_conn).await
SubCommand::Pagerduty(x) => {
if cli_config.sentry_url.is_none() {

@ -1,18 +1,49 @@
use std::net::{IpAddr, Ipv4Addr};
use std::num::NonZeroU64;
use anyhow::Context;
use argh::FromArgs;
use entities::{rpc_key, rpc_accounting, rpc_accounting_v2, user};
use chrono::{DateTime, Utc};
use entities::{rpc_accounting, rpc_accounting_v2, rpc_key, user};
use ethers::types::Address;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hashbrown::HashMap;
use log::{debug, info, warn};
use log::{debug, error, info, trace, warn};
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
QueryFilter, QuerySelect
QueryFilter, QuerySelect, UpdateResult,
use web3_proxy::app::AuthorizationChecks;
use web3_proxy::frontend::authorization::{Authorization, AuthorizationType, RequestMetadata, RpcSecretKey};
use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey};
use migration::{Expr, Value};
use std::mem::swap;
use std::net::{IpAddr, Ipv4Addr};
use std::num::NonZeroU64;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::broadcast;
use tokio::time::{sleep, Instant};
use web3_proxy::app::{AuthorizationChecks, Web3ProxyApp, BILLING_PERIOD_SECONDS};
use web3_proxy::config::TopConfig;
use web3_proxy::frontend::authorization::{
Authorization, AuthorizationType, RequestMetadata, RpcSecretKey,
use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey, RpcQueryStats, StatBuffer};
// Helper function to go from DateTime to Instant
fn datetime_utc_to_instant(datetime: DateTime<Utc>) -> anyhow::Result<Instant> {
let epoch = datetime.timestamp(); // Get the Unix timestamp
let nanos = datetime.timestamp_subsec_nanos();
let duration_since_epoch = Duration::new(epoch as u64, nanos);
// let duration_since_datetime = Duration::new(, nanos);
let instant_new = Instant::now();
warn!("Instant new is: {:?}", instant_new);
let unix_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
warn!("Instant since unix epoch is: {:?}", unix_epoch);
.context("Could not subtract unix epoch from instant now")?
.context("Could not add duration since epoch for updated time")
/// change a user's address.
#[derive(FromArgs, PartialEq, Eq, Debug)]
@ -20,14 +51,80 @@ use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey};
pub struct MigrateStatsToV2 {}
impl MigrateStatsToV2 {
pub async fn main(self, db_conn: &DatabaseConnection) -> anyhow::Result<()> {
pub async fn main(
top_config: TopConfig,
db_conn: &DatabaseConnection,
) -> anyhow::Result<()> {
// Also add influxdb container ...
// let mut spawned_app =
// Web3ProxyApp::spawn(top_config.clone(), 2, app_shutdown_sender.clone()).await?;
// we wouldn't really need this, but let's spawn this anyways
// easier than debugging the rest I suppose
let (app_shutdown_sender, _app_shutdown_receiver) = broadcast::channel(1);
let rpc_account_shutdown_recevier = app_shutdown_sender.subscribe();
// we must wait for these to end on their own (and they need to subscribe to shutdown_sender)
let mut important_background_handles = FuturesUnordered::new();
// Spawn the influxdb
let influxdb_client = match {
Some(influxdb_host) => {
let influxdb_org = top_config
.expect("influxdb_org needed when influxdb_host is set");
let influxdb_token = top_config
.expect("influxdb_token needed when influxdb_host is set");
let influxdb_client =
influxdb2::Client::new(influxdb_host, influxdb_org, influxdb_token);
// TODO: test the client now. having a stat for "started" can be useful on graphs to mark deploys
None => None,
// Spawn the stat-sender
let stat_sender = if let Some(emitter_spawn) = StatBuffer::try_spawn(,
.context("No influxdb bucket was provided")?
)? {
// since the database entries are used for accounting, we want to be sure everything is saved before exiting
} else {
info!("Background handles are: {:?}", important_background_handles);
// Basically spawn the full app, look at web3_proxy CLI
while true {
// (1) Load a batch of rows out of the old table until no more rows are left
let old_records = rpc_accounting::Entity::find()
if old_records.len() == 0 {
@ -42,8 +139,7 @@ impl MigrateStatsToV2 {
let mut accounting_db_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
// Iterate through all old rows, and put them into the above objects.
for x in old_records {
for x in old_records.iter() {
info!("Preparing for migration: {:?}", x);
// TODO: Split up a single request into multiple requests ...
@ -57,103 +153,163 @@ impl MigrateStatsToV2 {
.context("Could not find rpc_key_obj for the given rpc_key_id")?;
// TODO: Create authrization
// We can probably also randomly generate this, as we don't care about the user (?)
AuthorizationChecks {
user_id: rpc_key_obj.user_id,
rpc_secret_key: Some(RpcSecretKey::Uuid(rpc_key_obj.secret_key)),
rpc_secret_key_id: Some(NonZeroU64::new(rpc_key_id).unwrap()),
rpc_secret_key_id: Some(
.context("Could not use rpc_key_id to create a u64")?,
None => AuthorizationChecks {
None => {
AuthorizationChecks {
// Then overwrite rpc_key_id and user_id (?)
let authorization_type = AuthorizationType::Frontend;
let authorization = Authorization::try_new(
let authorization = Arc::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
.context("Initializing Authorization Struct was not successful")?,
// It will be like a fork basically (to simulate getting multiple single requests ...)
// Iterate through all frontend requests
// For each frontend request, create one object that will be emitted (make sure the timestamp is new)
let n = x.frontend_requests;
for _ in 1..x.frontend_requests {
info!("Creating a new frontend request");
for _ in 1..n {
// info!("Creating a new frontend request");
// TODO: Create RequestMetadata
let request_metadata = RequestMetadata {
start_instant: x.period_datetime.timestamp(),
request_bytes: x.request_bytes,
archive_request: x.archive_request,
backend_requests: todo!(),
no_servers: 0, // This is not relevant in the new version
error_response: x.error_response,
response_bytes: todo!(),
response_millis: todo!(),
response_from_backup_rpc: todo!() // I think we did not record this back then // Default::default()
start_instant: Instant::now(), // This is overwritten later on
request_bytes: (x.sum_request_bytes / n).into(), // Get the mean of all the request bytes
archive_request: x.archive_request.into(),
backend_requests: Default::default(), // This is not used, instead we modify the field later
no_servers: 0.into(), // This is not relevant in the new version
error_response: x.error_response.into(),
response_bytes: (x.sum_response_bytes / n).into(),
response_millis: (x.sum_response_millis / n).into(),
// We just don't have this data
response_from_backup_rpc: false.into(), // I think we did not record this back then // Default::default()
// (3) Send through a channel to a stat emitter
// Send it to the stats sender
if let Some(stat_sender_ref) = stat_sender.as_ref() {
let mut response_stat = RpcQueryStats::new(
(x.sum_response_bytes / n)
.context("sum bytes average is not calculated properly")?,
// Modify the timestamps ..
(x.sum_response_millis / n),
x.period_datetime.timestamp(), // I suppose timestamp is millis as well ... should check this in the database
(x.backend_requests / n),
info!("Sending stats: {:?}", response_stat);
// .send(response_stat.into())
.context("stat_sender sending response_stat")?;
info!("Send! {:?}", stat_sender);
} else {
panic!("Stat sender was not spawned!");
// Create a new stats object
// Add it to the StatBuffer
// Let the stat_sender spawn / flush ...
// Send a signal to save ...
// (3) Await that all items are properly processed
// TODO: Await all the background handles
info!("Waiting for a second until all is flushed");
// Only after this mark all the items as processed / completed
// If the items are in rpc_v2, delete the initial items from the database
// (4) Update the batch in the old table with the current timestamp (Mark the batch as migrated)
let old_record_ids = old_records.iter().map(|x|;
let update_result: UpdateResult = rpc_accounting::Entity::update_many()
// .set(pear)
info!("Update result is: {:?}", update_result);
// (N-1) Mark the batch as migrated
"Background handles (2) are: {:?}",
// Drop the handle
// Send the shutdown signal here (?)
// important_background_handles.clear();
// Finally also send a shutdown signal
match app_shutdown_sender.send(()) {
Err(x) => {
panic!("Could not send shutdown signal! {:?}", x);
_ => {}
// Drop the background handle
while let Some(x) = {
info!("Returned item is: {:?}", x);
match x {
Err(e) => {
error!("{:?}", e);
Ok(Err(e)) => {
error!("{:?}", e);
Ok(Ok(_)) => {
// TODO: how can we know which handle exited?
info!("a background handle exited");
// Pop it in this case?
// (3) Update the batch in the old table with the current timestamp
// (4) Send through a channel to a stat emitter
// let old_address: Address = self.old_address.parse()?;
// let new_address: Address = self.new_address.parse()?;
// let old_address: Vec<u8> = old_address.to_fixed_bytes().into();
// let new_address: Vec<u8> = new_address.to_fixed_bytes().into();
// let u = user::Entity::find()
// .filter(user::Column::Address.eq(old_address))
// .one(db_conn)
// .await?
// .context("No user found with that address")?;
// debug!("initial user: {:#?}", u);
// if u.address == new_address {
// info!("user already has this address");
// } else {
// let mut u = u.into_active_model();
// u.address = sea_orm::Set(new_address);
// let u =;
// info!("changed user address");
// debug!("updated user: {:#?}", u);
// }
info!("Here (?)");

@ -34,6 +34,7 @@ use std::sync::atomic::{self, Ordering};
use std::sync::Arc;
use std::{cmp, fmt};
use thread_fast_rng::rand::seq::SliceRandom;
use tokio;
use tokio::sync::{broadcast, watch};
use tokio::task;
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
@ -167,7 +168,8 @@ impl Web3Rpcs {
let (watch_consensus_rpcs_sender, consensus_connections_watcher) = watch::channel(Default::default());
let (watch_consensus_rpcs_sender, consensus_connections_watcher) =
// by_name starts empty. self.apply_server_configs will add to it
let by_name = Default::default();
@ -395,7 +397,7 @@ impl Web3Rpcs {
let clone = self.clone();
let authorization = authorization.clone();
let pending_tx_id_receiver = self.pending_tx_id_receiver.clone();
let handle = task::spawn(async move {
let handle = tokio::task::spawn(async move {
// TODO: set up this future the same as the block funnel
while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv_async().await {
let f = clone.clone().process_incoming_tx_id(
@ -418,7 +420,7 @@ impl Web3Rpcs {
let connections = Arc::clone(&self);
let pending_tx_sender = pending_tx_sender.clone();
let handle = task::Builder::default()
let handle = tokio::task::Builder::default()
.spawn(async move {
@ -432,7 +434,9 @@ impl Web3Rpcs {
if futures.is_empty() {
// no transaction or block subscriptions.
let handle = task::Builder::default().name("noop").spawn(async move {
let handle = tokio::task::Builder::default()
.spawn(async move {
loop {
// TODO: "every interval, check that the provider is still connected"
@ -1299,13 +1303,13 @@ mod tests {
// TODO: why is this allow needed? does tokio::test get in the way somehow?
use std::time::{SystemTime, UNIX_EPOCH};
use super::*;
use crate::rpcs::consensus::ConsensusFinder;
use crate::rpcs::{blockchain::Web3ProxyBlock, provider::Web3Provider};
use ethers::types::{Block, U256};
use log::{trace, LevelFilter};
use parking_lot::RwLock;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock as AsyncRwLock;

@ -391,6 +391,18 @@ impl RpcQueryStats {
/// Only used for migration from stats_v1 to stats_v2/v3
pub fn modify_struct(
&mut self,
response_millis: u64,
response_timestamp: i64,
backend_requests: u64
) {
self.response_millis = response_millis;
self.response_timestamp = response_timestamp;
self.backend_requests = backend_requests;
impl StatBuffer {
@ -434,6 +446,7 @@ impl StatBuffer {
stat_receiver: flume::Receiver<AppStat>,
mut shutdown_receiver: broadcast::Receiver<()>,
) -> anyhow::Result<()> {
info!("Aggregate and save loop is running");
let mut tsdb_save_interval =
interval(Duration::from_secs(self.tsdb_save_interval_seconds as u64));
let mut db_save_interval =
@ -450,6 +463,7 @@ impl StatBuffer {
loop {
tokio::select! {
stat = stat_receiver.recv_async() => {
info!("Received stat");
// save the stat to a buffer
match stat {
Ok(AppStat::RpcQuery(stat)) => {
@ -476,6 +490,7 @@ impl StatBuffer {
_ = db_save_interval.tick() => {
info!("DB save internal tick");
let db_conn = self.db_conn.as_ref().expect("db connection should always exist if there are buffered stats");
// TODO: batch saves
@ -487,6 +502,7 @@ impl StatBuffer {
_ = tsdb_save_interval.tick() => {
info!("TSDB save internal tick");
// TODO: batch saves
// TODO: better bucket names
let influxdb_client = self.influxdb_client.as_ref().expect("influxdb client should always exist if there are buffered stats");
@ -506,6 +522,7 @@ impl StatBuffer {
x = shutdown_receiver.recv() => {
info!("shutdown signal ---");
match x {
Ok(_) => {
info!("stat_loop shutting down");