Merge branch 'stats_v3' of github.com:yenicelik/web3-proxy into Web3ProxyError

This commit is contained in:
Rory Neithinger 2023-04-01 00:04:44 -07:00
commit f0d8a0c8c9
9 changed files with 34 additions and 103 deletions

@ -1,7 +1,6 @@
[build]
rustflags = [
# potentially faster. https://nnethercote.github.io/perf-book/build-configuration.html
# TODO: we might want to disable this so its easier to run the proxy across different aws instance types
"-C", "target-cpu=native",
# tokio unstable is needed for tokio-console
"--cfg", "tokio_unstable"

@ -1,4 +1,5 @@
# Got eth spam from here
# https://github.com/shazow/ethspam
# Got versus from here
# https://github.com/INFURA/versus

@ -408,8 +408,8 @@ impl Web3ProxyApp {
num_workers: usize,
shutdown_sender: broadcast::Sender<()>,
) -> anyhow::Result<Web3ProxyAppSpawn> {
let rpc_account_shutdown_recevier = shutdown_sender.subscribe();
let _background_shutdown_receiver = shutdown_sender.subscribe();
let stat_buffer_shutdown_receiver = shutdown_sender.subscribe();
let mut background_shutdown_receiver = shutdown_sender.subscribe();
// safety checks on the config
// while i would prefer this to be in a "apply_top_config" function, that is a larger refactor
@ -588,7 +588,7 @@ impl Web3ProxyApp {
60,
1,
BILLING_PERIOD_SECONDS,
rpc_account_shutdown_recevier,
stat_buffer_shutdown_receiver,
)? {
// since the database entries are used for accounting, we want to be sure everything is saved before exiting
important_background_handles.push(emitter_spawn.background_handle);
@ -712,9 +712,6 @@ impl Web3ProxyApp {
.time_to_idle(Duration::from_secs(120))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// prepare a Web3Rpcs to hold all our balanced connections
// let (balanced_rpcs, balanced_rpcs_handle) = Web3Rpcs::spawn(
// connect to the load balanced rpcs
let (balanced_rpcs, balanced_handle, consensus_connections_watcher) = Web3Rpcs::spawn(
top_config.app.chain_id,
db_conn.clone(),

@ -7,6 +7,7 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hashbrown::HashMap;
use log::{debug, error, info, trace, warn};
use migration::sea_orm::QueryOrder;
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
QueryFilter, QuerySelect, UpdateResult,
@ -45,25 +46,18 @@ fn datetime_utc_to_instant(datetime: DateTime<Utc>) -> anyhow::Result<Instant> {
.context("Could not add duration since epoch for updated time")
}
/// change a user's address.
#[derive(FromArgs, PartialEq, Eq, Debug)]
/// Migrate towards influxdb and rpc_accounting_v2 from rpc_accounting
#[argh(subcommand, name = "migrate_stats_to_v2")]
pub struct MigrateStatsToV2 {}
// I mean drop(sender) and then important_background_handle.await. No need for shutdown signal here I think.
// Don't make data lossy
impl MigrateStatsToV2 {
pub async fn main(
self,
top_config: TopConfig,
db_conn: &DatabaseConnection,
) -> anyhow::Result<()> {
// Also add influxdb container ...
// let mut spawned_app =
// Web3ProxyApp::spawn(top_config.clone(), 2, app_shutdown_sender.clone()).await?;
let number_of_rows_to_process_at_once = 500;
let number_of_rows_to_process_at_once = 2000;
// we wouldn't really need this, but let's spawn this anyways
// easier than debugging the rest I suppose
@ -121,31 +115,26 @@ impl MigrateStatsToV2 {
None
};
// Basically spawn the full app, look at web3_proxy CLI
let migration_timestamp = chrono::offset::Utc::now();
while true {
// Iterate over rows that were not market as "migrated" yet and process them
loop {
// (1) Load a batch of rows out of the old table until no more rows are left
let old_records = rpc_accounting::Entity::find()
.filter(rpc_accounting::Column::Migrated.is_null())
.limit(number_of_rows_to_process_at_once)
.order_by_asc(rpc_accounting::Column::Id)
.all(db_conn)
.await?;
if old_records.len() == 0 {
// Break out of while loop once all records have successfully been migrated ...
warn!("All records seem to have been successfully migrated!");
info!("All records seem to have been successfully migrated!");
break;
}
// (2) Create request metadata objects to match the old data
// Iterate through all old rows, and put them into the above objects.
for x in old_records.iter() {
// info!("Preparing for migration: {:?}", x);
// TODO: Split up a single request into multiple requests ...
// according to frontend-requests, backend-requests, etc.
// Get the rpc-key from the rpc_key_id
// Get the user-id from the rpc_key_id
let authorization_checks = match x.rpc_key_id {
Some(rpc_key_id) => {
let rpc_key_obj = rpc_key::Entity::find()
@ -166,13 +155,10 @@ impl MigrateStatsToV2 {
..Default::default()
}
}
None => AuthorizationChecks {
..Default::default()
},
None => Default::default(),
};
// Then overwrite rpc_key_id and user_id (?)
let authorization_type = AuthorizationType::Frontend;
let authorization_type = AuthorizationType::Internal;
let authorization = Arc::new(
Authorization::try_new(
authorization_checks,
@ -246,7 +232,7 @@ impl MigrateStatsToV2 {
// Modify the timestamps ..
response_stat.modify_struct(
int_response_millis,
x.period_datetime.timestamp(), // I suppose timestamp is millis as well ... should check this in the (prod) database
x.period_datetime.timestamp(),
int_backend_requests,
);
// info!("Sending stats: {:?}", response_stat);
@ -255,18 +241,10 @@ impl MigrateStatsToV2 {
.send_async(response_stat.into())
.await
.context("stat_sender sending response_stat")?;
// info!("Send! {:?}", stat_sender);
} else {
panic!("Stat sender was not spawned!");
}
// Create a new stats object
// Add it to the StatBuffer
}
// Let the stat_sender spawn / flush ...
// spawned_app.app.stat_sender.aggregate_and_save_loop()
// Send a signal to save ...
}
// (3) Await that all items are properly processed
@ -284,7 +262,7 @@ impl MigrateStatsToV2 {
.col_expr(
rpc_accounting::Column::Migrated,
Expr::value(Value::ChronoDateTimeUtc(Some(Box::new(
chrono::offset::Utc::now(),
migration_timestamp,
)))),
)
.filter(rpc_accounting::Column::Id.is_in(old_record_ids))
@ -293,9 +271,6 @@ impl MigrateStatsToV2 {
.await?;
info!("Update result is: {:?}", update_result);
// (N-1) Mark the batch as migrated
// break;
}
info!(
@ -303,12 +278,6 @@ impl MigrateStatsToV2 {
important_background_handles
);
// Drop the handle
// Send the shutdown signal here (?)
// important_background_handles.clear();
// Finally also send a shutdown signal
drop(stat_sender);
// match app_shutdown_sender.send(()) {
// Err(x) => {
@ -317,8 +286,6 @@ impl MigrateStatsToV2 {
// _ => {}
// };
// TODO: Should we also write a short verifier if the migration was successful (?)
// Drop the background handle, wait for any tasks that are on-going
while let Some(x) = important_background_handles.next().await {
info!("Returned item is: {:?}", x);
@ -337,9 +304,6 @@ impl MigrateStatsToV2 {
}
}
}
// info!("Here (?)");
Ok(())
}
}

@ -605,7 +605,7 @@ impl Web3ProxyApp {
proxy_mode: ProxyMode,
) -> Web3ProxyResult<RateLimitResult> {
// ip rate limits don't check referer or user agent
// the do check origin because we can override rate limits for some origins
// they do check origin because we can override rate limits for some origins
let authorization = Authorization::external(
allowed_origin_requests_per_period,
self.db_conn.clone(),

@ -102,8 +102,6 @@ impl Web3ProxyBlock {
if block_timestamp < now {
// this server is still syncing from too far away to serve requests
// u64 is safe because ew checked equality above
// (now - block_timestamp).as_secs()
// u64 is safe because we checked equality above
(now - block_timestamp) as u64
} else {
@ -346,9 +344,6 @@ impl Web3Rpcs {
let request = json!({ "jsonrpc": "2.0", "id": "1", "method": "eth_getBlockByNumber", "params": (num, false) });
let request: JsonRpcRequest = serde_json::from_value(request)?;
// TODO: if error, retry?
// TODO: request_metadata or authorization?
// we don't actually set min_block_needed here because all nodes have all blocks
let response = self
.try_send_best_consensus_head_connection(authorization, request, None, Some(num), None)
.await?;

@ -283,10 +283,6 @@ impl Web3Rpcs {
})
.collect();
// map of connection names to their connection
// let mut connections = HashMap::new();
// let mut handles = vec![];
while let Some(x) = spawn_handles.next().await {
match x {
Ok(Ok((rpc, _handle))) => {
@ -367,12 +363,8 @@ impl Web3Rpcs {
self.by_name.read().len()
}
// <<<<<<< HEAD
pub fn is_empty(&self) -> bool {
self.by_name.read().is_empty()
// =======
// Ok((connections, handle, consensus_connections_watcher))
// >>>>>>> 77df3fa (stats v2)
}
pub fn min_head_rpcs(&self) -> usize {
@ -888,11 +880,7 @@ impl Web3Rpcs {
// TODO: maximum retries? right now its the total number of servers
loop {
// <<<<<<< HEAD
if skip_rpcs.len() >= self.by_name.read().len() {
// =======
// if skip_rpcs.len() == self.by_name.len() {
// >>>>>>> 77df3fa (stats v2)
break;
}
@ -1173,18 +1161,8 @@ impl Web3Rpcs {
request_metadata.no_servers.fetch_add(1, Ordering::Release);
}
// <<<<<<< HEAD
watch_consensus_rpcs.changed().await?;
watch_consensus_rpcs.borrow_and_update();
// =======
// TODO: i don't think this will ever happen
// TODO: return a 502? if it does?
// return Err(anyhow::anyhow!("no available rpcs!"));
// TODO: sleep how long?
// TODO: subscribe to something in ConsensusWeb3Rpcs instead
sleep(Duration::from_millis(200)).await;
// >>>>>>> 77df3fa (stats v2)
continue;
}
@ -1285,11 +1263,12 @@ impl Serialize for Web3Rpcs {
/// TODO: i think we still have sorts scattered around the code that should use this
/// TODO: take AsRef or something like that? We don't need an Arc here
fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (Reverse<U64>, u64, bool, OrderedFloat<f64>) {
let head_block = x.head_block
.read()
.as_ref()
.map(|x| *x.number())
.unwrap_or_default();
let head_block = x
.head_block
.read()
.as_ref()
.map(|x| *x.number())
.unwrap_or_default();
let tier = x.tier;

@ -10,9 +10,9 @@ use crate::rpcs::request::RequestErrorHandler;
use anyhow::{anyhow, Context};
use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
use ethers::types::{Address, Transaction, U256};
use futures::StreamExt;
use futures::future::try_join_all;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use log::{debug, error, info, trace, warn, Level};
use migration::sea_orm::DatabaseConnection;
use ordered_float::OrderedFloat;
@ -244,12 +244,9 @@ impl Web3Rpc {
block_data_limit,
reconnect,
tier: config.tier,
// <<<<<<< HEAD
disconnect_watch: Some(disconnect_sender),
created_at: Some(created_at),
// =======
head_block: RwLock::new(Default::default()),
// >>>>>>> 77df3fa (stats v2)
..Default::default()
};
@ -724,7 +721,7 @@ impl Web3Rpc {
} else {
RequestErrorHandler::ErrorLevel
};
let mut delay_start = false;
// this does loop. just only when reconnect is enabled
@ -789,7 +786,6 @@ impl Web3Rpc {
let head_block = rpc.head_block.read().clone();
if let Some((block_number, txid)) = head_block.and_then(|x| {
// let block = x.block;
let block = x.block.clone();
let block_number = block.number?;
@ -913,7 +909,7 @@ impl Web3Rpc {
continue;
}
// reconnect is not enabled.
if *disconnect_receiver.borrow() {
info!("{} is disconnecting", self);
@ -1175,7 +1171,9 @@ impl Web3Rpc {
if self.should_disconnect() {
Ok(())
} else {
Err(anyhow!("pending_transactions subscription exited. reconnect needed"))
Err(anyhow!(
"pending_transactions subscription exited. reconnect needed"
))
}
}
@ -1251,14 +1249,8 @@ impl Web3Rpc {
}
if let Some(hard_limit_until) = self.hard_limit_until.as_ref() {
// <<<<<<< HEAD
let hard_limit_ready = *hard_limit_until.borrow();
// =======
// let hard_limit_ready = hard_limit_until.borrow().to_owned();
// >>>>>>> 77df3fa (stats v2)
let now = Instant::now();
if now < hard_limit_ready {
return Ok(OpenRequestResult::RetryAt(hard_limit_ready));
}

@ -198,7 +198,11 @@ impl OpenRequestHandle {
// TODO: replace ethers-rs providers with our own that supports streaming the responses
let response = match provider.as_ref() {
#[cfg(test)]
Web3Provider::Mock => return Err(ProviderError::CustomError("mock provider can't respond".to_string())),
Web3Provider::Mock => {
return Err(ProviderError::CustomError(
"mock provider can't respond".to_string(),
))
}
Web3Provider::Ws(p) => p.request(method, params).await,
Web3Provider::Http(p) | Web3Provider::Both(p, _) => {
// TODO: i keep hearing that http is faster. but ws has always been better for me. investigate more with actual benchmarks