simple changes around Bryan's comments

yenicelik 2023-03-31 12:43:41 +01:00
parent 01065adeac
commit 0bb3a2dc06
9 changed files with 33 additions and 102 deletions

View File

@@ -1,7 +1,6 @@
 [build]
 rustflags = [
     # potentially faster. https://nnethercote.github.io/perf-book/build-configuration.html
-    # TODO: we might want to disable this so its easier to run the proxy across different aws instance types
     "-C", "target-cpu=native",
     # tokio unstable is needed for tokio-console
     "--cfg", "tokio_unstable"

View File

@@ -1,4 +1,5 @@
 # Got eth spam from here
+# https://github.com/shazow/ethspam

 # Got versus from here
 # https://github.com/INFURA/versus

View File

@@ -407,7 +407,7 @@ impl Web3ProxyApp {
         num_workers: usize,
         shutdown_sender: broadcast::Sender<()>,
     ) -> anyhow::Result<Web3ProxyAppSpawn> {
-        let rpc_account_shutdown_recevier = shutdown_sender.subscribe();
+        let stat_buffer_shutdown_receiver = shutdown_sender.subscribe();
         let mut background_shutdown_receiver = shutdown_sender.subscribe();

         // safety checks on the config
@@ -587,7 +587,7 @@ impl Web3ProxyApp {
             60,
             1,
             BILLING_PERIOD_SECONDS,
-            rpc_account_shutdown_recevier,
+            stat_buffer_shutdown_receiver,
         )? {
             // since the database entries are used for accounting, we want to be sure everything is saved before exiting
             important_background_handles.push(emitter_spawn.background_handle);
@@ -711,9 +711,6 @@ impl Web3ProxyApp {
             .time_to_idle(Duration::from_secs(120))
             .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());

-        // prepare a Web3Rpcs to hold all our balanced connections
-        // let (balanced_rpcs, balanced_rpcs_handle) = Web3Rpcs::spawn(
-        // connect to the load balanced rpcs
         let (balanced_rpcs, balanced_handle, consensus_connections_watcher) = Web3Rpcs::spawn(
             top_config.app.chain_id,
             db_conn.clone(),
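The two `subscribe()` calls above are the whole shutdown story: one `broadcast::Sender<()>` is created at spawn time, every long-lived task takes its own receiver, and a single `send(())` reaches all of them. A minimal sketch of that pattern, assuming tokio's broadcast channel (the task bodies and names are illustrative, not the proxy's):

```rust
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // capacity 1 is enough: we only ever send the shutdown signal once
    let (shutdown_sender, _) = broadcast::channel::<()>(1);

    // every task gets its own receiver; one broadcast reaches all of them
    let mut stat_buffer_shutdown_receiver = shutdown_sender.subscribe();
    let mut background_shutdown_receiver = shutdown_sender.subscribe();

    let stat_task = tokio::spawn(async move {
        // a real task would select! over its work and this receiver
        let _ = stat_buffer_shutdown_receiver.recv().await;
        // flush buffered accounting entries before exiting
    });
    let background_task = tokio::spawn(async move {
        let _ = background_shutdown_receiver.recv().await;
    });

    shutdown_sender.send(()).expect("all receivers dropped");
    let _ = tokio::join!(stat_task, background_task);
}
```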

View File

@@ -7,6 +7,7 @@ use futures::stream::FuturesUnordered;
 use futures::StreamExt;
 use hashbrown::HashMap;
 use log::{debug, error, info, trace, warn};
+use migration::sea_orm::QueryOrder;
 use migration::sea_orm::{
     self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
     QueryFilter, QuerySelect, UpdateResult,
@@ -45,25 +46,18 @@ fn datetime_utc_to_instant(datetime: DateTime<Utc>) -> anyhow::Result<Instant> {
         .context("Could not add duration since epoch for updated time")
 }

-/// change a user's address.
 #[derive(FromArgs, PartialEq, Eq, Debug)]
+/// Migrate towards influxdb and rpc_accounting_v2 from rpc_accounting
 #[argh(subcommand, name = "migrate_stats_to_v2")]
 pub struct MigrateStatsToV2 {}

-// I mean drop(sender) and then important_background_handle.await. No need for shutdown signal here I think.
-// Don't make data lossy
 impl MigrateStatsToV2 {
     pub async fn main(
         self,
         top_config: TopConfig,
         db_conn: &DatabaseConnection,
     ) -> anyhow::Result<()> {
-        // Also add influxdb container ...
-        // let mut spawned_app =
-        //     Web3ProxyApp::spawn(top_config.clone(), 2, app_shutdown_sender.clone()).await?;
-        let number_of_rows_to_process_at_once = 500;
+        let number_of_rows_to_process_at_once = 2000;

         // we wouldn't really need this, but let's spawn this anyways
         // easier than debugging the rest I suppose
@@ -121,31 +115,26 @@ impl MigrateStatsToV2 {
             None
         };

-        // Basically spawn the full app, look at web3_proxy CLI
-        while true {
+        let migration_timestamp = chrono::offset::Utc::now();
+        // Iterate over rows that were not marked as "migrated" yet and process them
+        loop {
             // (1) Load a batch of rows out of the old table until no more rows are left
             let old_records = rpc_accounting::Entity::find()
                 .filter(rpc_accounting::Column::Migrated.is_null())
                 .limit(number_of_rows_to_process_at_once)
+                .order_by_asc(rpc_accounting::Column::Id)
                 .all(db_conn)
                 .await?;

             if old_records.len() == 0 {
                 // Break out of while loop once all records have successfully been migrated ...
-                warn!("All records seem to have been successfully migrated!");
+                info!("All records seem to have been successfully migrated!");
                 break;
             }

             // (2) Create request metadata objects to match the old data
             // Iterate through all old rows, and put them into the above objects.
             for x in old_records.iter() {
-                // info!("Preparing for migration: {:?}", x);
-                // TODO: Split up a single request into multiple requests ...
-                // according to frontend-requests, backend-requests, etc.
-                // Get the rpc-key from the rpc_key_id
-                // Get the user-id from the rpc_key_id
                 let authorization_checks = match x.rpc_key_id {
                     Some(rpc_key_id) => {
                         let rpc_key_obj = rpc_key::Entity::find()
@@ -166,13 +155,10 @@ impl MigrateStatsToV2 {
                             ..Default::default()
                         }
                     }
-                    None => AuthorizationChecks {
-                        ..Default::default()
-                    },
+                    None => Default::default(),
                 };

-                // Then overwrite rpc_key_id and user_id (?)
-                let authorization_type = AuthorizationType::Frontend;
+                let authorization_type = AuthorizationType::Internal;

                 let authorization = Arc::new(
                     Authorization::try_new(
                         authorization_checks,
@@ -246,7 +232,7 @@ impl MigrateStatsToV2 {
                     // Modify the timestamps ..
                     response_stat.modify_struct(
                         int_response_millis,
-                        x.period_datetime.timestamp(), // I suppose timestamp is millis as well ... should check this in the (prod) database
+                        x.period_datetime.timestamp(),
                         int_backend_requests,
                     );
                     // info!("Sending stats: {:?}", response_stat);
@@ -255,18 +241,10 @@ impl MigrateStatsToV2 {
                         .send_async(response_stat.into())
                         .await
                         .context("stat_sender sending response_stat")?;
-                    // info!("Send! {:?}", stat_sender);
                 } else {
                     panic!("Stat sender was not spawned!");
                 }

-                // Create a new stats object
-                // Add it to the StatBuffer
             }

-            // Let the stat_sender spawn / flush ...
-            // spawned_app.app.stat_sender.aggregate_and_save_loop()
-            // Send a signal to save ...
         }

             // (3) Await that all items are properly processed
@@ -284,7 +262,7 @@ impl MigrateStatsToV2 {
                 .col_expr(
                     rpc_accounting::Column::Migrated,
                     Expr::value(Value::ChronoDateTimeUtc(Some(Box::new(
-                        chrono::offset::Utc::now(),
+                        migration_timestamp,
                     )))),
                 )
                 .filter(rpc_accounting::Column::Id.is_in(old_record_ids))
@@ -293,9 +271,6 @@ impl MigrateStatsToV2 {
                 .await?;

             info!("Update result is: {:?}", update_result);

-            // (N-1) Mark the batch as migrated
-            // break;
         }

         info!(
@@ -303,12 +278,6 @@ impl MigrateStatsToV2 {
             important_background_handles
         );

-        // Drop the handle
-        // Send the shutdown signal here (?)
-        // important_background_handles.clear();
-        // Finally also send a shutdown signal
         drop(stat_sender);
         // match app_shutdown_sender.send(()) {
         //     Err(x) => {
@@ -317,8 +286,6 @@ impl MigrateStatsToV2 {
         //     _ => {}
         // };

-        // TODO: Should we also write a short verifier if the migration was successful (?)
         // Drop the background handle, wait for any tasks that are on-going
         while let Some(x) = important_background_handles.next().await {
             info!("Returned item is: {:?}", x);
@@ -337,9 +304,6 @@ impl MigrateStatsToV2 {
                 }
             }
         }

-        // info!("Here (?)");
         Ok(())
     }
 }
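The loop above is a plain batch-migration pattern: select rows whose `migrated` column is NULL, ordered by id for determinism, process them, then stamp the whole batch. Capturing `migration_timestamp` once, before the loop, means every row touched by a single run carries the same marker. A sketch of that control flow, with the sea-orm queries abstracted behind hypothetical helper stubs (only the shape of the loop is taken from the diff):

```rust
use anyhow::Result;
use chrono::{DateTime, Utc};

struct OldRecord {
    id: u64,
}

// hypothetical stand-ins for the sea-orm find/update_many queries above
async fn fetch_unmigrated(_limit: u64) -> Result<Vec<OldRecord>> {
    Ok(vec![])
}
async fn mark_migrated(_ids: &[u64], _stamp: DateTime<Utc>) -> Result<()> {
    Ok(())
}

async fn migrate() -> Result<()> {
    let batch_size = 2000;
    // one timestamp for the whole run: every batch this run marks gets
    // the same value, which is why it lives outside the loop
    let migration_timestamp = Utc::now();

    loop {
        // order_by_asc(id) in the real query keeps batches deterministic
        let batch = fetch_unmigrated(batch_size).await?;
        if batch.is_empty() {
            break; // nothing left with migrated IS NULL
        }
        let ids: Vec<u64> = batch.iter().map(|r| r.id).collect();
        // ... convert each row into a response_stat and send it here ...
        mark_migrated(&ids, migration_timestamp).await?;
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
    migrate().await
}
```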

View File

@@ -613,7 +613,7 @@ impl Web3ProxyApp {
         proxy_mode: ProxyMode,
     ) -> anyhow::Result<RateLimitResult> {
         // ip rate limits don't check referer or user agent
-        // the do check origin because we can override rate limits for some origins
+        // they do check origin because we can override rate limits for some origins
         let authorization = Authorization::external(
             allowed_origin_requests_per_period,
             self.db_conn.clone(),
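The corrected comment is the substance here: ip-based limits deliberately ignore referer and user agent, but the origin can swap in an overridden requests-per-period. A minimal sketch of that lookup, assuming a simple map of per-origin overrides (none of these names are the proxy's):

```rust
use std::collections::HashMap;

// returns the rate limit for a request: an origin override if one
// exists, otherwise the default; referer and user agent never matter
fn requests_per_period(
    overrides: &HashMap<String, u64>,
    origin: Option<&str>,
    default_limit: u64,
) -> u64 {
    origin
        .and_then(|o| overrides.get(o).copied())
        .unwrap_or(default_limit)
}

fn main() {
    let mut overrides = HashMap::new();
    overrides.insert("https://app.example.com".to_string(), 10_000);

    let boosted = requests_per_period(&overrides, Some("https://app.example.com"), 100);
    let anonymous = requests_per_period(&overrides, None, 100);
    assert_eq!((boosted, anonymous), (10_000, 100));
}
```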

View File

@@ -102,8 +102,6 @@ impl Web3ProxyBlock {
         if block_timestamp < now {
             // this server is still syncing from too far away to serve requests
-            // u64 is safe because ew checked equality above
-            // (now - block_timestamp).as_secs()
             // u64 is safe because we checked equality above
             (now - block_timestamp) as u64
         } else {
@@ -346,9 +344,6 @@ impl Web3Rpcs {
         let request = json!({ "jsonrpc": "2.0", "id": "1", "method": "eth_getBlockByNumber", "params": (num, false) });
         let request: JsonRpcRequest = serde_json::from_value(request)?;

-        // TODO: if error, retry?
-        // TODO: request_metadata or authorization?
-        // we don't actually set min_block_needed here because all nodes have all blocks
         let response = self
             .try_send_best_consensus_head_connection(authorization, request, None, Some(num), None)
             .await?;
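The `eth_getBlockByNumber` call above builds the request as a `serde_json::Value` and then deserializes it into the typed request struct; the `(num, false)` tuple serializes as the two-element params array `[block_number, include_full_transactions]`. A sketch of that round trip, with a simplified stand-in for the proxy's `JsonRpcRequest`:

```rust
use serde::Deserialize;
use serde_json::json;

#[derive(Debug, Deserialize)]
struct JsonRpcRequest {
    jsonrpc: String,
    id: serde_json::Value,
    method: String,
    #[serde(default)]
    params: serde_json::Value,
}

fn main() -> anyhow::Result<()> {
    let num = 17_000_000u64;
    // a Rust tuple implements Serialize, so json! turns it into an array
    let request = json!({ "jsonrpc": "2.0", "id": "1", "method": "eth_getBlockByNumber", "params": (num, false) });
    let request: JsonRpcRequest = serde_json::from_value(request)?;

    assert_eq!(request.method, "eth_getBlockByNumber");
    println!("{:?}", request);
    Ok(())
}
```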

View File

@@ -282,10 +282,6 @@ impl Web3Rpcs {
             })
             .collect();

-        // map of connection names to their connection
-        // let mut connections = HashMap::new();
-        // let mut handles = vec![];

         while let Some(x) = spawn_handles.next().await {
             match x {
                 Ok(Ok((rpc, _handle))) => {
@@ -366,12 +362,8 @@ impl Web3Rpcs {
         self.by_name.read().len()
     }

-    // <<<<<<< HEAD
     pub fn is_empty(&self) -> bool {
         self.by_name.read().is_empty()
-        // =======
-        //     Ok((connections, handle, consensus_connections_watcher))
-        // >>>>>>> 77df3fa (stats v2)
     }

     pub fn min_head_rpcs(&self) -> usize {
@@ -888,11 +880,7 @@ impl Web3Rpcs {
         // TODO: maximum retries? right now its the total number of servers
         loop {
-            // <<<<<<< HEAD
             if skip_rpcs.len() >= self.by_name.read().len() {
-            // =======
-            //     if skip_rpcs.len() == self.by_name.len() {
-            // >>>>>>> 77df3fa (stats v2)
                 break;
             }
@@ -1173,18 +1161,8 @@ impl Web3Rpcs {
                     request_metadata.no_servers.fetch_add(1, Ordering::Release);
                 }

-                // <<<<<<< HEAD
                 watch_consensus_rpcs.changed().await?;
                 watch_consensus_rpcs.borrow_and_update();
-                // =======
-                // TODO: i don't think this will ever happen
-                // TODO: return a 502? if it does?
-                // return Err(anyhow::anyhow!("no available rpcs!"));
-                // TODO: sleep how long?
-                // TODO: subscribe to something in ConsensusWeb3Rpcs instead
-                sleep(Duration::from_millis(200)).await;
-                // >>>>>>> 77df3fa (stats v2)

                 continue;
             }
@@ -1285,11 +1263,12 @@ impl Serialize for Web3Rpcs {
 /// TODO: i think we still have sorts scattered around the code that should use this
 /// TODO: take AsRef or something like that? We don't need an Arc here
 fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (Reverse<U64>, u64, bool, OrderedFloat<f64>) {
-    let head_block = x.head_block
-        .read()
-        .as_ref()
-        .map(|x| *x.number())
-        .unwrap_or_default();
+    let head_block = x
+        .head_block
+        .read()
+        .as_ref()
+        .map(|x| *x.number())
+        .unwrap_or_default();

     let tier = x.tier;
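`rpc_sync_status_sort_key` leans on lexicographic tuple ordering: `Reverse` flips the head block so the highest block sorts first while the remaining fields still sort ascending, and `OrderedFloat` makes the final `f64` totally ordered so the whole tuple is `Ord`. A sketch with a simplified stand-in for `Web3Rpc` (the meaning of the last two fields is an assumption, not taken from the diff):

```rust
use ordered_float::OrderedFloat;
use std::cmp::Reverse;

struct Rpc {
    head_block: u64,
    tier: u64,
    backup: bool,
    peak_latency_ms: f64,
}

// highest block first, then lowest tier, non-backup before backup,
// then lowest latency; tuples compare field by field, left to right
fn sort_key(x: &Rpc) -> (Reverse<u64>, u64, bool, OrderedFloat<f64>) {
    (
        Reverse(x.head_block),
        x.tier,
        x.backup,
        OrderedFloat(x.peak_latency_ms),
    )
}

fn main() {
    let mut rpcs = vec![
        Rpc { head_block: 100, tier: 1, backup: false, peak_latency_ms: 80.0 },
        Rpc { head_block: 101, tier: 2, backup: false, peak_latency_ms: 120.0 },
    ];
    rpcs.sort_by_key(sort_key);
    assert_eq!(rpcs[0].head_block, 101); // newest head sorts first
}
```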

View File

@@ -9,9 +9,9 @@ use crate::rpcs::request::RequestErrorHandler;
 use anyhow::{anyhow, Context};
 use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
 use ethers::types::{Address, Transaction, U256};
-use futures::StreamExt;
 use futures::future::try_join_all;
 use futures::stream::FuturesUnordered;
+use futures::StreamExt;
 use log::{debug, error, info, trace, warn, Level};
 use migration::sea_orm::DatabaseConnection;
 use ordered_float::OrderedFloat;
@@ -243,12 +243,9 @@ impl Web3Rpc {
             block_data_limit,
             reconnect,
             tier: config.tier,
-            // <<<<<<< HEAD
             disconnect_watch: Some(disconnect_sender),
             created_at: Some(created_at),
-            // =======
             head_block: RwLock::new(Default::default()),
-            // >>>>>>> 77df3fa (stats v2)
             ..Default::default()
         };
@@ -723,7 +720,7 @@ impl Web3Rpc {
         } else {
             RequestErrorHandler::ErrorLevel
         };

         let mut delay_start = false;

         // this does loop. just only when reconnect is enabled
@@ -788,7 +785,6 @@ impl Web3Rpc {
                 let head_block = rpc.head_block.read().clone();

                 if let Some((block_number, txid)) = head_block.and_then(|x| {
-                    // let block = x.block;
                     let block = x.block.clone();

                     let block_number = block.number?;
@@ -912,7 +908,7 @@ impl Web3Rpc {
                 continue;
             }

             // reconnect is not enabled.
             if *disconnect_receiver.borrow() {
                 info!("{} is disconnecting", self);
@@ -1174,7 +1170,9 @@ impl Web3Rpc {
         if self.should_disconnect() {
             Ok(())
         } else {
-            Err(anyhow!("pending_transactions subscription exited. reconnect needed"))
+            Err(anyhow!(
+                "pending_transactions subscription exited. reconnect needed"
+            ))
         }
     }
@@ -1251,14 +1249,8 @@ impl Web3Rpc {
         }

         if let Some(hard_limit_until) = self.hard_limit_until.as_ref() {
-            // <<<<<<< HEAD
             let hard_limit_ready = *hard_limit_until.borrow();
-            // =======
-            // let hard_limit_ready = hard_limit_until.borrow().to_owned();
-            // >>>>>>> 77df3fa (stats v2)

             let now = Instant::now();

             if now < hard_limit_ready {
                 return Ok(OpenRequestResult::RetryAt(hard_limit_ready));
             }
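The `hard_limit_until` check reads an `Instant` out of a watch channel without awaiting and returns a retry-at result while the limit is still in the future. A sketch of that gate, assuming tokio's watch channel and a stand-in for the proxy's `OpenRequestResult`:

```rust
use std::time::{Duration, Instant};
use tokio::sync::watch;

enum OpenRequestResult {
    Ready,
    RetryAt(Instant),
}

fn check_hard_limit(hard_limit_until: &watch::Receiver<Instant>) -> OpenRequestResult {
    // borrow() reads the latest value without waiting for a change
    let hard_limit_ready = *hard_limit_until.borrow();
    let now = Instant::now();
    if now < hard_limit_ready {
        // still rate limited: tell the caller when to try again
        OpenRequestResult::RetryAt(hard_limit_ready)
    } else {
        OpenRequestResult::Ready
    }
}

fn main() {
    let (tx, rx) = watch::channel(Instant::now());
    // a 429 from the backend would push the limit into the future
    tx.send(Instant::now() + Duration::from_secs(1)).unwrap();
    assert!(matches!(check_hard_limit(&rx), OpenRequestResult::RetryAt(_)));
}
```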

View File

@@ -198,7 +198,11 @@ impl OpenRequestHandle {
         // TODO: replace ethers-rs providers with our own that supports streaming the responses
         let response = match provider.as_ref() {
             #[cfg(test)]
-            Web3Provider::Mock => return Err(ProviderError::CustomError("mock provider can't respond".to_string())),
+            Web3Provider::Mock => {
+                return Err(ProviderError::CustomError(
+                    "mock provider can't respond".to_string(),
+                ))
+            }
             Web3Provider::Ws(p) => p.request(method, params).await,
             Web3Provider::Http(p) | Web3Provider::Both(p, _) => {
                 // TODO: i keep hearing that http is faster. but ws has always been better for me. investigate more with actual benchmarks
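Beyond the rustfmt reflow, the arm above shows the useful trick in this match: `Web3Provider::Mock` only exists behind `#[cfg(test)]`, so production builds never compile the mock path at all. A sketch of the same shape with simplified payloads (the real enum's variants carry provider handles, not strings):

```rust
enum Web3Provider {
    Ws(String),
    Http(String),
    // this variant is absent from non-test builds entirely
    #[cfg(test)]
    Mock,
}

fn describe(provider: &Web3Provider) -> Result<String, String> {
    match provider {
        // this arm only compiles under `cargo test`; without it the
        // match is still exhaustive because Mock doesn't exist
        #[cfg(test)]
        Web3Provider::Mock => Err("mock provider can't respond".to_string()),
        Web3Provider::Ws(url) => Ok(format!("websocket via {url}")),
        Web3Provider::Http(url) => Ok(format!("http via {url}")),
    }
}

fn main() {
    let p = Web3Provider::Http("http://localhost:8545".to_string());
    println!("{}", describe(&p).unwrap());
}
```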