Migration seems to work for the most part

This commit is contained in:
yenicelik 2023-03-30 12:54:01 +01:00
parent 7390bb7910
commit 05463c7ee5
7 changed files with 45 additions and 25 deletions

View File

@ -0,0 +1 @@
mysqldump -u root --password=dev_web3_proxy -h 127.0.0.1 --port 13306

View File

@ -0,0 +1,17 @@
SELECT COUNT(*) FROM rpc_accounting WHERE migrated IS NULL;
UPDATE rpc_accounting SET migrated = NULL;
SELECT SUM(frontend_requests) FROM rpc_accounting;
SELECT SUM(frontend_requests) FROM rpc_accounting_v2;
SELECT SUM(backend_requests) FROM rpc_accounting;
SELECT SUM(backend_requests) FROM rpc_accounting_v2;
SELECT SUM(sum_request_bytes) FROM rpc_accounting;
SELECT SUM(sum_request_bytes) FROM rpc_accounting_v2;
SELECT SUM(sum_response_millis) FROM rpc_accounting;
SELECT SUM(sum_response_millis) FROM rpc_accounting_v2;
SELECT SUM(sum_response_bytes) FROM rpc_accounting;
SELECT SUM(sum_response_bytes) FROM rpc_accounting_v2;

View File

@ -1799,7 +1799,7 @@ impl Web3ProxyApp {
if let Some(stat_sender) = self.stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
method.to_string(),
Some(method.to_string()),
authorization.clone(),
request_metadata,
response.num_bytes(),
@ -1822,7 +1822,7 @@ impl Web3ProxyApp {
if let Some(stat_sender) = self.stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
request_method,
Some(request_method),
authorization.clone(),
request_metadata,
response.num_bytes(),

View File

@ -96,7 +96,7 @@ impl Web3ProxyApp {
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
"eth_subscription(newHeads)".to_string(),
Some("eth_subscription(newHeads)".to_string()),
authorization.clone(),
request_metadata.clone(),
response_bytes,
@ -167,7 +167,7 @@ impl Web3ProxyApp {
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
"eth_subscription(newPendingTransactions)".to_string(),
Some("eth_subscription(newPendingTransactions)".to_string()),
authorization.clone(),
request_metadata.clone(),
response_bytes,
@ -243,7 +243,7 @@ impl Web3ProxyApp {
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
"eth_subscription(newPendingFullTransactions)".to_string(),
Some("eth_subscription(newPendingFullTransactions)".to_string()),
authorization.clone(),
request_metadata.clone(),
response_bytes,
@ -319,7 +319,7 @@ impl Web3ProxyApp {
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
"eth_subscription(newPendingRawTransactions)".to_string(),
Some("eth_subscription(newPendingRawTransactions)".to_string()),
authorization.clone(),
request_metadata.clone(),
response_bytes,
@ -350,7 +350,7 @@ impl Web3ProxyApp {
if let Some(stat_sender) = self.stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
request_json.method.clone(),
Some(request_json.method.clone()),
authorization.clone(),
request_metadata,
response.num_bytes(),

View File

@ -63,6 +63,8 @@ impl MigrateStatsToV2 {
// let mut spawned_app =
// Web3ProxyApp::spawn(top_config.clone(), 2, app_shutdown_sender.clone()).await?;
let number_of_rows_to_process_at_once = 500;
// we wouldn't really need this, but let's spawn this anyways
// easier than debugging the rest I suppose
let (app_shutdown_sender, _app_shutdown_receiver) = broadcast::channel(1);
@ -125,7 +127,7 @@ impl MigrateStatsToV2 {
// (1) Load a batch of rows out of the old table until no more rows are left
let old_records = rpc_accounting::Entity::find()
.filter(rpc_accounting::Column::Migrated.is_null())
.limit(2)
.limit(number_of_rows_to_process_at_once)
.all(db_conn)
.await?;
if old_records.len() == 0 {
@ -137,7 +139,7 @@ impl MigrateStatsToV2 {
// (2) Create request metadata objects to match the old data
// Iterate through all old rows, and put them into the above objects.
for x in old_records.iter() {
info!("Preparing for migration: {:?}", x);
// info!("Preparing for migration: {:?}", x);
// TODO: Split up a single request into multiple requests ...
// according to frontend-requests, backend-requests, etc.
@ -232,9 +234,9 @@ impl MigrateStatsToV2 {
// (3) Send through a channel to a stat emitter
// Send it to the stats sender
if let Some(stat_sender_ref) = stat_sender.as_ref() {
info!("Method is: {:?}", x.clone().method.unwrap());
// info!("Method is: {:?}", x.clone().method);
let mut response_stat = RpcQueryStats::new(
x.clone().method.unwrap(),
x.clone().method,
authorization.clone(),
Arc::new(request_metadata),
(int_response_bytes)
@ -247,13 +249,13 @@ impl MigrateStatsToV2 {
x.period_datetime.timestamp(), // I suppose timestamp is millis as well ... should check this in the (prod) database
int_backend_requests,
);
info!("Sending stats: {:?}", response_stat);
// info!("Sending stats: {:?}", response_stat);
stat_sender_ref
// .send(response_stat.into())
.send_async(response_stat.into())
.await
.context("stat_sender sending response_stat")?;
info!("Send! {:?}", stat_sender);
// info!("Send! {:?}", stat_sender);
} else {
panic!("Stat sender was not spawned!");
}
@ -269,7 +271,6 @@ impl MigrateStatsToV2 {
// (3) Await that all items are properly processed
// TODO: Await all the background handles
info!("Waiting for a second until all is flushed");
// Only after this mark all the items as processed / completed
@ -294,6 +295,7 @@ impl MigrateStatsToV2 {
info!("Update result is: {:?}", update_result);
// (N-1) Mark the batch as migrated
// break;
}
info!(
@ -336,7 +338,7 @@ impl MigrateStatsToV2 {
}
}
info!("Here (?)");
// info!("Here (?)");
Ok(())
}

View File

@ -400,7 +400,7 @@ async fn handle_socket_payload(
if let Some(stat_sender) = app.stat_sender.as_ref() {
let response_stat = RpcQueryStats::new(
json_request.method.clone(),
Some(json_request.method.clone()),
authorization.clone(),
request_metadata,
response.num_bytes(),

View File

@ -35,7 +35,7 @@ pub enum StatType {
#[derive(Clone, Debug)]
pub struct RpcQueryStats {
pub authorization: Arc<Authorization>,
pub method: String,
pub method: Option<String>,
pub archive_request: bool,
pub error_response: bool,
pub request_bytes: u64,
@ -97,7 +97,7 @@ impl RpcQueryStats {
TrackingLevel::Detailed => {
// detailed tracking keeps track of the method and origin
// depending on the request, the origin might still be None
let method = Some(self.method.clone());
let method = self.method.clone();
let origin = self.authorization.origin.clone();
(method, origin)
@ -117,7 +117,7 @@ impl RpcQueryStats {
/// all queries are aggregated
/// TODO: should we store "anon" or "registered" as a key just to be able to split graphs?
fn global_timeseries_key(&self) -> RpcQueryKey {
let method = Some(self.method.clone());
let method = self.method.clone();
// we don't store origin in the timeseries db. its only used for optional accounting
let origin = None;
// everyone gets grouped together
@ -141,7 +141,7 @@ impl RpcQueryStats {
TrackingLevel::None => {
// this RPC key requested no tracking. this is the default.
// we still want graphs though, so we just use None as the rpc_secret_key_id
(Some(self.method.clone()), None)
(self.method.clone(), None)
}
TrackingLevel::Aggregated => {
// this RPC key requested tracking aggregated across all methods
@ -150,7 +150,7 @@ impl RpcQueryStats {
TrackingLevel::Detailed => {
// detailed tracking keeps track of the method
(
Some(self.method.clone()),
self.method.clone(),
self.authorization.checks.rpc_secret_key_id,
)
}
@ -362,7 +362,7 @@ impl BufferedRpcQueryStats {
impl RpcQueryStats {
pub fn new(
method: String,
method: Option<String>,
authorization: Arc<Authorization>,
metadata: Arc<RequestMetadata>,
response_bytes: usize,
@ -462,7 +462,7 @@ impl StatBuffer {
loop {
tokio::select! {
stat = stat_receiver.recv_async() => {
info!("Received stat");
// info!("Received stat");
// save the stat to a buffer
match stat {
Ok(AppStat::RpcQuery(stat)) => {
@ -489,7 +489,7 @@ impl StatBuffer {
}
}
_ = db_save_interval.tick() => {
info!("DB save internal tick");
// info!("DB save internal tick");
let db_conn = self.db_conn.as_ref().expect("db connection should always exist if there are buffered stats");
// TODO: batch saves
@ -501,7 +501,7 @@ impl StatBuffer {
}
}
_ = tsdb_save_interval.tick() => {
info!("TSDB save internal tick");
// info!("TSDB save internal tick");
// TODO: batch saves
// TODO: better bucket names
let influxdb_client = self.influxdb_client.as_ref().expect("influxdb client should always exist if there are buffered stats");