diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 2e10d9e5..cfc5767b 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -39,6 +39,7 @@ use futures::stream::{FuturesUnordered, StreamExt}; use hashbrown::{HashMap, HashSet}; use migration::sea_orm::{DatabaseTransaction, EntityTrait, PaginatorTrait, TransactionTrait}; use moka::future::{Cache, CacheBuilder}; +use once_cell::sync::OnceCell; use parking_lot::RwLock; use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter}; @@ -49,6 +50,7 @@ use std::fmt; use std::net::IpAddr; use std::num::NonZeroU64; use std::str::FromStr; +use std::sync::atomic::{AtomicU16, Ordering}; use std::sync::{atomic, Arc}; use std::time::Duration; use tokio::sync::{broadcast, watch, Semaphore}; @@ -81,15 +83,14 @@ pub type UserBalanceCache = Cache>>; pub struct Web3ProxyApp { /// Send requests to the best server available pub balanced_rpcs: Arc, + /// concurrent/parallel application request limits for authenticated users + pub bearer_token_semaphores: Cache>, /// Send 4337 Abstraction Bundler requests to one of these servers pub bundler_4337_rpcs: Option>, - pub http_client: Option, /// application config /// TODO: this will need a large refactor to handle reloads while running. maybe use a watch::Receiver? pub config: AppConfig, - /// Send private requests (like eth_sendRawTransaction) to all these servers - /// TODO: include another type so that we can use private miner relays that do not use JSONRPC requests - pub private_rpcs: Option>, + pub http_client: Option, /// track JSONRPC responses pub jsonrpc_response_cache: JsonRpcResponseCache, /// rpc clients that subscribe to newHeads use this channel @@ -104,22 +105,26 @@ pub struct Web3ProxyApp { /// Optional read-only database for users and accounting pub db_replica: Option, pub hostname: Option, - pub internal_provider: Arc, - /// store pending transactions that we've seen so that we don't send duplicates to subscribers - /// TODO: think about this more. might be worth storing if we sent the transaction or not and using this for automatic retries - pub pending_transactions: Cache, + pub frontend_port: Arc, /// rate limit anonymous users pub frontend_ip_rate_limiter: Option>, /// rate limit authenticated users pub frontend_registered_user_rate_limiter: Option>, /// Optional time series database for making pretty graphs that load quickly pub influxdb_client: Option, + /// concurrent/parallel request limits for anonymous users + pub ip_semaphores: Cache>, + pub kafka_producer: Option, /// rate limit the login endpoint /// we do this because each pending login is a row in the database pub login_rate_limiter: Option, - /// volatile cache used for rate limits - /// TODO: i think i might just delete this entirely. instead use local-only concurrency limits. - pub vredis_pool: Option, + /// store pending transactions that we've seen so that we don't send duplicates to subscribers + /// TODO: think about this more. might be worth storing if we sent the transaction or not and using this for automatic retries + pub pending_transactions: Cache, + /// Send private requests (like eth_sendRawTransaction) to all these servers + /// TODO: include another type so that we can use private miner relays that do not use JSONRPC requests + pub private_rpcs: Option>, + pub prometheus_port: Arc, /// cache authenticated users so that we don't have to query the database on the hot path // TODO: should the key be our RpcSecretKey class instead of Ulid? pub rpc_secret_key_cache: RpcSecretKeyCache, @@ -127,13 +132,13 @@ pub struct Web3ProxyApp { pub user_balance_cache: UserBalanceCache, /// concurrent/parallel RPC request limits for authenticated users pub user_semaphores: Cache>, - /// concurrent/parallel request limits for anonymous users - pub ip_semaphores: Cache>, - /// concurrent/parallel application request limits for authenticated users - pub bearer_token_semaphores: Cache>, - pub kafka_producer: Option, + /// volatile cache used for rate limits + /// TODO: i think i might just delete this entirely. instead use local-only concurrency limits. + pub vredis_pool: Option, /// channel for sending stats in a background task pub stat_sender: Option>, + + internal_provider: OnceCell>, } /// flatten a JoinError into an anyhow error @@ -178,7 +183,8 @@ pub struct Web3ProxyAppSpawn { impl Web3ProxyApp { /// The main entrypoint. pub async fn spawn( - app_frontend_port: u16, + frontend_port: Arc, + prometheus_port: Arc, top_config: TopConfig, num_workers: usize, shutdown_sender: broadcast::Sender<()>, @@ -584,20 +590,6 @@ impl Web3ProxyApp { .ok() .and_then(|x| x.to_str().map(|x| x.to_string())); - // TODO: i'm sure theres much better ways to do this, but i don't want to spend time fighting traits right now - // TODO: what interval? i don't think we use it - // i tried and failed to `impl JsonRpcClient for Web3ProxyApi` - // i tried and failed to set up ipc. http is already running, so lets just use that - let internal_provider = connect_http( - format!("http://127.0.0.1:{}", app_frontend_port) - .parse() - .unwrap(), - http_client.clone(), - Duration::from_secs(10), - )?; - - let internal_provider = Arc::new(internal_provider); - let app = Self { balanced_rpcs, bearer_token_semaphores, @@ -605,15 +597,13 @@ impl Web3ProxyApp { config: top_config.app.clone(), db_conn, db_replica, + frontend_port: frontend_port.clone(), frontend_ip_rate_limiter, frontend_registered_user_rate_limiter, hostname, - vredis_pool, - rpc_secret_key_cache, - user_balance_cache, http_client, influxdb_client, - internal_provider, + internal_provider: Default::default(), ip_semaphores, jsonrpc_response_cache, kafka_producer, @@ -621,8 +611,12 @@ impl Web3ProxyApp { pending_transactions, pending_tx_sender, private_rpcs, + prometheus_port: prometheus_port.clone(), + rpc_secret_key_cache, stat_sender, + user_balance_cache, user_semaphores, + vredis_pool, watch_consensus_head_receiver, }; @@ -719,7 +713,28 @@ impl Web3ProxyApp { /// this works for now, but I don't like it /// TODO: I would much prefer we figure out the traits and `impl JsonRpcClient for Web3ProxyApp` pub fn internal_provider(&self) -> &Arc { - &self.internal_provider + self.internal_provider.get_or_init(|| { + // TODO: i'm sure theres much better ways to do this, but i don't want to spend time fighting traits right now + // TODO: what interval? i don't think we use it + // i tried and failed to `impl JsonRpcClient for Web3ProxyApi` + // i tried and failed to set up ipc. http is already running, so lets just use that + let frontend_port = self.frontend_port.load(Ordering::Relaxed); + + if frontend_port == 0 { + panic!("frontend is not running. cannot create provider yet"); + } + + let internal_provider = connect_http( + format!("http://127.0.0.1:{}", frontend_port) + .parse() + .unwrap(), + self.http_client.clone(), + Duration::from_secs(10), + ) + .unwrap(); + + Arc::new(internal_provider) + }) } pub async fn prometheus_metrics(&self) -> String { diff --git a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs index f74b8d9e..bed5e04c 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs @@ -1,8 +1,11 @@ #![forbid(unsafe_code)] + use argh::FromArgs; use futures::StreamExt; use num::Zero; use std::path::PathBuf; +use std::sync::atomic::AtomicU16; +use std::sync::Arc; use std::time::Duration; use std::{fs, thread}; use tokio::sync::broadcast; @@ -35,11 +38,14 @@ impl ProxydSubCommand { let (shutdown_sender, _) = broadcast::channel(1); // TODO: i think there is a small race. if config_path changes + let frontend_port = Arc::new(self.port.into()); + let prometheus_port = Arc::new(self.prometheus_port.into()); + run( top_config, Some(top_config_path), - self.port, - self.prometheus_port, + frontend_port, + prometheus_port, num_workers, shutdown_sender, ) @@ -50,8 +56,8 @@ impl ProxydSubCommand { async fn run( top_config: TopConfig, top_config_path: Option, - frontend_port: u16, - prometheus_port: u16, + frontend_port: Arc, + prometheus_port: Arc, num_workers: usize, frontend_shutdown_sender: broadcast::Sender<()>, ) -> anyhow::Result<()> { @@ -59,9 +65,6 @@ async fn run( // this shutdown sender is currently only used in tests, but we might make a /shutdown endpoint or something // we do not need this receiver. new receivers are made by `shutdown_sender.subscribe()` - let app_frontend_port = frontend_port; - let app_prometheus_port = prometheus_port; - // TODO: should we use a watch or broadcast for these? // Maybe this one ? // let mut shutdown_receiver = shutdown_sender.subscribe(); @@ -76,7 +79,8 @@ async fn run( // start the main app let mut spawned_app = Web3ProxyApp::spawn( - app_frontend_port, + frontend_port, + prometheus_port, top_config.clone(), num_workers, app_shutdown_sender.clone(), @@ -120,7 +124,6 @@ async fn run( // start the prometheus metrics port let prometheus_handle = tokio::spawn(prometheus::serve( spawned_app.app.clone(), - app_prometheus_port, prometheus_shutdown_receiver, )); @@ -128,7 +131,6 @@ async fn run( // start the frontend port let frontend_handle = tokio::spawn(frontend::serve( - app_frontend_port, spawned_app.app, frontend_shutdown_receiver, frontend_shutdown_complete_sender, @@ -252,107 +254,154 @@ async fn run( #[cfg(test)] mod tests { + use super::*; use ethers::{ prelude::{Http, Provider, U256}, + types::Address, utils::Anvil, }; use hashbrown::HashMap; - use std::env; - use tokio::task::JoinHandle; - + use parking_lot::Mutex; + use std::{ + env, + str::FromStr, + sync::atomic::{AtomicU16, Ordering}, + }; + use tokio::{ + sync::broadcast::error::SendError, + task::JoinHandle, + time::{sleep, Instant}, + }; use web3_proxy::{ config::{AppConfig, Web3RpcConfig}, rpcs::blockchain::ArcBlock, }; - use super::*; + // TODO: put it in a thread? + struct TestApp { + handle: Mutex>>>, + anvil_provider: Provider, + proxy_provider: Provider, + shutdown_sender: broadcast::Sender<()>, + } - #[test_log::test(tokio::test)] - async fn it_works() { - // TODO: move basic setup into a test fixture - let path = env::var("PATH").unwrap(); + impl TestApp { + async fn spawn() -> Self { + // TODO: move basic setup into a test fixture + let path = env::var("PATH").unwrap(); - info!("path: {}", path); + info!("path: {}", path); - // todo: fork polygon so we can test our payment contracts - let anvil = Anvil::new().spawn(); + // TODO: configurable rpc and block + let anvil = Anvil::new() + // .fork("https://polygon.llamarpc.com@44300000") + .spawn(); - info!("Anvil running at `{}`", anvil.endpoint()); + info!("Anvil running at `{}`", anvil.endpoint()); - let anvil_provider = Provider::::try_from(anvil.endpoint()).unwrap(); + let anvil_provider = Provider::::try_from(anvil.endpoint()).unwrap(); - // mine a block because my code doesn't like being on block 0 - // TODO: make block 0 okay? is it okay now? - let _: U256 = anvil_provider - .request("evm_mine", None::<()>) - .await - .unwrap(); + // mine a block to test the provider + let _: U256 = anvil_provider.request("evm_mine", ()).await.unwrap(); - // make a test TopConfig - // TODO: load TopConfig from a file? CliConfig could have `cli_config.load_top_config`. would need to inject our endpoint ports - let top_config = TopConfig { - app: AppConfig { - chain_id: 31337, - default_user_max_requests_per_period: Some(6_000_000), - min_sum_soft_limit: 1, - min_synced_rpcs: 1, - public_requests_per_period: Some(1_000_000), - response_cache_max_bytes: 10_u64.pow(7), - ..Default::default() - }, - balanced_rpcs: HashMap::from([ - ( - "anvil".to_string(), - Web3RpcConfig { - http_url: Some(anvil.endpoint()), - soft_limit: 100, - ..Default::default() - }, - ), - ( - "anvil_ws".to_string(), - Web3RpcConfig { - ws_url: Some(anvil.ws_endpoint()), - soft_limit: 100, - ..Default::default() - }, - ), - ( - // TODO: i don't think "both" is working + // make a test TopConfig + // TODO: load TopConfig from a file? CliConfig could have `cli_config.load_top_config`. would need to inject our endpoint ports + let top_config = TopConfig { + app: AppConfig { + chain_id: 137, + default_user_max_requests_per_period: Some(6_000_000), + deposit_factory_contract: Address::from_str( + "4e3BC2054788De923A04936C6ADdB99A05B0Ea36", + ) + .ok(), + min_sum_soft_limit: 1, + min_synced_rpcs: 1, + public_requests_per_period: Some(1_000_000), + response_cache_max_bytes: 10_u64.pow(7), + ..Default::default() + }, + balanced_rpcs: HashMap::from([( "anvil_both".to_string(), Web3RpcConfig { http_url: Some(anvil.endpoint()), ws_url: Some(anvil.ws_endpoint()), ..Default::default() }, - ), - ]), - private_rpcs: None, - bundler_4337_rpcs: None, - extra: Default::default(), - }; + )]), + private_rpcs: None, + bundler_4337_rpcs: None, + extra: Default::default(), + }; - let (shutdown_sender, _shutdown_receiver) = broadcast::channel(1); + let (shutdown_sender, _shutdown_receiver) = broadcast::channel(1); - // spawn another thread for running the app - // TODO: allow launching into the local tokio runtime instead of creating a new one? - let app_handle = { - let frontend_port = 0; - let prometheus_port = 0; - let shutdown_sender = shutdown_sender.clone(); + let frontend_port_arc = Arc::new(AtomicU16::new(0)); + let prometheus_port_arc = Arc::new(AtomicU16::new(0)); - tokio::spawn(run( - top_config, - None, - frontend_port, - prometheus_port, - 2, + // spawn another thread for running the app + // TODO: allow launching into the local tokio runtime instead of creating a new one? + let handle = { + tokio::spawn(run( + top_config, + None, + frontend_port_arc.clone(), + prometheus_port_arc, + 2, + shutdown_sender.clone(), + )) + }; + + let mut frontend_port = frontend_port_arc.load(Ordering::Relaxed); + let start = Instant::now(); + while frontend_port == 0 { + if start.elapsed() > Duration::from_secs(1) { + panic!("took too long to start!"); + } + + sleep(Duration::from_millis(10)).await; + frontend_port = frontend_port_arc.load(Ordering::Relaxed); + } + + let proxy_endpoint = format!("http://127.0.0.1:{}", frontend_port); + + let proxy_provider = Provider::::try_from(proxy_endpoint).unwrap(); + + Self { + handle: Mutex::new(Some(handle)), + anvil_provider, + proxy_provider, shutdown_sender, - )) - }; + } + } - // TODO: do something to the node. query latest block, mine another block, query again - let proxy_provider = Provider::::try_from(anvil.endpoint()).unwrap(); + fn stop(&self) -> Result> { + self.shutdown_sender.send(()) + } + + async fn wait(&self) { + // TODO: lock+take feels weird, but it works + let handle = self.handle.lock().take(); + + if let Some(handle) = handle { + info!("waiting for the app to stop..."); + handle.await.unwrap().unwrap(); + } + } + } + + impl Drop for TestApp { + fn drop(&mut self) { + let _ = self.stop(); + } + } + + #[test_log::test(tokio::test)] + async fn it_works() { + // TODO: move basic setup into a test fixture + let x = TestApp::spawn().await; + + let anvil_provider = &x.anvil_provider; + let proxy_provider = &x.proxy_provider; let anvil_result = anvil_provider .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) @@ -391,12 +440,7 @@ mod tests { assert_eq!(first_block_num, second_block_num - 1); - // TODO: how do we make fixtures run this at the end? - // tell the test app to shut down - shutdown_sender.send(()).unwrap(); - - info!("waiting for shutdown..."); - // TODO: panic if a timeout is reached - app_handle.await.unwrap().unwrap(); + // x.stop(); + // x.wait().await; } } diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 3048e81b..d05cbcbc 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -2,7 +2,7 @@ use crate::app::Web3ProxyJoinHandle; use crate::rpcs::blockchain::{BlocksByHashCache, Web3ProxyBlock}; use crate::rpcs::one::Web3Rpc; use argh::FromArgs; -use ethers::prelude::{Address, TxHash, H256}; +use ethers::prelude::{Address, TxHash}; use ethers::types::{U256, U64}; use hashbrown::HashMap; use migration::sea_orm::DatabaseConnection; @@ -98,9 +98,6 @@ pub struct AppConfig { /// Default ERC address for out deposit contract pub deposit_factory_contract: Option
, - /// Default ERC address for out deposit contract - pub deposit_topic: Option, - /// minimum amount to increase eth_estimateGas results pub gas_increase_min: Option, @@ -275,6 +272,7 @@ pub struct Web3RpcConfig { /// block data limit. If None, will be queried pub block_data_limit: Option, /// the requests per second at which the server starts slowing down + #[serde(default = "default_soft_limit")] pub soft_limit: u32, /// the requests per second at which the server throws errors (rate limit or otherwise) pub hard_limit: Option, @@ -290,6 +288,10 @@ pub struct Web3RpcConfig { pub extra: HashMap, } +fn default_soft_limit() -> u32 { + 10 +} + impl Web3RpcConfig { /// Create a Web3Rpc from config /// TODO: move this into Web3Rpc? (just need to make things pub(crate)) diff --git a/web3_proxy/src/frontend/admin.rs b/web3_proxy/src/frontend/admin.rs index fa51341e..21dabe3b 100644 --- a/web3_proxy/src/frontend/admin.rs +++ b/web3_proxy/src/frontend/admin.rs @@ -363,7 +363,7 @@ pub async fn admin_imitate_login_post( // mostly default options are fine. the message includes timestamp and domain and nonce let verify_config = VerificationOpts { - rpc_provider: Some(app.internal_provider.clone()), + rpc_provider: Some(app.internal_provider().clone()), ..Default::default() }; diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index dd1061e8..62998126 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -18,9 +18,9 @@ use axum::{ use http::{header::AUTHORIZATION, StatusCode}; use listenfd::ListenFd; use moka::future::{Cache, CacheBuilder}; -use std::net::SocketAddr; use std::sync::Arc; use std::{iter::once, time::Duration}; +use std::{net::SocketAddr, sync::atomic::Ordering}; use strum::{EnumCount, EnumIter}; use tokio::sync::broadcast; use tower_http::cors::CorsLayer; @@ -41,8 +41,7 @@ pub type ResponseCache = Cache, + app: Arc, mut shutdown_receiver: broadcast::Receiver<()>, shutdown_complete_sender: broadcast::Sender<()>, ) -> Web3ProxyResult<()> { @@ -59,7 +58,7 @@ pub async fn serve( // TODO: read config for if fastest/versus should be available publicly. default off // build our axum Router - let app = Router::new() + let router = Router::new() // TODO: i think these routes could be done a lot better // // HTTP RPC (POST) @@ -237,7 +236,7 @@ pub async fn serve( // handle cors .layer(CorsLayer::very_permissive()) // application state - .layer(Extension(proxy_app)) + .layer(Extension(app.clone())) // frontend caches .layer(Extension(Arc::new(response_cache))) // 404 for any unknown routes @@ -251,9 +250,8 @@ pub async fn serve( axum::Server::from_tcp(listener)? } else { - info!("listening on port {}", port); // TODO: allow only listening on localhost? top_config.app.host.parse()? - let addr = SocketAddr::from(([0, 0, 0, 0], port)); + let addr = SocketAddr::from(([0, 0, 0, 0], app.frontend_port.load(Ordering::Relaxed))); axum::Server::try_bind(&addr)? }; @@ -269,17 +267,23 @@ pub async fn serve( #[cfg(feature = "connectinfo")] let make_service = { info!("connectinfo feature enabled"); - app.into_make_service_with_connect_info::() + router.into_make_service_with_connect_info::() }; #[cfg(not(feature = "connectinfo"))] let make_service = { info!("connectinfo feature disabled"); - app.into_make_service() + router.into_make_service() }; - let server = server_builder - .serve(make_service) + let server = server_builder.serve(make_service); + + let port = server.local_addr().port(); + info!("listening on port {}", port); + + app.frontend_port.store(port, Ordering::Relaxed); + + let server = server // TODO: option to use with_connect_info. we want it in dev, but not when running behind a proxy, but not .with_graceful_shutdown(async move { let _ = shutdown_receiver.recv().await; diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index ed125aad..727d7f35 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -38,9 +38,10 @@ use tokio::sync::{broadcast, OwnedSemaphorePermit, RwLock}; use tracing::{info, trace}; /// How to select backend servers for a request -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, Default)] pub enum ProxyMode { /// send to the "best" synced server + #[default] Best, /// send to all synced servers and return the fastest non-error response (reverts do not count as errors here) Fastest(usize), @@ -51,12 +52,6 @@ pub enum ProxyMode { Debug, } -impl Default for ProxyMode { - fn default() -> Self { - Self::Best - } -} - /// Public entrypoint for WebSocket JSON-RPC requests. /// Queries a single server at a time #[debug_handler] diff --git a/web3_proxy/src/frontend/users/authentication.rs b/web3_proxy/src/frontend/users/authentication.rs index 9f342a86..a874f579 100644 --- a/web3_proxy/src/frontend/users/authentication.rs +++ b/web3_proxy/src/frontend/users/authentication.rs @@ -257,7 +257,7 @@ pub async fn user_login_post( // mostly default options are fine. the message includes timestamp and domain and nonce let verify_config = VerificationOpts { - rpc_provider: Some(app.internal_provider.clone()), + rpc_provider: Some(app.internal_provider().clone()), ..Default::default() }; diff --git a/web3_proxy/src/prometheus.rs b/web3_proxy/src/prometheus.rs index 6ac30d5b..027db703 100644 --- a/web3_proxy/src/prometheus.rs +++ b/web3_proxy/src/prometheus.rs @@ -3,6 +3,7 @@ use axum::http::HeaderValue; use axum::response::{IntoResponse, Response}; use axum::{routing::get, Extension, Router}; use std::net::SocketAddr; +use std::sync::atomic::Ordering; use std::sync::Arc; use tokio::sync::broadcast; use tracing::info; @@ -13,21 +14,29 @@ use crate::errors::Web3ProxyResult; /// Run a prometheus metrics server on the given port. pub async fn serve( app: Arc, - port: u16, mut shutdown_receiver: broadcast::Receiver<()>, ) -> Web3ProxyResult<()> { // routes should be ordered most to least common - let app = Router::new().route("/", get(root)).layer(Extension(app)); + let router = Router::new() + .route("/", get(root)) + .layer(Extension(app.clone())); + // note: the port here might be 0 + let port = app.prometheus_port.load(Ordering::Relaxed); // TODO: config for the host? let addr = SocketAddr::from(([0, 0, 0, 0], port)); - info!("prometheus listening on port {}", port); - let service = app.into_make_service(); + let service = router.into_make_service(); // `axum::Server` is a re-export of `hyper::Server` - axum::Server::bind(&addr) - .serve(service) + let server = axum::Server::bind(&addr).serve(service); + + let port = server.local_addr().port(); + info!("prometheus listening on port {}", port); + + app.prometheus_port.store(port, Ordering::Relaxed); + + server .with_graceful_shutdown(async move { let _ = shutdown_receiver.recv().await; }) diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index f439625b..914d7351 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -1376,8 +1376,6 @@ impl Serialize for Web3Rpcs { mod tests { #![allow(unused_imports)] - use std::time::{SystemTime, UNIX_EPOCH}; - use super::*; use crate::rpcs::blockchain::Web3ProxyBlock; use crate::rpcs::consensus::ConsensusFinder; @@ -1387,6 +1385,7 @@ mod tests { use latency::PeakEwmaLatency; use moka::future::CacheBuilder; use parking_lot::RwLock; + use std::time::{SystemTime, UNIX_EPOCH}; use tracing::trace; #[cfg(test)] @@ -1394,6 +1393,7 @@ mod tests { PeakEwmaLatency::spawn(Duration::from_secs(1), 4, Duration::from_secs(1)) } + // TODO: logging #[tokio::test(start_paused = true)] async fn test_sort_connections_by_sync_status() { // TODO: how should we do test logging setup with tracing?