latest changes from bryan

yenicelik 2023-04-04 14:40:22 +02:00
parent 0bb3a2dc06
commit 0069e76040
7 changed files with 120 additions and 219 deletions

@@ -820,18 +820,18 @@ impl Web3ProxyApp {
app_handles.push(config_handle);
}
// =======
// if important_background_handles.is_empty() {
// info!("no important background handles");
//
// let f = tokio::spawn(async move {
// let _ = background_shutdown_receiver.recv().await;
//
// Ok(())
// });
//
// important_background_handles.push(f);
// >>>>>>> 77df3fa (stats v2)
if important_background_handles.is_empty() {
info!("no important background handles");
let f = tokio::spawn(async move {
let _ = background_shutdown_receiver.recv().await;
Ok(())
});
important_background_handles.push(f);
}
Ok((
app,

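Editor's note: the uncommented block above exists so the `FuturesUnordered` of important background handles is never empty; otherwise a later loop over it would see `None` immediately and the app would exit. A minimal, self-contained sketch of the pattern, assuming a `tokio::sync::broadcast` shutdown channel (the app's concrete channel types may differ):

```rust
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::broadcast;
use tokio::task::JoinHandle;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (shutdown_sender, mut background_shutdown_receiver) = broadcast::channel::<()>(1);

    let mut important_background_handles: FuturesUnordered<JoinHandle<anyhow::Result<()>>> =
        FuturesUnordered::new();

    // park a no-op task on the shutdown channel so the stream is never empty
    if important_background_handles.is_empty() {
        let f = tokio::spawn(async move {
            let _ = background_shutdown_receiver.recv().await;
            Ok(())
        });
        important_background_handles.push(f);
    }

    // on shutdown, the parked task wakes and the stream drains cleanly
    shutdown_sender.send(())?;
    while let Some(joined) = important_background_handles.next().await {
        joined??;
    }

    Ok(())
}
```
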
@@ -279,14 +279,15 @@ impl MigrateStatsToV2 {
);
drop(stat_sender);
// match app_shutdown_sender.send(()) {
// Err(x) => {
// panic!("Could not send shutdown signal! {:?}", x);
// }
// _ => {}
// };
// Drop the background handle, wait for any tasks that are on-going
match app_shutdown_sender.send(()) {
Err(x) => {
panic!("Could not send shutdown signal! {:?}", x);
}
_ => {}
};
// Wait for any tasks that are on-going
while let Some(x) = important_background_handles.next().await {
info!("Returned item is: {:?}", x);
match x {

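Editor's note: the restored shutdown sequence follows a common tokio shape: broadcast the signal, then drain the join handles and inspect each result. A compilable sketch with illustrative channel types (the migration binary's own senders may differ):

```rust
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (app_shutdown_sender, _) = broadcast::channel::<()>(1);

    let mut important_background_handles = FuturesUnordered::new();
    for i in 0..3u32 {
        let mut shutdown_receiver = app_shutdown_sender.subscribe();
        important_background_handles.push(tokio::spawn(async move {
            let _ = shutdown_receiver.recv().await;
            anyhow::Ok(i)
        }));
    }

    // signal shutdown; panicking on failure mirrors the code above
    if let Err(x) = app_shutdown_sender.send(()) {
        panic!("Could not send shutdown signal! {:?}", x);
    }

    // wait for any tasks that are ongoing
    while let Some(x) = important_background_handles.next().await {
        match x {
            Ok(Ok(i)) => println!("task {i} finished cleanly"),
            Ok(Err(err)) => eprintln!("task errored: {err:?}"),
            Err(join_err) => eprintln!("task panicked: {join_err:?}"),
        }
    }
}
```
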
@@ -190,31 +190,8 @@ async fn run(
prometheus_shutdown_receiver,
));
// wait until the app has seen its first consensus head block
// if backups were included, wait a little longer
// let _ = spawned_app.app.head_block_receiver().changed().await;
// if backups were included, wait a little longer
// for _ in 0..3 {
// let _ = spawned_app.consensus_connections_watcher.changed().await;
//
// let consensus = spawned_app
// .consensus_connections_watcher
// .borrow_and_update();
//
// // Let's just do a super dirty unwrap to get things going
// if consensus.unwrap().backups_needed {
// info!(
// "waiting longer. found consensus with backups: {}",
// consensus.unwrap().head_block.as_ref().unwrap(),
// );
// } else {
// // TODO: also check that we have at least one archive node connected?
// break;
// }
// }
let _ = spawned_app.app.head_block_receiver().changed().await;
// start the frontend port
let frontend_handle = tokio::spawn(frontend::serve(
app_frontend_port,
@@ -268,6 +245,7 @@ async fn run(
}
}
}
// TODO: This seems to have been removed on the main branch
// TODO: how can we properly watch background handles here? this returns None immediately and the app exits. I think the bug is somewhere else though
x = spawned_app.background_handles.next() => {
match x {
@@ -284,6 +262,7 @@ async fn run(
}
};
// TODO: This is also not there on the main branch
// if a future above completed, make sure the frontend knows to start turning off
if !frontend_exited {
if let Err(err) = frontend_shutdown_sender.send(()) {
@@ -292,6 +271,7 @@ async fn run(
};
}
// TODO: Also not there on main branch
// TODO: wait until the frontend completes
if let Err(err) = frontend_shutdown_complete_receiver.recv().await {
warn!("shutdown completition err={:?}", err);

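Editor's note: the replacement wait is a single `changed().await` on the head block watch channel; the backup-aware retry loop above it was dropped. A minimal sketch of that primitive, with a `u64` standing in for the real head block type:

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // the app publishes consensus head blocks on a watch channel that starts empty
    let (head_block_sender, mut head_block_receiver) = watch::channel::<Option<u64>>(None);

    // a background task finds consensus and publishes the first head block
    tokio::spawn(async move {
        head_block_sender.send(Some(17_000_000)).ok();
    });

    // startup blocks here until at least one head block has been seen
    let _ = head_block_receiver.changed().await;
    println!("first head block: {:?}", *head_block_receiver.borrow());
}
```
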
@@ -238,18 +238,13 @@ pub async fn serve(
let server = axum::Server::bind(&addr)
// TODO: option to use with_connect_info. we want it in dev, but not when running behind a proxy
.serve(service)
// <<<<<<< HEAD
// .with_graceful_shutdown(async move {
// let _ = shutdown_receiver.recv().await;
// })
// .await
// .map_err(Into::into);
//
// let _ = shutdown_complete_sender.send(());
//
// server
// =======
.await?;
.with_graceful_shutdown(async move {
let _ = shutdown_receiver.recv().await;
})
.await
.map_err(Into::into);
Ok(())
let _ = shutdown_complete_sender.send(());
server
}

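Editor's note: the resolved `serve` now uses axum's standard graceful-shutdown shape, and only signals completion after the server future resolves. A compilable sketch, assuming `tokio` broadcast/oneshot channels in place of the app's own (the wiring and port here are hypothetical):

```rust
use std::net::SocketAddr;

use axum::{routing::get, Router};
use tokio::sync::{broadcast, oneshot};

async fn serve_until_shutdown(
    mut shutdown_receiver: broadcast::Receiver<()>,
    shutdown_complete_sender: oneshot::Sender<()>,
) -> anyhow::Result<()> {
    let app = Router::new().route("/", get(|| async { "ok" }));
    let addr = SocketAddr::from(([127, 0, 0, 1], 8544));

    let server = axum::Server::bind(&addr)
        .serve(app.into_make_service())
        // stop accepting connections on shutdown, but let in-flight requests finish
        .with_graceful_shutdown(async move {
            let _ = shutdown_receiver.recv().await;
        })
        .await
        .map_err(Into::into);

    // tell the caller the frontend has fully shut down
    let _ = shutdown_complete_sender.send(());

    server
}
```
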
@@ -2,7 +2,7 @@
use super::blockchain::{BlocksByHashCache, Web3ProxyBlock};
use super::consensus::ConsensusWeb3Rpcs;
use super::one::Web3Rpc;
use super::request::{OpenRequestHandle, OpenRequestResult, RequestErrorHandler};
use super::request::{OpenRequestHandle, OpenRequestResult, RequestRevertHandler};
use crate::app::{flatten_handle, AnyhowJoinHandle, Web3ProxyApp};
///! Load balanced communication with a group of web3 providers
use crate::config::{BlockAndRpc, TxHashAndRpc, Web3RpcConfig};
@@ -316,43 +316,8 @@ impl Web3Rpcs {
}
}
// <<<<<<< HEAD
Ok(())
}
// =======
// // TODO: max_capacity and time_to_idle from config
// // all block hashes are the same size, so no need for weigher
// let block_hashes = Cache::builder()
// .time_to_idle(Duration::from_secs(600))
// .max_capacity(10_000)
// .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// // all block numbers are the same size, so no need for weigher
// let block_numbers = Cache::builder()
// .time_to_idle(Duration::from_secs(600))
// .max_capacity(10_000)
// .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
//
// let (watch_consensus_connections_sender, consensus_connections_watcher) =
// watch::channel(Default::default());
//
// let watch_consensus_head_receiver =
// watch_consensus_head_sender.as_ref().map(|x| x.subscribe());
//
// let connections = Arc::new(Self {
// by_name: connections,
// watch_consensus_rpcs_sender: watch_consensus_connections_sender,
// watch_consensus_head_receiver,
// pending_transactions,
// block_hashes,
// block_numbers,
// min_sum_soft_limit,
// min_head_rpcs,
// max_block_age,
// max_block_lag,
// });
//
// let authorization = Arc::new(Authorization::internal(db_conn.clone())?);
// >>>>>>> 77df3fa (stats v2)
pub fn get(&self, conn_name: &str) -> Option<Arc<Web3Rpc>> {
self.by_name.read().get(conn_name).cloned()
@@ -913,7 +878,7 @@ impl Web3Rpcs {
.request(
&request.method,
&json!(request.params),
RequestErrorHandler::SaveRevert,
RequestRevertHandler::Save,
None,
)
.await;

@@ -5,7 +5,7 @@ use super::request::{OpenRequestHandle, OpenRequestResult};
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::{BlockAndRpc, Web3RpcConfig};
use crate::frontend::authorization::Authorization;
use crate::rpcs::request::RequestErrorHandler;
use crate::rpcs::request::RequestRevertHandler;
use anyhow::{anyhow, Context};
use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
use ethers::types::{Address, Transaction, U256};
@@ -716,9 +716,9 @@ impl Web3Rpc {
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
) -> anyhow::Result<()> {
let error_handler = if self.backup {
RequestErrorHandler::DebugLevel
RequestRevertHandler::DebugLevel
} else {
RequestErrorHandler::ErrorLevel
RequestRevertHandler::ErrorLevel
};
let mut delay_start = false;
@@ -1332,7 +1332,7 @@ impl Web3Rpc {
self: &Arc<Self>,
method: &str,
params: &P,
revert_handler: RequestErrorHandler,
revert_handler: RequestRevertHandler,
authorization: Arc<Authorization>,
unlocked_provider: Option<Arc<Web3Provider>>,
) -> anyhow::Result<R>

@@ -35,7 +35,7 @@ pub struct OpenRequestHandle {
/// Depending on the context, RPC errors can require different handling.
#[derive(Copy, Clone)]
pub enum RequestErrorHandler {
pub enum RequestRevertHandler {
/// Log at the trace level. Use when errors are expected.
TraceLevel,
/// Log at the debug level. Use when errors are expected.
@@ -45,7 +45,7 @@ pub enum RequestErrorHandler {
/// Log at the warn level. Use when errors do not cause problems.
WarnLevel,
/// Potentially save the revert. Users can tune how often this happens
SaveRevert,
Save,
}
// TODO: second param could be skipped since we don't need it here
@@ -58,13 +58,13 @@ struct EthCallFirstParams {
data: Option<Bytes>,
}
impl From<Level> for RequestErrorHandler {
impl From<Level> for RequestRevertHandler {
fn from(level: Level) -> Self {
match level {
Level::Trace => RequestErrorHandler::TraceLevel,
Level::Debug => RequestErrorHandler::DebugLevel,
Level::Error => RequestErrorHandler::ErrorLevel,
Level::Warn => RequestErrorHandler::WarnLevel,
Level::Trace => RequestRevertHandler::TraceLevel,
Level::Debug => RequestRevertHandler::DebugLevel,
Level::Error => RequestRevertHandler::ErrorLevel,
Level::Warn => RequestRevertHandler::WarnLevel,
_ => unimplemented!("unexpected tracing Level"),
}
}
@@ -150,7 +150,7 @@ impl OpenRequestHandle {
self,
method: &str,
params: &P,
revert_handler: RequestErrorHandler,
revert_handler: RequestRevertHandler,
unlocked_provider: Option<Arc<Web3Provider>>,
) -> Result<R, ProviderError>
where
@@ -229,17 +229,17 @@ impl OpenRequestHandle {
if let Err(err) = &response {
// only save reverts for some types of calls
// TODO: do something special for eth_sendRawTransaction too
let error_handler = if let RequestErrorHandler::SaveRevert = revert_handler {
let error_handler = if let RequestRevertHandler::Save = revert_handler {
// TODO: should all these be Trace or Debug or a mix?
if !["eth_call", "eth_estimateGas"].contains(&method) {
// trace!(%method, "skipping save on revert");
RequestErrorHandler::TraceLevel
RequestRevertHandler::TraceLevel
} else if self.authorization.db_conn.is_some() {
let log_revert_chance = self.authorization.checks.log_revert_chance;
if log_revert_chance == 0.0 {
// trace!(%method, "no chance. skipping save on revert");
RequestErrorHandler::TraceLevel
RequestRevertHandler::TraceLevel
} else if log_revert_chance == 1.0 {
// trace!(%method, "gaurenteed chance. SAVING on revert");
revert_handler
@@ -247,7 +247,7 @@ impl OpenRequestHandle {
< log_revert_chance
{
// trace!(%method, "missed chance. skipping save on revert");
RequestErrorHandler::TraceLevel
RequestRevertHandler::TraceLevel
} else {
// trace!("Saving on revert");
// TODO: is always logging at debug level fine?
@@ -255,7 +255,7 @@ impl OpenRequestHandle {
}
} else {
// trace!(%method, "no database. skipping save on revert");
RequestErrorHandler::TraceLevel
RequestRevertHandler::TraceLevel
}
} else {
revert_handler
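
Editor's note: the `log_revert_chance` checks above are plain Bernoulli sampling: 0.0 never saves, 1.0 always saves, and anything between is compared against a fresh random draw. A self-contained sketch of the rule (function name hypothetical):

```rust
use rand::Rng;

/// save a revert with probability `log_revert_chance` (0.0 = never, 1.0 = always)
fn should_save_revert(log_revert_chance: f64) -> bool {
    if log_revert_chance == 0.0 {
        false
    } else if log_revert_chance == 1.0 {
        true
    } else {
        rand::thread_rng().gen_range(0.0..1.0) < log_revert_chance
    }
}

fn main() {
    // with a 25% chance, roughly a quarter of reverts get persisted
    let saved = (0..10_000).filter(|_| should_save_revert(0.25)).count();
    println!("saved {saved} of 10000 simulated reverts");
}
```
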
@@ -263,10 +263,10 @@ impl OpenRequestHandle {
// TODO: simple enum -> string derive?
#[derive(Debug)]
enum ResponseErrorType {
enum ResponseTypes {
Revert,
RateLimit,
Error,
Ok,
}
// check for "execution reverted" here
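
Editor's note: the hunk below folds the old `Error` bucket into `Ok`, so only reverts and rate limits get special handling. The new classification reduces to a string check on the JSON-RPC error message; a self-contained sketch:

```rust
#[derive(Debug)]
enum ResponseTypes {
    Revert,
    RateLimit,
    Ok,
}

// classify a provider error message the way the hunk below does; the real
// code pulls `msg` out of a JSON-RPC error object first
fn classify(msg: Option<&str>) -> ResponseTypes {
    match msg {
        Some(m) if m.starts_with("execution reverted") => ResponseTypes::Revert,
        Some(m) if m.contains("limit") || m.contains("request") => ResponseTypes::RateLimit,
        _ => ResponseTypes::Ok,
    }
}

fn main() {
    assert!(matches!(classify(Some("execution reverted: out of gas")), ResponseTypes::Revert));
    assert!(matches!(classify(Some("daily request limit reached")), ResponseTypes::RateLimit));
    assert!(matches!(classify(None), ResponseTypes::Ok));
}
```
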
@@ -312,127 +312,87 @@ impl OpenRequestHandle {
if let Some(msg) = msg {
if msg.starts_with("execution reverted") {
trace!("revert from {}", self.rpc);
ResponseErrorType::Revert
ResponseTypes::Revert
} else if msg.contains("limit") || msg.contains("request") {
trace!("rate limit from {}", self.rpc);
ResponseErrorType::RateLimit
ResponseTypes::RateLimit
} else {
ResponseErrorType::Error
ResponseTypes::Ok
}
} else {
ResponseErrorType::Error
ResponseTypes::Ok
}
} else {
ResponseErrorType::Error
ResponseTypes::Ok
};
match response_type {
ResponseErrorType::RateLimit => {
if let Some(hard_limit_until) = self.rpc.hard_limit_until.as_ref() {
// TODO: how long? different providers have different rate limiting periods, though most seem to be 1 second
// TODO: until the next second, or wait 1 whole second?
let retry_at = Instant::now() + Duration::from_secs(1);
if matches!(response_type, ResponseTypes::RateLimit) {
if let Some(hard_limit_until) = self.rpc.hard_limit_until.as_ref() {
let retry_at = Instant::now() + Duration::from_secs(1);
trace!("retry {} at: {:?}", self.rpc, retry_at);
trace!("retry {} at: {:?}", self.rpc, retry_at);
hard_limit_until.send_replace(retry_at);
}
}
ResponseErrorType::Error => {
// TODO: should we just have Error or RateLimit? do we need Error and Revert separate?
match error_handler {
RequestErrorHandler::DebugLevel => {
// TODO: include params only if not running in release mode
debug!(
"error response from {}! method={} params={:?} err={:?}",
self.rpc, method, params, err
);
}
RequestErrorHandler::TraceLevel => {
trace!(
"error response from {}! method={} params={:?} err={:?}",
self.rpc,
method,
params,
err
);
}
RequestErrorHandler::ErrorLevel => {
// TODO: include params only if not running in release mode
error!(
"error response from {}! method={} err={:?}",
self.rpc, method, err
);
}
RequestErrorHandler::SaveRevert | RequestErrorHandler::WarnLevel => {
// TODO: include params only if not running in release mode
warn!(
"error response from {}! method={} err={:?}",
self.rpc, method, err
);
}
}
}
ResponseErrorType::Revert => {
match error_handler {
RequestErrorHandler::DebugLevel => {
// TODO: include params only if not running in release mode
debug!(
"revert response from {}! method={} params={:?} err={:?}",
self.rpc, method, params, err
);
}
RequestErrorHandler::TraceLevel => {
trace!(
"revert response from {}! method={} params={:?} err={:?}",
self.rpc,
method,
params,
err
);
}
RequestErrorHandler::ErrorLevel => {
// TODO: include params only if not running in release mode
error!(
"revert response from {}! method={} err={:?}",
self.rpc, method, err
);
}
RequestErrorHandler::WarnLevel => {
// TODO: include params only if not running in release mode
warn!(
"revert response from {}! method={} err={:?}",
self.rpc, method, err
);
}
RequestErrorHandler::SaveRevert => {
trace!(
"revert response from {}! method={} params={:?} err={:?}",
self.rpc,
method,
params,
err
);
// TODO: do not unwrap! (doesn't matter much since we check method as a string above)
let method: Method =
Method::try_from_value(&method.to_string()).unwrap();
// TODO: DO NOT UNWRAP! But also figure out the best way to keep returning ProviderErrors here
let params: EthCallParams = serde_json::from_value(json!(params))
.context("parsing params to EthCallParams")
.unwrap();
// spawn saving to the database so we don't slow down the request
let f = self.authorization.clone().save_revert(method, params.0 .0);
tokio::spawn(f);
}
}
hard_limit_until.send_replace(retry_at);
}
}
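
Editor's note: `hard_limit_until` is a `tokio::sync::watch` channel: `send_replace` overwrites the stored retry deadline even if nobody is currently polling, and waiters sleep until it passes. A minimal sketch of that pattern:

```rust
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::Instant;

#[tokio::main]
async fn main() {
    let (hard_limit_until, mut ready_at) = watch::channel(Instant::now());

    // on a rate-limit response, push the retry deadline one second out
    hard_limit_until.send_replace(Instant::now() + Duration::from_secs(1));

    // a waiter observes the update and sleeps until the rpc is usable again
    ready_at.changed().await.unwrap();
    let retry_at = *ready_at.borrow();
    tokio::time::sleep_until(retry_at).await;
    println!("rate limit window passed, retrying");
}
```
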
// TODO: think more about the method and param logs. those can be sensitive information
match revert_handler {
RequestRevertHandler::DebugLevel => {
// TODO: think about this revert check more. sometimes we might want reverts logged so this needs a flag
if matches!(response_type, ResponseTypes::Revert) {
debug!(
"bad response from {}! method={} params={:?} err={:?}",
self.rpc, method, params, err
);
}
}
RequestRevertHandler::TraceLevel => {
trace!(
"bad response from {}! method={} params={:?} err={:?}",
self.rpc,
method,
params,
err
);
}
RequestRevertHandler::ErrorLevel => {
// TODO: include params if not running in release mode
error!(
"bad response from {}! method={} err={:?}",
self.rpc, method, err
);
}
RequestRevertHandler::WarnLevel => {
// TODO: include params if not running in release mode
warn!(
"bad response from {}! method={} err={:?}",
self.rpc, method, err
);
}
RequestRevertHandler::Save => {
trace!(
"bad response from {}! method={} params={:?} err={:?}",
self.rpc,
method,
params,
err
);
// TODO: do not unwrap! (doesn't matter much since we check method as a string above)
let method: Method = Method::try_from_value(&method.to_string()).unwrap();
// TODO: DO NOT UNWRAP! But also figure out the best way to keep returning ProviderErrors here
let params: EthCallParams = serde_json::from_value(json!(params))
.context("parsing params to EthCallParams")
.unwrap();
// spawn saving to the database so we don't slow down the request
let f = self.authorization.clone().save_revert(method, params.0 .0);
tokio::spawn(f);
}
}
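
Editor's note: the `Save` arm builds the database write as a future and hands it to `tokio::spawn`, so the request path never waits on the insert. A sketch of the fire-and-forget shape (the stub is a hypothetical stand-in for `Authorization::save_revert`):

```rust
// hypothetical stand-in for Authorization::save_revert
async fn save_revert_stub(method: String) -> anyhow::Result<()> {
    println!("saved revert for {method}");
    Ok(())
}

fn spawn_save(method: &str) {
    // build the future first, then detach it; any error inside it is dropped
    // unless the task logs it itself
    let f = save_revert_stub(method.to_string());
    tokio::spawn(f);
}

#[tokio::main]
async fn main() {
    spawn_save("eth_call");
    // give the detached task a chance to run in this toy example
    tokio::task::yield_now().await;
}
```
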
// TODO: track error latency?
} else {
// TODO: record request latency
// let latency_ms = start.elapsed().as_secs_f64() * 1000.0;