will create a PR for some feedback

This commit is contained in:
yenicelik 2023-03-25 19:26:23 +01:00
parent 53c7541fed
commit cef26e66cc
2 changed files with 7 additions and 24 deletions

@ -116,8 +116,6 @@ impl MigrateStatsToV2 {
None None
}; };
info!("Background handles are: {:?}", important_background_handles);
// Basically spawn the full app, look at web3_proxy CLI // Basically spawn the full app, look at web3_proxy CLI
while true { while true {
@ -134,10 +132,6 @@ impl MigrateStatsToV2 {
} }
// (2) Create request metadata objects to match the old data // (2) Create request metadata objects to match the old data
let mut global_timeseries_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
let mut opt_in_timeseries_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
let mut accounting_db_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
// Iterate through all old rows, and put them into the above objects. // Iterate through all old rows, and put them into the above objects.
for x in old_records.iter() { for x in old_records.iter() {
info!("Preparing for migration: {:?}", x); info!("Preparing for migration: {:?}", x);
@ -212,6 +206,7 @@ impl MigrateStatsToV2 {
// (3) Send through a channel to a stat emitter // (3) Send through a channel to a stat emitter
// Send it to the stats sender // Send it to the stats sender
if let Some(stat_sender_ref) = stat_sender.as_ref() { if let Some(stat_sender_ref) = stat_sender.as_ref() {
info!("Method is: {:?}", x.clone().method.unwrap());
let mut response_stat = RpcQueryStats::new( let mut response_stat = RpcQueryStats::new(
x.clone().method.unwrap(), x.clone().method.unwrap(),
authorization.clone(), authorization.clone(),
@ -254,6 +249,8 @@ impl MigrateStatsToV2 {
// If the items are in rpc_v2, delete the initial items from the database // If the items are in rpc_v2, delete the initial items from the database
// return Ok(());
// (4) Update the batch in the old table with the current timestamp (Mark the batch as migrated) // (4) Update the batch in the old table with the current timestamp (Mark the batch as migrated)
let old_record_ids = old_records.iter().map(|x| x.id); let old_record_ids = old_records.iter().map(|x| x.id);
let update_result: UpdateResult = rpc_accounting::Entity::update_many() let update_result: UpdateResult = rpc_accounting::Entity::update_many()

@ -364,7 +364,7 @@ impl RpcQueryStats {
method: String, method: String,
authorization: Arc<Authorization>, authorization: Arc<Authorization>,
metadata: Arc<RequestMetadata>, metadata: Arc<RequestMetadata>,
response_bytes: usize response_bytes: usize,
) -> Self { ) -> Self {
// TODO: try_unwrap the metadata to be sure that all the stats for this request have been collected // TODO: try_unwrap the metadata to be sure that all the stats for this request have been collected
// TODO: otherwise, i think the whole thing should be in a single lock that we can "reset" when a stat is created // TODO: otherwise, i think the whole thing should be in a single lock that we can "reset" when a stat is created
@ -396,13 +396,12 @@ impl RpcQueryStats {
&mut self, &mut self,
response_millis: u64, response_millis: u64,
response_timestamp: i64, response_timestamp: i64,
backend_requests: u64 backend_requests: u64,
) { ) {
self.response_millis = response_millis; self.response_millis = response_millis;
self.response_timestamp = response_timestamp; self.response_timestamp = response_timestamp;
self.backend_requests = backend_requests; self.backend_requests = backend_requests;
} }
} }
impl StatBuffer { impl StatBuffer {
@ -446,7 +445,6 @@ impl StatBuffer {
stat_receiver: flume::Receiver<AppStat>, stat_receiver: flume::Receiver<AppStat>,
mut shutdown_receiver: broadcast::Receiver<()>, mut shutdown_receiver: broadcast::Receiver<()>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
info!("Aggregate and save loop is running");
let mut tsdb_save_interval = let mut tsdb_save_interval =
interval(Duration::from_secs(self.tsdb_save_interval_seconds as u64)); interval(Duration::from_secs(self.tsdb_save_interval_seconds as u64));
let mut db_save_interval = let mut db_save_interval =
@ -561,13 +559,7 @@ impl StatBuffer {
for (key, stat) in global_timeseries_buffer.drain() { for (key, stat) in global_timeseries_buffer.drain() {
if let Err(err) = stat if let Err(err) = stat
.save_timeseries( .save_timeseries(&bucket, "global_proxy", self.chain_id, influxdb_client, key)
&bucket,
"global_proxy",
self.chain_id,
influxdb_client,
key,
)
.await .await
{ {
error!( error!(
@ -584,13 +576,7 @@ impl StatBuffer {
for (key, stat) in opt_in_timeseries_buffer.drain() { for (key, stat) in opt_in_timeseries_buffer.drain() {
if let Err(err) = stat if let Err(err) = stat
.save_timeseries( .save_timeseries(&bucket, "opt_in_proxy", self.chain_id, influxdb_client, key)
&bucket,
"opt_in_proxy",
self.chain_id,
influxdb_client,
key,
)
.await .await
{ {
error!( error!(