use crate::frontend::authorization::{Authorization, RequestMetadata};
use axum::headers::Origin;
use chrono::{TimeZone, Utc};
use derive_more::From;
use entities::rpc_accounting;
use entities::sea_orm_active_enums::LogLevel;
use hashbrown::HashMap;
use hdrhistogram::{Histogram, RecordError};
use log::{error, info};
use migration::sea_orm::{self, ActiveModelTrait, DatabaseConnection, DbErr};
use std::num::NonZeroU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tokio::time::{interval_at, Instant};

/// TODO: where should this be defined?
/// TODO: can we use something inside sea_orm instead?
#[derive(Debug)]
pub struct ProxyResponseStat {
    authorization: Arc<Authorization>,
    method: String,
    archive_request: bool,
    error_response: bool,
    request_bytes: u64,
    /// if backend_requests is 0, there was a cache_hit
    backend_requests: u64,
    response_bytes: u64,
    response_millis: u64,
}

impl ProxyResponseStat {
    /// TODO: think more about this. probably rename it
    fn key(&self) -> ProxyResponseAggregateKey {
        // include either the rpc_key_id or the origin
        let (mut rpc_key_id, origin) = match (
            self.authorization.checks.rpc_key_id,
            &self.authorization.origin,
        ) {
            (Some(rpc_key_id), _) => {
                // TODO: allow the user to opt into saving the origin
                (Some(rpc_key_id), None)
            }
            (None, Some(origin)) => {
                // we save the origin for anonymous access
                (None, Some(origin.clone()))
            }
            (None, None) => {
                // TODO: what should we do here? log ip? i really don't want to save any ips
                (None, None)
            }
        };

        let method = match self.authorization.checks.log_level {
            LogLevel::None => {
                // No rpc_key logging. Only save fully anonymized metric
                rpc_key_id = None;
                // keep the method since the rpc key is not attached
                Some(self.method.clone())
            }
            LogLevel::Aggregated => {
                // Lose the method
                None
            }
            LogLevel::Detailed => {
                // include the method
                Some(self.method.clone())
            }
        };

        ProxyResponseAggregateKey {
            archive_request: self.archive_request,
            error_response: self.error_response,
            method,
            origin,
            rpc_key_id,
        }
    }
}
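
// A worked example of how `key()` collapses a stat (illustrative values only):
// a request authorized with rpc_key_id = Some(7) and LogLevel::Aggregated maps to
// ProxyResponseAggregateKey { rpc_key_id: Some(7), method: None, origin: None, .. },
// so that key's traffic aggregates per (archive_request, error_response) combination
// regardless of method. LogLevel::Detailed keeps the method (one aggregate per
// key/method combination) and LogLevel::None drops the rpc_key_id but keeps the method.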

pub struct ProxyResponseHistograms {
    request_bytes: Histogram<u64>,
    response_bytes: Histogram<u64>,
    response_millis: Histogram<u64>,
}

impl Default for ProxyResponseHistograms {
    fn default() -> Self {
        // TODO: how many significant figures?
        let request_bytes = Histogram::new(5).expect("creating request_bytes histogram");
        let response_bytes = Histogram::new(5).expect("creating response_bytes histogram");
        let response_millis = Histogram::new(5).expect("creating response_millis histogram");

        Self {
            request_bytes,
            response_bytes,
            response_millis,
        }
    }
}
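
// A minimal sketch of the hdrhistogram calls this module relies on (values are
// made up for illustration): `Histogram::new(5)` asks for 5 significant figures,
// and the recorded values are read back as summary statistics in `save()`:
//
//     let mut h: Histogram<u64> = Histogram::new(5).expect("histogram");
//     h.record(1_500)?; // may fail with a RecordError, hence `add()` returns Result
//     let p99 = h.value_at_quantile(0.99);
//     let mean = h.mean();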

// TODO: think more about if we should include IP address in this
#[derive(Clone, From, Hash, PartialEq, Eq)]
struct ProxyResponseAggregateKey {
    archive_request: bool,
    error_response: bool,
    rpc_key_id: Option<NonZeroU64>,
    method: Option<String>,
    /// TODO: should this be Origin or String?
    origin: Option<Origin>,
}

#[derive(Default)]
pub struct ProxyResponseAggregate {
    frontend_requests: u64,
    backend_requests: u64,
    // TODO: related to backend_requests
    // backend_retries: u64,
    // TODO: related to backend_requests
    // no_servers: u64,
    cache_misses: u64,
    cache_hits: u64,
    sum_request_bytes: u64,
    sum_response_bytes: u64,
    sum_response_millis: u64,
    histograms: ProxyResponseHistograms,
}

/// A stat that we aggregate and then store in a database.
/// For now there is just one, but I think there might be others later
#[derive(Debug, From)]
pub enum Web3ProxyStat {
    Response(ProxyResponseStat),
}

#[derive(From)]
pub struct StatEmitterSpawn {
    pub stat_sender: flume::Sender<Web3ProxyStat>,
    /// these handles are important and must be allowed to finish
    pub background_handle: JoinHandle<anyhow::Result<()>>,
}

pub struct StatEmitter {
    chain_id: u64,
    db_conn: DatabaseConnection,
    period_seconds: u64,
}

// TODO: impl `AddAssign<ProxyResponseStat>` for ProxyResponseAggregate?
impl ProxyResponseAggregate {
    fn add(&mut self, stat: ProxyResponseStat) -> Result<(), RecordError> {
        // a stat always comes from just 1 frontend request
        self.frontend_requests += 1;

        if stat.backend_requests == 0 {
            // no backend request. cache hit!
            self.cache_hits += 1;
        } else {
            // backend requests! cache miss!
            self.cache_misses += 1;

            // a stat might have multiple backend requests
            self.backend_requests += stat.backend_requests;
        }

        self.sum_request_bytes += stat.request_bytes;
        self.sum_response_bytes += stat.response_bytes;
        self.sum_response_millis += stat.response_millis;

        // TODO: use `record_correct`?
        self.histograms.request_bytes.record(stat.request_bytes)?;
        self.histograms
            .response_millis
            .record(stat.response_millis)?;
        self.histograms.response_bytes.record(stat.response_bytes)?;

        Ok(())
    }
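
    // For illustration (made-up numbers): aggregating two stats under the same key,
    // one served from cache (backend_requests == 0) and one that used 3 backends,
    // yields frontend_requests = 2, cache_hits = 1, cache_misses = 1 and
    // backend_requests = 3, with the byte/latency sums and histograms fed from both.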

    // TODO? help to turn this plus the key into a database model?
    // TODO: take a db transaction instead so that we can batch
    async fn save(
        self,
        chain_id: u64,
        db_conn: &DatabaseConnection,
        key: ProxyResponseAggregateKey,
        period_timestamp: u64,
    ) -> Result<(), DbErr> {
        // this is a lot of variables
        let period_datetime = Utc.timestamp_opt(period_timestamp as i64, 0).unwrap();

        let request_bytes = &self.histograms.request_bytes;

        let min_request_bytes = request_bytes.min();
        let mean_request_bytes = request_bytes.mean();
        let p50_request_bytes = request_bytes.value_at_quantile(0.50);
        let p90_request_bytes = request_bytes.value_at_quantile(0.90);
        let p99_request_bytes = request_bytes.value_at_quantile(0.99);
        let max_request_bytes = request_bytes.max();

        let response_millis = &self.histograms.response_millis;

        let min_response_millis = response_millis.min();
        let mean_response_millis = response_millis.mean();
        let p50_response_millis = response_millis.value_at_quantile(0.50);
        let p90_response_millis = response_millis.value_at_quantile(0.90);
        let p99_response_millis = response_millis.value_at_quantile(0.99);
        let max_response_millis = response_millis.max();

        let response_bytes = &self.histograms.response_bytes;

        let min_response_bytes = response_bytes.min();
        let mean_response_bytes = response_bytes.mean();
        let p50_response_bytes = response_bytes.value_at_quantile(0.50);
        let p90_response_bytes = response_bytes.value_at_quantile(0.90);
        let p99_response_bytes = response_bytes.value_at_quantile(0.99);
        let max_response_bytes = response_bytes.max();

        // TODO: Set origin and maybe other things on this model. probably not the ip though
        let aggregated_stat_model = rpc_accounting::ActiveModel {
            id: sea_orm::NotSet,
            // origin: sea_orm::Set(key.authorization.origin.to_string()),
            rpc_key_id: sea_orm::Set(key.rpc_key_id.map(Into::into)),
            origin: sea_orm::Set(key.origin.map(|x| x.to_string())),
            chain_id: sea_orm::Set(chain_id),
            method: sea_orm::Set(key.method),
            archive_request: sea_orm::Set(key.archive_request),
            error_response: sea_orm::Set(key.error_response),
            period_datetime: sea_orm::Set(period_datetime),
            frontend_requests: sea_orm::Set(self.frontend_requests),
            backend_requests: sea_orm::Set(self.backend_requests),
            // backend_retries: sea_orm::Set(self.backend_retries),
            // no_servers: sea_orm::Set(self.no_servers),
            cache_misses: sea_orm::Set(self.cache_misses),
            cache_hits: sea_orm::Set(self.cache_hits),

            sum_request_bytes: sea_orm::Set(self.sum_request_bytes),
            min_request_bytes: sea_orm::Set(min_request_bytes),
            mean_request_bytes: sea_orm::Set(mean_request_bytes),
            p50_request_bytes: sea_orm::Set(p50_request_bytes),
            p90_request_bytes: sea_orm::Set(p90_request_bytes),
            p99_request_bytes: sea_orm::Set(p99_request_bytes),
            max_request_bytes: sea_orm::Set(max_request_bytes),

            sum_response_millis: sea_orm::Set(self.sum_response_millis),
            min_response_millis: sea_orm::Set(min_response_millis),
            mean_response_millis: sea_orm::Set(mean_response_millis),
            p50_response_millis: sea_orm::Set(p50_response_millis),
            p90_response_millis: sea_orm::Set(p90_response_millis),
            p99_response_millis: sea_orm::Set(p99_response_millis),
            max_response_millis: sea_orm::Set(max_response_millis),

            sum_response_bytes: sea_orm::Set(self.sum_response_bytes),
            min_response_bytes: sea_orm::Set(min_response_bytes),
            mean_response_bytes: sea_orm::Set(mean_response_bytes),
            p50_response_bytes: sea_orm::Set(p50_response_bytes),
            p90_response_bytes: sea_orm::Set(p90_response_bytes),
            p99_response_bytes: sea_orm::Set(p99_response_bytes),
            max_response_bytes: sea_orm::Set(max_response_bytes),
        };

        aggregated_stat_model.save(db_conn).await?;

        Ok(())
    }
}

impl ProxyResponseStat {
    pub fn new(
        method: String,
        authorization: Arc<Authorization>,
        metadata: Arc<RequestMetadata>,
        response_bytes: usize,
    ) -> Self {
        let archive_request = metadata.archive_request.load(Ordering::Acquire);
        let backend_requests = metadata.backend_requests.lock().len() as u64;
        // let period_seconds = metadata.period_seconds;
        // let period_timestamp =
        //     (metadata.start_datetime.timestamp() as u64) / period_seconds * period_seconds;
        let request_bytes = metadata.request_bytes;
        let error_response = metadata.error_response.load(Ordering::Acquire);

        // TODO: timestamps could get confused by leap seconds. need tokio time instead
        let response_millis = metadata.start_instant.elapsed().as_millis() as u64;

        let response_bytes = response_bytes as u64;

        Self {
            authorization,
            archive_request,
            method,
            backend_requests,
            request_bytes,
            error_response,
            response_bytes,
            response_millis,
        }
    }
}

impl StatEmitter {
    pub fn spawn(
        chain_id: u64,
        db_conn: DatabaseConnection,
        period_seconds: u64,
        shutdown_receiver: broadcast::Receiver<()>,
    ) -> anyhow::Result<StatEmitterSpawn> {
        let (stat_sender, stat_receiver) = flume::unbounded();

        let mut new = Self {
            chain_id,
            db_conn,
            period_seconds,
        };

        // TODO: send any errors somewhere
        let handle =
            tokio::spawn(async move { new.stat_loop(stat_receiver, shutdown_receiver).await });

        Ok((stat_sender, handle).into())
    }
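
    // A hypothetical caller-side sketch (variable names here are illustrative, not
    // part of this crate):
    //
    //     let StatEmitterSpawn { stat_sender, background_handle } =
    //         StatEmitter::spawn(chain_id, db_conn, 60, shutdown_receiver)?;
    //
    //     let stat = ProxyResponseStat::new(method, authorization, metadata, response_len);
    //     stat_sender.send_async(stat.into()).await?;
    //
    //     // on shutdown, wait for the background task so pending aggregates are flushed
    //     background_handle.await??;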

    async fn stat_loop(
        &mut self,
        stat_receiver: flume::Receiver<Web3ProxyStat>,
        mut shutdown_receiver: broadcast::Receiver<()>,
    ) -> anyhow::Result<()> {
        let system_now = SystemTime::now();

        let duration_since_epoch = system_now
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("time machines don't exist");

        // round down to the start of the current period. do the math on whole seconds:
        // `Duration` division keeps sub-second precision, so `(d / n) * n` would not
        // floor to a period boundary
        let current_period_secs =
            duration_since_epoch.as_secs() / self.period_seconds * self.period_seconds;

        let duration_to_next_period =
            Duration::from_secs(current_period_secs + self.period_seconds) - duration_since_epoch;

        // start the interval when the next period starts
        let start_instant = Instant::now() + duration_to_next_period;
        let mut interval = interval_at(start_instant, Duration::from_secs(self.period_seconds));
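
        // e.g. with period_seconds = 60 and duration_since_epoch = 1_000_000_123s,
        // current_period_secs = 1_000_000_080 and the first tick fires 17 seconds
        // from now (illustrative numbers only)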

        // loop between different futures to update these mutables
        let mut period_timestamp = current_period_secs;
        let mut response_aggregate_map =
            HashMap::<ProxyResponseAggregateKey, ProxyResponseAggregate>::new();

        loop {
            tokio::select! {
                stat = stat_receiver.recv_async() => {
                    match stat? {
                        Web3ProxyStat::Response(stat) => {
                            let key = stat.key();

                            // get the aggregate for this key, creating it if necessary
                            let value = response_aggregate_map.entry(key).or_default();

                            if let Err(err) = value.add(stat) {
                                error!("unable to aggregate stats! err={:?}", err);
                            };
                        }
                    }
                }
                _ = interval.tick() => {
                    // save all the aggregated stats
                    // TODO: batch these saves
                    for (key, aggregate) in response_aggregate_map.drain() {
                        if let Err(err) = aggregate.save(self.chain_id, &self.db_conn, key, period_timestamp).await {
                            error!("unable to save interval stat! err={:?}", err);
                        };
                    }

                    // advance to the next period
                    // TODO: is this safe? what if there is drift?
                    period_timestamp += self.period_seconds;
                }
                x = shutdown_receiver.recv() => {
                    match x {
                        Ok(_) => {
                            info!("aggregate stat_loop shutting down");
                            // TODO: call aggregate_stat for all the pending stats
                        },
                        Err(err) => error!("stat_loop shutdown receiver err={:?}", err),
                    }

                    break;
                }
            }
        }

        info!("saving {} pending stats", response_aggregate_map.len());

        for (key, aggregate) in response_aggregate_map.drain() {
            if let Err(err) = aggregate
                .save(self.chain_id, &self.db_conn, key, period_timestamp)
                .await
            {
                error!("Unable to save stat while shutting down! err={:?}", err);
            };
        }

        info!("aggregate stat_loop shut down");

        Ok(())
    }
}