Merge pull request #4 from yenicelik/david/migration-cli

#37 Migrate from rpc_accounting to influxdb
David 2023-03-30 12:55:12 +01:00 committed by GitHub
commit 20af16ee21
10 changed files with 444 additions and 183 deletions

@ -40,6 +40,7 @@ pub struct Model {
pub max_response_bytes: u64,
pub archive_request: bool,
pub origin: Option<String>,
pub migrated: Option<DateTime>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
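Note: the new nullable `migrated` column also needs a schema migration in the `migration` crate; that file is presumably among the 10 changed files but is not shown in this excerpt. A minimal sketch of what it might look like with sea-orm-migration (table and column identifiers are assumptions, not taken from this diff):

use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
    // add a nullable `migrated` timestamp so already-copied rows can be skipped on re-runs
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .alter_table(
                Table::alter()
                    .table(Alias::new("rpc_accounting"))
                    .add_column(ColumnDef::new(Alias::new("migrated")).date_time().null())
                    .to_owned(),
            )
            .await
    }

    // roll back by dropping the column again
    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .alter_table(
                Table::alter()
                    .table(Alias::new("rpc_accounting"))
                    .drop_column(Alias::new("migrated"))
                    .to_owned(),
            )
            .await
    }
}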

@ -0,0 +1 @@
mysqldump -u root --password=dev_web3_proxy -h 127.0.0.1 --port 13306

@ -0,0 +1,17 @@
SELECT COUNT(*) FROM rpc_accounting WHERE migrated IS NULL;
UPDATE rpc_accounting SET migrated = NULL;
SELECT SUM(frontend_requests) FROM rpc_accounting;
SELECT SUM(frontend_requests) FROM rpc_accounting_v2;
SELECT SUM(backend_requests) FROM rpc_accounting;
SELECT SUM(backend_requests) FROM rpc_accounting_v2;
SELECT SUM(sum_request_bytes) FROM rpc_accounting;
SELECT SUM(sum_request_bytes) FROM rpc_accounting_v2;
SELECT SUM(sum_response_millis) FROM rpc_accounting;
SELECT SUM(sum_response_millis) FROM rpc_accounting_v2;
SELECT SUM(sum_response_bytes) FROM rpc_accounting;
SELECT SUM(sum_response_bytes) FROM rpc_accounting_v2;
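-- The paired totals above should match between rpc_accounting and rpc_accounting_v2 once every row has been migrated.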

@ -67,7 +67,7 @@ pub static APP_USER_AGENT: &str = concat!(
);
// aggregate across 1 week
const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7;
pub const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7;
#[derive(Debug, From)]
struct ResponseCacheKey {
@ -575,7 +575,12 @@ impl Web3ProxyApp {
// stats can be saved in mysql, influxdb, both, or none
let stat_sender = if let Some(emitter_spawn) = StatBuffer::try_spawn(
top_config.app.chain_id,
top_config.app.influxdb_bucket.clone().context("No influxdb bucket was provided")?.to_owned(),
top_config
.app
.influxdb_bucket
.clone()
.context("No influxdb bucket was provided")?
.to_owned(),
db_conn.clone(),
influxdb_client.clone(),
60,
@ -812,26 +817,27 @@ impl Web3ProxyApp {
app_handles.push(config_handle);
}
// =======
// if important_background_handles.is_empty() {
// info!("no important background handles");
//
// let f = tokio::spawn(async move {
// let _ = background_shutdown_receiver.recv().await;
//
// Ok(())
// });
//
// important_background_handles.push(f);
// >>>>>>> 77df3fa (stats v2)
Ok((
app,
app_handles,
important_background_handles,
new_top_config_sender,
consensus_connections_watcher
).into())
consensus_connections_watcher,
)
.into())
}
pub async fn apply_top_config(&self, new_top_config: TopConfig) -> anyhow::Result<()> {
@ -1793,7 +1799,7 @@ impl Web3ProxyApp {
if let Some(stat_sender) = self.stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
method.to_string(),
Some(method.to_string()),
authorization.clone(),
request_metadata,
response.num_bytes(),
@ -1816,7 +1822,7 @@ impl Web3ProxyApp {
if let Some(stat_sender) = self.stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
request_method,
Some(request_method),
authorization.clone(),
request_metadata,
response.num_bytes(),

@ -96,7 +96,7 @@ impl Web3ProxyApp {
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
"eth_subscription(newHeads)".to_string(),
Some("eth_subscription(newHeads)".to_string()),
authorization.clone(),
request_metadata.clone(),
response_bytes,
@ -167,7 +167,7 @@ impl Web3ProxyApp {
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
"eth_subscription(newPendingTransactions)".to_string(),
Some("eth_subscription(newPendingTransactions)".to_string()),
authorization.clone(),
request_metadata.clone(),
response_bytes,
@ -243,7 +243,7 @@ impl Web3ProxyApp {
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
"eth_subscription(newPendingFullTransactions)".to_string(),
Some("eth_subscription(newPendingFullTransactions)".to_string()),
authorization.clone(),
request_metadata.clone(),
response_bytes,
@ -319,7 +319,7 @@ impl Web3ProxyApp {
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
"eth_subscription(newPendingRawTransactions)".to_string(),
Some("eth_subscription(newPendingRawTransactions)".to_string()),
authorization.clone(),
request_metadata.clone(),
response_bytes,
@ -350,7 +350,7 @@ impl Web3ProxyApp {
if let Some(stat_sender) = self.stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
request_json.method.clone(),
Some(request_json.method.clone()),
authorization.clone(),
request_metadata,
response.num_bytes(),

@ -374,12 +374,17 @@ fn main() -> anyhow::Result<()> {
x.main(&db_conn).await
}
SubCommand::MigrateStatsToV2(x) => {
let top_config = top_config.expect("--config is required to run the migration from stats-mysql to stats-influx");
// let top_config_path =
// top_config_path.expect("path must be set if top_config exists");
let db_url = cli_config
.db_url
.expect("'--config' (with a db) or '--db-url' is required to run the migration from stats-mysql to stats-influx");
let db_conn = get_db(db_url, 1, 1).await?;
x.main(&db_conn).await
x.main(top_config, &db_conn).await
}
SubCommand::Pagerduty(x) => {
if cli_config.sentry_url.is_none() {

@ -1,27 +1,135 @@
use anyhow::Context;
use argh::FromArgs;
use entities::{rpc_accounting, rpc_accounting_v2, user};
use chrono::{DateTime, Utc};
use entities::{rpc_accounting, rpc_accounting_v2, rpc_key, user};
use ethers::types::Address;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hashbrown::HashMap;
use log::{debug, info, warn};
use log::{debug, error, info, trace, warn};
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
QueryFilter, QuerySelect
QueryFilter, QuerySelect, UpdateResult,
};
use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey};
use migration::{Expr, Value};
use std::mem::swap;
use std::net::{IpAddr, Ipv4Addr};
use std::num::NonZeroU64;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::broadcast;
use tokio::time::{sleep, Instant};
use web3_proxy::app::{AuthorizationChecks, Web3ProxyApp, BILLING_PERIOD_SECONDS};
use web3_proxy::config::TopConfig;
use web3_proxy::frontend::authorization::{
Authorization, AuthorizationType, RequestMetadata, RpcSecretKey,
};
use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey, RpcQueryStats, StatBuffer};
// Helper function to go from DateTime to Instant
fn datetime_utc_to_instant(datetime: DateTime<Utc>) -> anyhow::Result<Instant> {
let epoch = datetime.timestamp(); // Get the Unix timestamp
let nanos = datetime.timestamp_subsec_nanos();
let duration_since_epoch = Duration::new(epoch as u64, nanos);
// let duration_since_datetime = Duration::new(, nanos);
let instant_new = Instant::now();
warn!("Instant new is: {:?}", instant_new);
let unix_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
warn!("Instant since unix epoch is: {:?}", unix_epoch);
instant_new
.checked_sub(unix_epoch)
.context("Could not subtract unix epoch from instant now")?
.checked_add(duration_since_epoch)
.context("Could not add duration since epoch for updated time")
}
/// Migrate rpc_accounting stats into the new rpc_accounting_v2 table and influxdb.
#[derive(FromArgs, PartialEq, Eq, Debug)]
#[argh(subcommand, name = "migrate_stats_to_v2")]
pub struct MigrateStatsToV2 {}
// I mean drop(sender) and then important_background_handle.await. No need for shutdown signal here I think.
// Don't make data lossy
impl MigrateStatsToV2 {
pub async fn main(self, db_conn: &DatabaseConnection) -> anyhow::Result<()> {
pub async fn main(
self,
top_config: TopConfig,
db_conn: &DatabaseConnection,
) -> anyhow::Result<()> {
// Also add influxdb container ...
// let mut spawned_app =
// Web3ProxyApp::spawn(top_config.clone(), 2, app_shutdown_sender.clone()).await?;
let number_of_rows_to_process_at_once = 500;
// we don't strictly need this, but let's spawn it anyway
// easier than debugging the rest, I suppose
let (app_shutdown_sender, _app_shutdown_receiver) = broadcast::channel(1);
let rpc_account_shutdown_receiver = app_shutdown_sender.subscribe();
// we must wait for these to end on their own (and they need to subscribe to shutdown_sender)
let mut important_background_handles = FuturesUnordered::new();
// Create the influxdb client (if an influxdb host is configured)
let influxdb_client = match top_config.app.influxdb_host.as_ref() {
Some(influxdb_host) => {
let influxdb_org = top_config
.app
.influxdb_org
.clone()
.expect("influxdb_org needed when influxdb_host is set");
let influxdb_token = top_config
.app
.influxdb_token
.clone()
.expect("influxdb_token needed when influxdb_host is set");
let influxdb_client =
influxdb2::Client::new(influxdb_host, influxdb_org, influxdb_token);
// TODO: test the client now. having a stat for "started" can be useful on graphs to mark deploys
Some(influxdb_client)
}
None => None,
};
// Spawn the stat-sender
let stat_sender = if let Some(emitter_spawn) = StatBuffer::try_spawn(
top_config.app.chain_id,
top_config
.app
.influxdb_bucket
.clone()
.context("No influxdb bucket was provided")?
.to_owned(),
Some(db_conn.clone()),
influxdb_client.clone(),
30,
1,
BILLING_PERIOD_SECONDS,
rpc_account_shutdown_receiver,
)? {
// since the database entries are used for accounting, we want to be sure everything is saved before exiting
important_background_handles.push(emitter_spawn.background_handle);
Some(emitter_spawn.stat_sender)
} else {
None
};
// Basically spawn the full app, look at web3_proxy CLI
loop {
// (1) Load a batch of rows out of the old table until no more rows are left
let old_records = rpc_accounting::Entity::find().limit(10000).all(db_conn).await?;
let old_records = rpc_accounting::Entity::find()
.filter(rpc_accounting::Column::Migrated.is_null())
.limit(number_of_rows_to_process_at_once)
.all(db_conn)
.await?;
if old_records.is_empty() {
// Break out of the loop once all records have been migrated ...
warn!("All records seem to have been successfully migrated!");
@ -29,93 +137,208 @@ impl MigrateStatsToV2 {
}
// (2) Create request metadata objects to match the old data
let mut global_timeseries_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
let mut opt_in_timeseries_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
let mut accounting_db_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
// Iterate through all old rows, and put them into the above objects.
for x in old_records {
for x in old_records.iter() {
// info!("Preparing for migration: {:?}", x);
info!("Preparing for migration: {:?}", x);
// TODO: Split up a single request into multiple requests ...
// according to frontend-requests, backend-requests, etc.
// // For each of the old rows, create a (i) RpcQueryKey and a matching BufferedRpcQueryStats object
// let key = RpcQueryKey {
// response_timestamp: x.period_datetime.timestamp(),
// archive_needed: x.archive_needed,
// error_response: x.error_response,
// period_datetime: x.period_datetime.timestamp(),
// rpc_secret_key_id: x.rpc_key_id,
// origin: x.origin,
// method: x.method
// };
//
// // Create the corresponding BufferedRpcQueryStats object
// let val = BufferedRpcQueryStats {
// frontend_requests: x.frontend_requests,
// backend_requests: x.backend_requests,
// backend_retries: x.backend_retries,
// no_servers: x.no_servers,
// cache_misses: x.cache_misses,
// cache_hits: x.cache_hits,
// sum_request_bytes: x.sum_request_bytes,
// sum_response_bytes: x.sum_response_bytes,
// sum_response_millis: x.sum_response_millis
// };
//
// // TODO: Create authorization, request metadata, and bytes ... but bytes we don't really keep track of!
// // We can generate dummy bytes of the same length though, this may work as well
//
// // TODO: Period datetime is also a question of what it is
// // let response_stat = RpcQueryStats::new(
// // x.method,
// // authorization.clone(),
// // request_metadata.clone(),
// // response_bytes,
// // x.period_datetime
// // );
//
// // BufferedRpcQueryStats
// Get the rpc-key from the rpc_key_id
// Get the user-id from the rpc_key_id
let authorization_checks = match x.rpc_key_id {
Some(rpc_key_id) => {
let rpc_key_obj = rpc_key::Entity::find()
.filter(rpc_key::Column::Id.eq(rpc_key_id))
.one(db_conn)
.await?
.context("Could not find rpc_key_obj for the given rpc_key_id")?;
// TODO: Create authorization
// We can probably also randomly generate this, as we don't care about the user (?)
AuthorizationChecks {
user_id: rpc_key_obj.user_id,
rpc_secret_key: Some(RpcSecretKey::Uuid(rpc_key_obj.secret_key)),
rpc_secret_key_id: Some(
NonZeroU64::new(rpc_key_id)
.context("Could not use rpc_key_id to create a u64")?,
),
..Default::default()
}
}
None => AuthorizationChecks {
..Default::default()
},
};
// Then overwrite rpc_key_id and user_id (?)
let authorization_type = AuthorizationType::Frontend;
let authorization = Arc::new(
Authorization::try_new(
authorization_checks,
None,
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
None,
None,
None,
authorization_type,
)
.context("Initializing Authorization Struct was not successful")?,
);
// Fan each old aggregated row out into individual requests (to simulate the original single requests ...)
// Iterate through all frontend requests
// For each frontend request, create one object that will be emitted (make sure the timestamp is new)
let n = x.frontend_requests;
for i in 0..n {
// info!("Creating a new frontend request");
// Collect all requests here ...
let mut int_request_bytes = x.sum_request_bytes / n;
if i == 0 {
int_request_bytes += x.sum_request_bytes % n;
}
let mut int_response_bytes = x.sum_response_bytes / n;
if i == 0 {
int_response_bytes += x.sum_response_bytes % n;
}
let mut int_response_millis = x.sum_response_millis / n;
if i == 0 {
int_response_millis += x.sum_response_millis % n;
}
let mut int_backend_requests = x.backend_requests / n;
if i == 0 {
int_backend_requests += x.backend_requests % n;
}
// Fold the modulo remainder into the first request so the per-row totals are preserved exactly
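// e.g. sum_request_bytes = 10 split across frontend_requests = 3 gives 3 per request plus the
// 10 % 3 = 1 remainder on the first one: 4, 3, 3 (the totals are preserved)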
// TODO: Create RequestMetadata
let request_metadata = RequestMetadata {
start_instant: Instant::now(), // This is overwritten later on
request_bytes: int_request_bytes.into(), // Get the mean of all the request bytes
archive_request: x.archive_request.into(),
backend_requests: Default::default(), // This is not used, instead we modify the field later
no_servers: 0.into(), // This is not relevant in the new version
error_response: x.error_response.into(),
response_bytes: int_response_bytes.into(),
response_millis: int_response_millis.into(),
// We just don't have this data
response_from_backup_rpc: false.into(), // the old table did not record this, so default to false
};
// (3) Send through a channel to a stat emitter
// Send it to the stats sender
if let Some(stat_sender_ref) = stat_sender.as_ref() {
// info!("Method is: {:?}", x.clone().method);
let mut response_stat = RpcQueryStats::new(
x.method.clone(),
authorization.clone(),
Arc::new(request_metadata),
(int_response_bytes)
.try_into()
.context("sum bytes average is not calculated properly")?,
);
// Modify the timestamps ..
response_stat.modify_struct(
int_response_millis,
x.period_datetime.timestamp(), // timestamp() is whole seconds since the Unix epoch, which is what response_timestamp expects
int_backend_requests,
);
// info!("Sending stats: {:?}", response_stat);
stat_sender_ref
// .send(response_stat.into())
.send_async(response_stat.into())
.await
.context("stat_sender sending response_stat")?;
// info!("Send! {:?}", stat_sender);
} else {
panic!("Stat sender was not spawned!");
}
// Create a new stats object
// Add it to the StatBuffer
}
// Let the stat_sender spawn / flush ...
// spawned_app.app.stat_sender.aggregate_and_save_loop()
// Send a signal to save ...
}
// (3) Await that all items are properly processed
// TODO: Await all the background handles
// Only after this mark all the items as processed / completed
// If the items are in rpc_v2, delete the initial items from the database
// return Ok(());
// (4) Update the batch in the old table with the current timestamp (Mark the batch as migrated)
let old_record_ids = old_records.iter().map(|x| x.id);
let update_result: UpdateResult = rpc_accounting::Entity::update_many()
.col_expr(
rpc_accounting::Column::Migrated,
Expr::value(Value::ChronoDateTimeUtc(Some(Box::new(
chrono::offset::Utc::now(),
)))),
)
.filter(rpc_accounting::Column::Id.is_in(old_record_ids))
// .set(pear)
.exec(db_conn)
.await?;
info!("Update result is: {:?}", update_result);
// (N-1) Mark the batch as migrated
// break;
}
info!(
"Background handles (2) are: {:?}",
important_background_handles
);
// Dropping the stat_sender closes the stats channel, so the stat buffer can flush what it still
// holds and exit on its own (as noted above, no explicit shutdown signal should be needed)
// important_background_handles.clear();
drop(stat_sender);
// match app_shutdown_sender.send(()) {
// Err(x) => {
// panic!("Could not send shutdown signal! {:?}", x);
// }
// _ => {}
// };
// TODO: Should we also write a short verifier to check that the migration was successful (?)
// Wait for the remaining background handles to finish their work
while let Some(x) = important_background_handles.next().await {
info!("Returned item is: {:?}", x);
match x {
Err(e) => {
error!("{:?}", e);
}
Ok(Err(e)) => {
error!("{:?}", e);
}
Ok(Ok(_)) => {
// TODO: how can we know which handle exited?
info!("a background handle exited");
// Pop it in this case?
continue;
}
}
}
// let old_address: Address = self.old_address.parse()?;
// let new_address: Address = self.new_address.parse()?;
//
// let old_address: Vec<u8> = old_address.to_fixed_bytes().into();
// let new_address: Vec<u8> = new_address.to_fixed_bytes().into();
//
// let u = user::Entity::find()
// .filter(user::Column::Address.eq(old_address))
// .one(db_conn)
// .await?
// .context("No user found with that address")?;
//
// debug!("initial user: {:#?}", u);
//
// if u.address == new_address {
// info!("user already has this address");
// } else {
// let mut u = u.into_active_model();
//
// u.address = sea_orm::Set(new_address);
//
// let u = u.save(db_conn).await?;
//
// info!("changed user address");
//
// debug!("updated user: {:#?}", u);
// }
// info!("Here (?)");
Ok(())
}

@ -400,7 +400,7 @@ async fn handle_socket_payload(
if let Some(stat_sender) = app.stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
json_request.method.clone(),
Some(json_request.method.clone()),
authorization.clone(),
request_metadata,
response.num_bytes(),

@ -34,6 +34,7 @@ use std::sync::atomic::{self, Ordering};
use std::sync::Arc;
use std::{cmp, fmt};
use thread_fast_rng::rand::seq::SliceRandom;
use tokio;
use tokio::sync::{broadcast, watch};
use tokio::task;
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
@ -167,7 +168,8 @@ impl Web3Rpcs {
.max_capacity(10_000)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
let (watch_consensus_rpcs_sender, consensus_connections_watcher) = watch::channel(Default::default());
let (watch_consensus_rpcs_sender, consensus_connections_watcher) =
watch::channel(Default::default());
// by_name starts empty. self.apply_server_configs will add to it
let by_name = Default::default();
@ -318,43 +320,43 @@ impl Web3Rpcs {
}
}
// <<<<<<< HEAD
// <<<<<<< HEAD
Ok(())
}
// =======
// // TODO: max_capacity and time_to_idle from config
// // all block hashes are the same size, so no need for weigher
// let block_hashes = Cache::builder()
// .time_to_idle(Duration::from_secs(600))
// .max_capacity(10_000)
// .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// // all block numbers are the same size, so no need for weigher
// let block_numbers = Cache::builder()
// .time_to_idle(Duration::from_secs(600))
// .max_capacity(10_000)
// .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
//
// let (watch_consensus_connections_sender, consensus_connections_watcher) =
// watch::channel(Default::default());
//
// let watch_consensus_head_receiver =
// watch_consensus_head_sender.as_ref().map(|x| x.subscribe());
//
// let connections = Arc::new(Self {
// by_name: connections,
// watch_consensus_rpcs_sender: watch_consensus_connections_sender,
// watch_consensus_head_receiver,
// pending_transactions,
// block_hashes,
// block_numbers,
// min_sum_soft_limit,
// min_head_rpcs,
// max_block_age,
// max_block_lag,
// });
//
// let authorization = Arc::new(Authorization::internal(db_conn.clone())?);
// >>>>>>> 77df3fa (stats v2)
pub fn get(&self, conn_name: &str) -> Option<Arc<Web3Rpc>> {
self.by_name.read().get(conn_name).cloned()
@ -364,12 +366,12 @@ impl Web3Rpcs {
self.by_name.read().len()
}
// <<<<<<< HEAD
// <<<<<<< HEAD
pub fn is_empty(&self) -> bool {
self.by_name.read().is_empty()
// =======
// Ok((connections, handle, consensus_connections_watcher))
// >>>>>>> 77df3fa (stats v2)
}
pub fn min_head_rpcs(&self) -> usize {
@ -395,7 +397,7 @@ impl Web3Rpcs {
let clone = self.clone();
let authorization = authorization.clone();
let pending_tx_id_receiver = self.pending_tx_id_receiver.clone();
let handle = task::spawn(async move {
let handle = tokio::task::spawn(async move {
// TODO: set up this future the same as the block funnel
while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv_async().await {
let f = clone.clone().process_incoming_tx_id(
@ -418,7 +420,7 @@ impl Web3Rpcs {
let connections = Arc::clone(&self);
let pending_tx_sender = pending_tx_sender.clone();
let handle = task::Builder::default()
let handle = tokio::task::Builder::default()
.name("process_incoming_blocks")
.spawn(async move {
connections
@ -432,12 +434,14 @@ impl Web3Rpcs {
if futures.is_empty() {
// no transaction or block subscriptions.
let handle = task::Builder::default().name("noop").spawn(async move {
loop {
sleep(Duration::from_secs(600)).await;
// TODO: "every interval, check that the provider is still connected"
}
})?;
let handle = tokio::task::Builder::default()
.name("noop")
.spawn(async move {
loop {
sleep(Duration::from_secs(600)).await;
// TODO: "every interval, check that the provider is still connected"
}
})?;
futures.push(flatten_handle(handle));
}
@ -884,11 +888,11 @@ impl Web3Rpcs {
// TODO: maximum retries? right now its the total number of servers
loop {
// <<<<<<< HEAD
// <<<<<<< HEAD
if skip_rpcs.len() >= self.by_name.read().len() {
// =======
// if skip_rpcs.len() == self.by_name.len() {
// >>>>>>> 77df3fa (stats v2)
break;
}
@ -1159,18 +1163,18 @@ impl Web3Rpcs {
request_metadata.no_servers.fetch_add(1, Ordering::Release);
}
// <<<<<<< HEAD
// <<<<<<< HEAD
watch_consensus_rpcs.changed().await?;
watch_consensus_rpcs.borrow_and_update();
// =======
// =======
// TODO: i don't think this will ever happen
// TODO: return a 502? if it does?
// return Err(anyhow::anyhow!("no available rpcs!"));
// TODO: sleep how long?
// TODO: subscribe to something in ConsensusWeb3Rpcs instead
sleep(Duration::from_millis(200)).await;
// >>>>>>> 77df3fa (stats v2)
// >>>>>>> 77df3fa (stats v2)
continue;
}
@ -1299,13 +1303,13 @@ mod tests {
// TODO: why is this allow needed? does tokio::test get in the way somehow?
#![allow(unused_imports)]
use std::time::{SystemTime, UNIX_EPOCH};
use super::*;
use crate::rpcs::consensus::ConsensusFinder;
use crate::rpcs::{blockchain::Web3ProxyBlock, provider::Web3Provider};
use ethers::types::{Block, U256};
use log::{trace, LevelFilter};
use parking_lot::RwLock;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock as AsyncRwLock;
#[tokio::test]

@ -35,11 +35,12 @@ pub enum StatType {
#[derive(Clone, Debug)]
pub struct RpcQueryStats {
pub authorization: Arc<Authorization>,
pub method: String,
pub method: Option<String>,
pub archive_request: bool,
pub error_response: bool,
pub request_bytes: u64,
/// if backend_requests is 0, there was a cache_hit
// pub frontend_request: u64,
pub backend_requests: u64,
pub response_bytes: u64,
pub response_millis: u64,
@ -96,7 +97,7 @@ impl RpcQueryStats {
TrackingLevel::Detailed => {
// detailed tracking keeps track of the method and origin
// depending on the request, the origin might still be None
let method = Some(self.method.clone());
let method = self.method.clone();
let origin = self.authorization.origin.clone();
(method, origin)
@ -116,7 +117,7 @@ impl RpcQueryStats {
/// all queries are aggregated
/// TODO: should we store "anon" or "registered" as a key just to be able to split graphs?
fn global_timeseries_key(&self) -> RpcQueryKey {
let method = Some(self.method.clone());
let method = self.method.clone();
// we don't store origin in the timeseries db. its only used for optional accounting
let origin = None;
// everyone gets grouped together
@ -140,7 +141,7 @@ impl RpcQueryStats {
TrackingLevel::None => {
// this RPC key requested no tracking. this is the default.
// we still want graphs though, so we just use None as the rpc_secret_key_id
(Some(self.method.clone()), None)
(self.method.clone(), None)
}
TrackingLevel::Aggregated => {
// this RPC key requested tracking aggregated across all methods
@ -149,7 +150,7 @@ impl RpcQueryStats {
TrackingLevel::Detailed => {
// detailed tracking keeps track of the method
(
Some(self.method.clone()),
self.method.clone(),
self.authorization.checks.rpc_secret_key_id,
)
}
@ -361,10 +362,10 @@ impl BufferedRpcQueryStats {
impl RpcQueryStats {
pub fn new(
method: String,
method: Option<String>,
authorization: Arc<Authorization>,
metadata: Arc<RequestMetadata>,
response_bytes: usize
response_bytes: usize,
) -> Self {
// TODO: try_unwrap the metadata to be sure that all the stats for this request have been collected
// TODO: otherwise, i think the whole thing should be in a single lock that we can "reset" when a stat is created
@ -391,6 +392,17 @@ impl RpcQueryStats {
}
}
/// Only used for migration from stats_v1 to stats_v2/v3
pub fn modify_struct(
&mut self,
response_millis: u64,
response_timestamp: i64,
backend_requests: u64,
) {
self.response_millis = response_millis;
self.response_timestamp = response_timestamp;
self.backend_requests = backend_requests;
}
}
impl StatBuffer {
@ -450,6 +462,7 @@ impl StatBuffer {
loop {
tokio::select! {
stat = stat_receiver.recv_async() => {
// info!("Received stat");
// save the stat to a buffer
match stat {
Ok(AppStat::RpcQuery(stat)) => {
@ -476,6 +489,7 @@ impl StatBuffer {
}
}
_ = db_save_interval.tick() => {
// info!("DB save internal tick");
let db_conn = self.db_conn.as_ref().expect("db connection should always exist if there are buffered stats");
// TODO: batch saves
@ -487,6 +501,7 @@ impl StatBuffer {
}
}
_ = tsdb_save_interval.tick() => {
// info!("TSDB save internal tick");
// TODO: batch saves
// TODO: better bucket names
let influxdb_client = self.influxdb_client.as_ref().expect("influxdb client should always exist if there are buffered stats");
@ -506,6 +521,7 @@ impl StatBuffer {
}
}
x = shutdown_receiver.recv() => {
info!("shutdown signal ---");
match x {
Ok(_) => {
info!("stat_loop shutting down");
@ -544,13 +560,7 @@ impl StatBuffer {
for (key, stat) in global_timeseries_buffer.drain() {
if let Err(err) = stat
.save_timeseries(
&bucket,
"global_proxy",
self.chain_id,
influxdb_client,
key,
)
.save_timeseries(&bucket, "global_proxy", self.chain_id, influxdb_client, key)
.await
{
error!(
@ -567,13 +577,7 @@ impl StatBuffer {
for (key, stat) in opt_in_timeseries_buffer.drain() {
if let Err(err) = stat
.save_timeseries(
&bucket,
"opt_in_proxy",
self.chain_id,
influxdb_client,
key,
)
.save_timeseries(&bucket, "opt_in_proxy", self.chain_id, influxdb_client, key)
.await
{
error!(