better handling of 0 port and fix test port
This commit is contained in:
parent
ab3d4a91a8
commit
efe1e30791
@ -39,6 +39,7 @@ use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use hashbrown::{HashMap, HashSet};
|
||||
use migration::sea_orm::{DatabaseTransaction, EntityTrait, PaginatorTrait, TransactionTrait};
|
||||
use moka::future::{Cache, CacheBuilder};
|
||||
use once_cell::sync::OnceCell;
|
||||
use parking_lot::RwLock;
|
||||
use redis_rate_limiter::redis::AsyncCommands;
|
||||
use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
|
||||
@ -49,6 +50,7 @@ use std::fmt;
|
||||
use std::net::IpAddr;
|
||||
use std::num::NonZeroU64;
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::{AtomicU16, Ordering};
|
||||
use std::sync::{atomic, Arc};
|
||||
use std::time::Duration;
|
||||
use tokio::sync::{broadcast, watch, Semaphore};
|
||||
@ -81,15 +83,14 @@ pub type UserBalanceCache = Cache<NonZeroU64, Arc<RwLock<Balance>>>;
|
||||
pub struct Web3ProxyApp {
|
||||
/// Send requests to the best server available
|
||||
pub balanced_rpcs: Arc<Web3Rpcs>,
|
||||
/// concurrent/parallel application request limits for authenticated users
|
||||
pub bearer_token_semaphores: Cache<UserBearerToken, Arc<Semaphore>>,
|
||||
/// Send 4337 Abstraction Bundler requests to one of these servers
|
||||
pub bundler_4337_rpcs: Option<Arc<Web3Rpcs>>,
|
||||
pub http_client: Option<reqwest::Client>,
|
||||
/// application config
|
||||
/// TODO: this will need a large refactor to handle reloads while running. maybe use a watch::Receiver?
|
||||
pub config: AppConfig,
|
||||
/// Send private requests (like eth_sendRawTransaction) to all these servers
|
||||
/// TODO: include another type so that we can use private miner relays that do not use JSONRPC requests
|
||||
pub private_rpcs: Option<Arc<Web3Rpcs>>,
|
||||
pub http_client: Option<reqwest::Client>,
|
||||
/// track JSONRPC responses
|
||||
pub jsonrpc_response_cache: JsonRpcResponseCache,
|
||||
/// rpc clients that subscribe to newHeads use this channel
|
||||
@ -104,22 +105,26 @@ pub struct Web3ProxyApp {
|
||||
/// Optional read-only database for users and accounting
|
||||
pub db_replica: Option<DatabaseReplica>,
|
||||
pub hostname: Option<String>,
|
||||
pub internal_provider: Arc<EthersHttpProvider>,
|
||||
/// store pending transactions that we've seen so that we don't send duplicates to subscribers
|
||||
/// TODO: think about this more. might be worth storing if we sent the transaction or not and using this for automatic retries
|
||||
pub pending_transactions: Cache<TxHash, TxStatus>,
|
||||
pub frontend_port: Arc<AtomicU16>,
|
||||
/// rate limit anonymous users
|
||||
pub frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>,
|
||||
/// rate limit authenticated users
|
||||
pub frontend_registered_user_rate_limiter: Option<DeferredRateLimiter<u64>>,
|
||||
/// Optional time series database for making pretty graphs that load quickly
|
||||
pub influxdb_client: Option<influxdb2::Client>,
|
||||
/// concurrent/parallel request limits for anonymous users
|
||||
pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>>,
|
||||
pub kafka_producer: Option<rdkafka::producer::FutureProducer>,
|
||||
/// rate limit the login endpoint
|
||||
/// we do this because each pending login is a row in the database
|
||||
pub login_rate_limiter: Option<RedisRateLimiter>,
|
||||
/// volatile cache used for rate limits
|
||||
/// TODO: i think i might just delete this entirely. instead use local-only concurrency limits.
|
||||
pub vredis_pool: Option<RedisPool>,
|
||||
/// store pending transactions that we've seen so that we don't send duplicates to subscribers
|
||||
/// TODO: think about this more. might be worth storing if we sent the transaction or not and using this for automatic retries
|
||||
pub pending_transactions: Cache<TxHash, TxStatus>,
|
||||
/// Send private requests (like eth_sendRawTransaction) to all these servers
|
||||
/// TODO: include another type so that we can use private miner relays that do not use JSONRPC requests
|
||||
pub private_rpcs: Option<Arc<Web3Rpcs>>,
|
||||
pub prometheus_port: Arc<AtomicU16>,
|
||||
/// cache authenticated users so that we don't have to query the database on the hot path
|
||||
// TODO: should the key be our RpcSecretKey class instead of Ulid?
|
||||
pub rpc_secret_key_cache: RpcSecretKeyCache,
|
||||
@ -127,13 +132,13 @@ pub struct Web3ProxyApp {
|
||||
pub user_balance_cache: UserBalanceCache,
|
||||
/// concurrent/parallel RPC request limits for authenticated users
|
||||
pub user_semaphores: Cache<NonZeroU64, Arc<Semaphore>>,
|
||||
/// concurrent/parallel request limits for anonymous users
|
||||
pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>>,
|
||||
/// concurrent/parallel application request limits for authenticated users
|
||||
pub bearer_token_semaphores: Cache<UserBearerToken, Arc<Semaphore>>,
|
||||
pub kafka_producer: Option<rdkafka::producer::FutureProducer>,
|
||||
/// volatile cache used for rate limits
|
||||
/// TODO: i think i might just delete this entirely. instead use local-only concurrency limits.
|
||||
pub vredis_pool: Option<RedisPool>,
|
||||
/// channel for sending stats in a background task
|
||||
pub stat_sender: Option<flume::Sender<AppStat>>,
|
||||
|
||||
internal_provider: OnceCell<Arc<EthersHttpProvider>>,
|
||||
}
|
||||
|
||||
/// flatten a JoinError into an anyhow error
|
||||
@ -178,7 +183,8 @@ pub struct Web3ProxyAppSpawn {
|
||||
impl Web3ProxyApp {
|
||||
/// The main entrypoint.
|
||||
pub async fn spawn(
|
||||
app_frontend_port: u16,
|
||||
frontend_port: Arc<AtomicU16>,
|
||||
prometheus_port: Arc<AtomicU16>,
|
||||
top_config: TopConfig,
|
||||
num_workers: usize,
|
||||
shutdown_sender: broadcast::Sender<()>,
|
||||
@ -584,20 +590,6 @@ impl Web3ProxyApp {
|
||||
.ok()
|
||||
.and_then(|x| x.to_str().map(|x| x.to_string()));
|
||||
|
||||
// TODO: i'm sure theres much better ways to do this, but i don't want to spend time fighting traits right now
|
||||
// TODO: what interval? i don't think we use it
|
||||
// i tried and failed to `impl JsonRpcClient for Web3ProxyApi`
|
||||
// i tried and failed to set up ipc. http is already running, so lets just use that
|
||||
let internal_provider = connect_http(
|
||||
format!("http://127.0.0.1:{}", app_frontend_port)
|
||||
.parse()
|
||||
.unwrap(),
|
||||
http_client.clone(),
|
||||
Duration::from_secs(10),
|
||||
)?;
|
||||
|
||||
let internal_provider = Arc::new(internal_provider);
|
||||
|
||||
let app = Self {
|
||||
balanced_rpcs,
|
||||
bearer_token_semaphores,
|
||||
@ -605,15 +597,13 @@ impl Web3ProxyApp {
|
||||
config: top_config.app.clone(),
|
||||
db_conn,
|
||||
db_replica,
|
||||
frontend_port: frontend_port.clone(),
|
||||
frontend_ip_rate_limiter,
|
||||
frontend_registered_user_rate_limiter,
|
||||
hostname,
|
||||
vredis_pool,
|
||||
rpc_secret_key_cache,
|
||||
user_balance_cache,
|
||||
http_client,
|
||||
influxdb_client,
|
||||
internal_provider,
|
||||
internal_provider: Default::default(),
|
||||
ip_semaphores,
|
||||
jsonrpc_response_cache,
|
||||
kafka_producer,
|
||||
@ -621,8 +611,12 @@ impl Web3ProxyApp {
|
||||
pending_transactions,
|
||||
pending_tx_sender,
|
||||
private_rpcs,
|
||||
prometheus_port: prometheus_port.clone(),
|
||||
rpc_secret_key_cache,
|
||||
stat_sender,
|
||||
user_balance_cache,
|
||||
user_semaphores,
|
||||
vredis_pool,
|
||||
watch_consensus_head_receiver,
|
||||
};
|
||||
|
||||
@ -719,7 +713,28 @@ impl Web3ProxyApp {
|
||||
/// this works for now, but I don't like it
|
||||
/// TODO: I would much prefer we figure out the traits and `impl JsonRpcClient for Web3ProxyApp`
|
||||
pub fn internal_provider(&self) -> &Arc<EthersHttpProvider> {
|
||||
&self.internal_provider
|
||||
self.internal_provider.get_or_init(|| {
|
||||
// TODO: i'm sure theres much better ways to do this, but i don't want to spend time fighting traits right now
|
||||
// TODO: what interval? i don't think we use it
|
||||
// i tried and failed to `impl JsonRpcClient for Web3ProxyApi`
|
||||
// i tried and failed to set up ipc. http is already running, so lets just use that
|
||||
let frontend_port = self.frontend_port.load(Ordering::Relaxed);
|
||||
|
||||
if frontend_port == 0 {
|
||||
panic!("frontend is not running. cannot create provider yet");
|
||||
}
|
||||
|
||||
let internal_provider = connect_http(
|
||||
format!("http://127.0.0.1:{}", frontend_port)
|
||||
.parse()
|
||||
.unwrap(),
|
||||
self.http_client.clone(),
|
||||
Duration::from_secs(10),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
Arc::new(internal_provider)
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn prometheus_metrics(&self) -> String {
|
||||
|
@ -1,8 +1,11 @@
|
||||
#![forbid(unsafe_code)]
|
||||
|
||||
use argh::FromArgs;
|
||||
use futures::StreamExt;
|
||||
use num::Zero;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::AtomicU16;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::{fs, thread};
|
||||
use tokio::sync::broadcast;
|
||||
@ -35,11 +38,14 @@ impl ProxydSubCommand {
|
||||
let (shutdown_sender, _) = broadcast::channel(1);
|
||||
// TODO: i think there is a small race. if config_path changes
|
||||
|
||||
let frontend_port = Arc::new(self.port.into());
|
||||
let prometheus_port = Arc::new(self.prometheus_port.into());
|
||||
|
||||
run(
|
||||
top_config,
|
||||
Some(top_config_path),
|
||||
self.port,
|
||||
self.prometheus_port,
|
||||
frontend_port,
|
||||
prometheus_port,
|
||||
num_workers,
|
||||
shutdown_sender,
|
||||
)
|
||||
@ -50,8 +56,8 @@ impl ProxydSubCommand {
|
||||
async fn run(
|
||||
top_config: TopConfig,
|
||||
top_config_path: Option<PathBuf>,
|
||||
frontend_port: u16,
|
||||
prometheus_port: u16,
|
||||
frontend_port: Arc<AtomicU16>,
|
||||
prometheus_port: Arc<AtomicU16>,
|
||||
num_workers: usize,
|
||||
frontend_shutdown_sender: broadcast::Sender<()>,
|
||||
) -> anyhow::Result<()> {
|
||||
@ -59,9 +65,6 @@ async fn run(
|
||||
// this shutdown sender is currently only used in tests, but we might make a /shutdown endpoint or something
|
||||
// we do not need this receiver. new receivers are made by `shutdown_sender.subscribe()`
|
||||
|
||||
let app_frontend_port = frontend_port;
|
||||
let app_prometheus_port = prometheus_port;
|
||||
|
||||
// TODO: should we use a watch or broadcast for these?
|
||||
// Maybe this one ?
|
||||
// let mut shutdown_receiver = shutdown_sender.subscribe();
|
||||
@ -76,7 +79,8 @@ async fn run(
|
||||
|
||||
// start the main app
|
||||
let mut spawned_app = Web3ProxyApp::spawn(
|
||||
app_frontend_port,
|
||||
frontend_port,
|
||||
prometheus_port,
|
||||
top_config.clone(),
|
||||
num_workers,
|
||||
app_shutdown_sender.clone(),
|
||||
@ -120,7 +124,6 @@ async fn run(
|
||||
// start the prometheus metrics port
|
||||
let prometheus_handle = tokio::spawn(prometheus::serve(
|
||||
spawned_app.app.clone(),
|
||||
app_prometheus_port,
|
||||
prometheus_shutdown_receiver,
|
||||
));
|
||||
|
||||
@ -128,7 +131,6 @@ async fn run(
|
||||
|
||||
// start the frontend port
|
||||
let frontend_handle = tokio::spawn(frontend::serve(
|
||||
app_frontend_port,
|
||||
spawned_app.app,
|
||||
frontend_shutdown_receiver,
|
||||
frontend_shutdown_complete_sender,
|
||||
@ -252,107 +254,154 @@ async fn run(
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use ethers::{
|
||||
prelude::{Http, Provider, U256},
|
||||
types::Address,
|
||||
utils::Anvil,
|
||||
};
|
||||
use hashbrown::HashMap;
|
||||
use std::env;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use parking_lot::Mutex;
|
||||
use std::{
|
||||
env,
|
||||
str::FromStr,
|
||||
sync::atomic::{AtomicU16, Ordering},
|
||||
};
|
||||
use tokio::{
|
||||
sync::broadcast::error::SendError,
|
||||
task::JoinHandle,
|
||||
time::{sleep, Instant},
|
||||
};
|
||||
use web3_proxy::{
|
||||
config::{AppConfig, Web3RpcConfig},
|
||||
rpcs::blockchain::ArcBlock,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
// TODO: put it in a thread?
|
||||
struct TestApp {
|
||||
handle: Mutex<Option<JoinHandle<anyhow::Result<()>>>>,
|
||||
anvil_provider: Provider<Http>,
|
||||
proxy_provider: Provider<Http>,
|
||||
shutdown_sender: broadcast::Sender<()>,
|
||||
}
|
||||
|
||||
#[test_log::test(tokio::test)]
|
||||
async fn it_works() {
|
||||
// TODO: move basic setup into a test fixture
|
||||
let path = env::var("PATH").unwrap();
|
||||
impl TestApp {
|
||||
async fn spawn() -> Self {
|
||||
// TODO: move basic setup into a test fixture
|
||||
let path = env::var("PATH").unwrap();
|
||||
|
||||
info!("path: {}", path);
|
||||
info!("path: {}", path);
|
||||
|
||||
// todo: fork polygon so we can test our payment contracts
|
||||
let anvil = Anvil::new().spawn();
|
||||
// TODO: configurable rpc and block
|
||||
let anvil = Anvil::new()
|
||||
// .fork("https://polygon.llamarpc.com@44300000")
|
||||
.spawn();
|
||||
|
||||
info!("Anvil running at `{}`", anvil.endpoint());
|
||||
info!("Anvil running at `{}`", anvil.endpoint());
|
||||
|
||||
let anvil_provider = Provider::<Http>::try_from(anvil.endpoint()).unwrap();
|
||||
let anvil_provider = Provider::<Http>::try_from(anvil.endpoint()).unwrap();
|
||||
|
||||
// mine a block because my code doesn't like being on block 0
|
||||
// TODO: make block 0 okay? is it okay now?
|
||||
let _: U256 = anvil_provider
|
||||
.request("evm_mine", None::<()>)
|
||||
.await
|
||||
.unwrap();
|
||||
// mine a block to test the provider
|
||||
let _: U256 = anvil_provider.request("evm_mine", ()).await.unwrap();
|
||||
|
||||
// make a test TopConfig
|
||||
// TODO: load TopConfig from a file? CliConfig could have `cli_config.load_top_config`. would need to inject our endpoint ports
|
||||
let top_config = TopConfig {
|
||||
app: AppConfig {
|
||||
chain_id: 31337,
|
||||
default_user_max_requests_per_period: Some(6_000_000),
|
||||
min_sum_soft_limit: 1,
|
||||
min_synced_rpcs: 1,
|
||||
public_requests_per_period: Some(1_000_000),
|
||||
response_cache_max_bytes: 10_u64.pow(7),
|
||||
..Default::default()
|
||||
},
|
||||
balanced_rpcs: HashMap::from([
|
||||
(
|
||||
"anvil".to_string(),
|
||||
Web3RpcConfig {
|
||||
http_url: Some(anvil.endpoint()),
|
||||
soft_limit: 100,
|
||||
..Default::default()
|
||||
},
|
||||
),
|
||||
(
|
||||
"anvil_ws".to_string(),
|
||||
Web3RpcConfig {
|
||||
ws_url: Some(anvil.ws_endpoint()),
|
||||
soft_limit: 100,
|
||||
..Default::default()
|
||||
},
|
||||
),
|
||||
(
|
||||
// TODO: i don't think "both" is working
|
||||
// make a test TopConfig
|
||||
// TODO: load TopConfig from a file? CliConfig could have `cli_config.load_top_config`. would need to inject our endpoint ports
|
||||
let top_config = TopConfig {
|
||||
app: AppConfig {
|
||||
chain_id: 137,
|
||||
default_user_max_requests_per_period: Some(6_000_000),
|
||||
deposit_factory_contract: Address::from_str(
|
||||
"4e3BC2054788De923A04936C6ADdB99A05B0Ea36",
|
||||
)
|
||||
.ok(),
|
||||
min_sum_soft_limit: 1,
|
||||
min_synced_rpcs: 1,
|
||||
public_requests_per_period: Some(1_000_000),
|
||||
response_cache_max_bytes: 10_u64.pow(7),
|
||||
..Default::default()
|
||||
},
|
||||
balanced_rpcs: HashMap::from([(
|
||||
"anvil_both".to_string(),
|
||||
Web3RpcConfig {
|
||||
http_url: Some(anvil.endpoint()),
|
||||
ws_url: Some(anvil.ws_endpoint()),
|
||||
..Default::default()
|
||||
},
|
||||
),
|
||||
]),
|
||||
private_rpcs: None,
|
||||
bundler_4337_rpcs: None,
|
||||
extra: Default::default(),
|
||||
};
|
||||
)]),
|
||||
private_rpcs: None,
|
||||
bundler_4337_rpcs: None,
|
||||
extra: Default::default(),
|
||||
};
|
||||
|
||||
let (shutdown_sender, _shutdown_receiver) = broadcast::channel(1);
|
||||
let (shutdown_sender, _shutdown_receiver) = broadcast::channel(1);
|
||||
|
||||
// spawn another thread for running the app
|
||||
// TODO: allow launching into the local tokio runtime instead of creating a new one?
|
||||
let app_handle = {
|
||||
let frontend_port = 0;
|
||||
let prometheus_port = 0;
|
||||
let shutdown_sender = shutdown_sender.clone();
|
||||
let frontend_port_arc = Arc::new(AtomicU16::new(0));
|
||||
let prometheus_port_arc = Arc::new(AtomicU16::new(0));
|
||||
|
||||
tokio::spawn(run(
|
||||
top_config,
|
||||
None,
|
||||
frontend_port,
|
||||
prometheus_port,
|
||||
2,
|
||||
// spawn another thread for running the app
|
||||
// TODO: allow launching into the local tokio runtime instead of creating a new one?
|
||||
let handle = {
|
||||
tokio::spawn(run(
|
||||
top_config,
|
||||
None,
|
||||
frontend_port_arc.clone(),
|
||||
prometheus_port_arc,
|
||||
2,
|
||||
shutdown_sender.clone(),
|
||||
))
|
||||
};
|
||||
|
||||
let mut frontend_port = frontend_port_arc.load(Ordering::Relaxed);
|
||||
let start = Instant::now();
|
||||
while frontend_port == 0 {
|
||||
if start.elapsed() > Duration::from_secs(1) {
|
||||
panic!("took too long to start!");
|
||||
}
|
||||
|
||||
sleep(Duration::from_millis(10)).await;
|
||||
frontend_port = frontend_port_arc.load(Ordering::Relaxed);
|
||||
}
|
||||
|
||||
let proxy_endpoint = format!("http://127.0.0.1:{}", frontend_port);
|
||||
|
||||
let proxy_provider = Provider::<Http>::try_from(proxy_endpoint).unwrap();
|
||||
|
||||
Self {
|
||||
handle: Mutex::new(Some(handle)),
|
||||
anvil_provider,
|
||||
proxy_provider,
|
||||
shutdown_sender,
|
||||
))
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: do something to the node. query latest block, mine another block, query again
|
||||
let proxy_provider = Provider::<Http>::try_from(anvil.endpoint()).unwrap();
|
||||
fn stop(&self) -> Result<usize, SendError<()>> {
|
||||
self.shutdown_sender.send(())
|
||||
}
|
||||
|
||||
async fn wait(&self) {
|
||||
// TODO: lock+take feels weird, but it works
|
||||
let handle = self.handle.lock().take();
|
||||
|
||||
if let Some(handle) = handle {
|
||||
info!("waiting for the app to stop...");
|
||||
handle.await.unwrap().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TestApp {
|
||||
fn drop(&mut self) {
|
||||
let _ = self.stop();
|
||||
}
|
||||
}
|
||||
|
||||
#[test_log::test(tokio::test)]
|
||||
async fn it_works() {
|
||||
// TODO: move basic setup into a test fixture
|
||||
let x = TestApp::spawn().await;
|
||||
|
||||
let anvil_provider = &x.anvil_provider;
|
||||
let proxy_provider = &x.proxy_provider;
|
||||
|
||||
let anvil_result = anvil_provider
|
||||
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
|
||||
@ -391,12 +440,7 @@ mod tests {
|
||||
|
||||
assert_eq!(first_block_num, second_block_num - 1);
|
||||
|
||||
// TODO: how do we make fixtures run this at the end?
|
||||
// tell the test app to shut down
|
||||
shutdown_sender.send(()).unwrap();
|
||||
|
||||
info!("waiting for shutdown...");
|
||||
// TODO: panic if a timeout is reached
|
||||
app_handle.await.unwrap().unwrap();
|
||||
// x.stop();
|
||||
// x.wait().await;
|
||||
}
|
||||
}
|
||||
|
@ -2,7 +2,7 @@ use crate::app::Web3ProxyJoinHandle;
|
||||
use crate::rpcs::blockchain::{BlocksByHashCache, Web3ProxyBlock};
|
||||
use crate::rpcs::one::Web3Rpc;
|
||||
use argh::FromArgs;
|
||||
use ethers::prelude::{Address, TxHash, H256};
|
||||
use ethers::prelude::{Address, TxHash};
|
||||
use ethers::types::{U256, U64};
|
||||
use hashbrown::HashMap;
|
||||
use migration::sea_orm::DatabaseConnection;
|
||||
@ -98,9 +98,6 @@ pub struct AppConfig {
|
||||
/// Default ERC address for out deposit contract
|
||||
pub deposit_factory_contract: Option<Address>,
|
||||
|
||||
/// Default ERC address for out deposit contract
|
||||
pub deposit_topic: Option<H256>,
|
||||
|
||||
/// minimum amount to increase eth_estimateGas results
|
||||
pub gas_increase_min: Option<U256>,
|
||||
|
||||
@ -275,6 +272,7 @@ pub struct Web3RpcConfig {
|
||||
/// block data limit. If None, will be queried
|
||||
pub block_data_limit: Option<u64>,
|
||||
/// the requests per second at which the server starts slowing down
|
||||
#[serde(default = "default_soft_limit")]
|
||||
pub soft_limit: u32,
|
||||
/// the requests per second at which the server throws errors (rate limit or otherwise)
|
||||
pub hard_limit: Option<u64>,
|
||||
@ -290,6 +288,10 @@ pub struct Web3RpcConfig {
|
||||
pub extra: HashMap<String, serde_json::Value>,
|
||||
}
|
||||
|
||||
fn default_soft_limit() -> u32 {
|
||||
10
|
||||
}
|
||||
|
||||
impl Web3RpcConfig {
|
||||
/// Create a Web3Rpc from config
|
||||
/// TODO: move this into Web3Rpc? (just need to make things pub(crate))
|
||||
|
@ -363,7 +363,7 @@ pub async fn admin_imitate_login_post(
|
||||
|
||||
// mostly default options are fine. the message includes timestamp and domain and nonce
|
||||
let verify_config = VerificationOpts {
|
||||
rpc_provider: Some(app.internal_provider.clone()),
|
||||
rpc_provider: Some(app.internal_provider().clone()),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
|
@ -18,9 +18,9 @@ use axum::{
|
||||
use http::{header::AUTHORIZATION, StatusCode};
|
||||
use listenfd::ListenFd;
|
||||
use moka::future::{Cache, CacheBuilder};
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::{iter::once, time::Duration};
|
||||
use std::{net::SocketAddr, sync::atomic::Ordering};
|
||||
use strum::{EnumCount, EnumIter};
|
||||
use tokio::sync::broadcast;
|
||||
use tower_http::cors::CorsLayer;
|
||||
@ -41,8 +41,7 @@ pub type ResponseCache = Cache<ResponseCacheKey, (StatusCode, &'static str, axum
|
||||
|
||||
/// Start the frontend server.
|
||||
pub async fn serve(
|
||||
port: u16,
|
||||
proxy_app: Arc<Web3ProxyApp>,
|
||||
app: Arc<Web3ProxyApp>,
|
||||
mut shutdown_receiver: broadcast::Receiver<()>,
|
||||
shutdown_complete_sender: broadcast::Sender<()>,
|
||||
) -> Web3ProxyResult<()> {
|
||||
@ -59,7 +58,7 @@ pub async fn serve(
|
||||
// TODO: read config for if fastest/versus should be available publicly. default off
|
||||
|
||||
// build our axum Router
|
||||
let app = Router::new()
|
||||
let router = Router::new()
|
||||
// TODO: i think these routes could be done a lot better
|
||||
//
|
||||
// HTTP RPC (POST)
|
||||
@ -237,7 +236,7 @@ pub async fn serve(
|
||||
// handle cors
|
||||
.layer(CorsLayer::very_permissive())
|
||||
// application state
|
||||
.layer(Extension(proxy_app))
|
||||
.layer(Extension(app.clone()))
|
||||
// frontend caches
|
||||
.layer(Extension(Arc::new(response_cache)))
|
||||
// 404 for any unknown routes
|
||||
@ -251,9 +250,8 @@ pub async fn serve(
|
||||
|
||||
axum::Server::from_tcp(listener)?
|
||||
} else {
|
||||
info!("listening on port {}", port);
|
||||
// TODO: allow only listening on localhost? top_config.app.host.parse()?
|
||||
let addr = SocketAddr::from(([0, 0, 0, 0], port));
|
||||
let addr = SocketAddr::from(([0, 0, 0, 0], app.frontend_port.load(Ordering::Relaxed)));
|
||||
|
||||
axum::Server::try_bind(&addr)?
|
||||
};
|
||||
@ -269,17 +267,23 @@ pub async fn serve(
|
||||
#[cfg(feature = "connectinfo")]
|
||||
let make_service = {
|
||||
info!("connectinfo feature enabled");
|
||||
app.into_make_service_with_connect_info::<SocketAddr>()
|
||||
router.into_make_service_with_connect_info::<SocketAddr>()
|
||||
};
|
||||
|
||||
#[cfg(not(feature = "connectinfo"))]
|
||||
let make_service = {
|
||||
info!("connectinfo feature disabled");
|
||||
app.into_make_service()
|
||||
router.into_make_service()
|
||||
};
|
||||
|
||||
let server = server_builder
|
||||
.serve(make_service)
|
||||
let server = server_builder.serve(make_service);
|
||||
|
||||
let port = server.local_addr().port();
|
||||
info!("listening on port {}", port);
|
||||
|
||||
app.frontend_port.store(port, Ordering::Relaxed);
|
||||
|
||||
let server = server
|
||||
// TODO: option to use with_connect_info. we want it in dev, but not when running behind a proxy, but not
|
||||
.with_graceful_shutdown(async move {
|
||||
let _ = shutdown_receiver.recv().await;
|
||||
|
@ -38,9 +38,10 @@ use tokio::sync::{broadcast, OwnedSemaphorePermit, RwLock};
|
||||
use tracing::{info, trace};
|
||||
|
||||
/// How to select backend servers for a request
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
#[derive(Copy, Clone, Debug, Default)]
|
||||
pub enum ProxyMode {
|
||||
/// send to the "best" synced server
|
||||
#[default]
|
||||
Best,
|
||||
/// send to all synced servers and return the fastest non-error response (reverts do not count as errors here)
|
||||
Fastest(usize),
|
||||
@ -51,12 +52,6 @@ pub enum ProxyMode {
|
||||
Debug,
|
||||
}
|
||||
|
||||
impl Default for ProxyMode {
|
||||
fn default() -> Self {
|
||||
Self::Best
|
||||
}
|
||||
}
|
||||
|
||||
/// Public entrypoint for WebSocket JSON-RPC requests.
|
||||
/// Queries a single server at a time
|
||||
#[debug_handler]
|
||||
|
@ -257,7 +257,7 @@ pub async fn user_login_post(
|
||||
|
||||
// mostly default options are fine. the message includes timestamp and domain and nonce
|
||||
let verify_config = VerificationOpts {
|
||||
rpc_provider: Some(app.internal_provider.clone()),
|
||||
rpc_provider: Some(app.internal_provider().clone()),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
|
@ -3,6 +3,7 @@ use axum::http::HeaderValue;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::{routing::get, Extension, Router};
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::broadcast;
|
||||
use tracing::info;
|
||||
@ -13,21 +14,29 @@ use crate::errors::Web3ProxyResult;
|
||||
/// Run a prometheus metrics server on the given port.
|
||||
pub async fn serve(
|
||||
app: Arc<Web3ProxyApp>,
|
||||
port: u16,
|
||||
mut shutdown_receiver: broadcast::Receiver<()>,
|
||||
) -> Web3ProxyResult<()> {
|
||||
// routes should be ordered most to least common
|
||||
let app = Router::new().route("/", get(root)).layer(Extension(app));
|
||||
let router = Router::new()
|
||||
.route("/", get(root))
|
||||
.layer(Extension(app.clone()));
|
||||
|
||||
// note: the port here might be 0
|
||||
let port = app.prometheus_port.load(Ordering::Relaxed);
|
||||
// TODO: config for the host?
|
||||
let addr = SocketAddr::from(([0, 0, 0, 0], port));
|
||||
info!("prometheus listening on port {}", port);
|
||||
|
||||
let service = app.into_make_service();
|
||||
let service = router.into_make_service();
|
||||
|
||||
// `axum::Server` is a re-export of `hyper::Server`
|
||||
axum::Server::bind(&addr)
|
||||
.serve(service)
|
||||
let server = axum::Server::bind(&addr).serve(service);
|
||||
|
||||
let port = server.local_addr().port();
|
||||
info!("prometheus listening on port {}", port);
|
||||
|
||||
app.prometheus_port.store(port, Ordering::Relaxed);
|
||||
|
||||
server
|
||||
.with_graceful_shutdown(async move {
|
||||
let _ = shutdown_receiver.recv().await;
|
||||
})
|
||||
|
@ -1376,8 +1376,6 @@ impl Serialize for Web3Rpcs {
|
||||
mod tests {
|
||||
#![allow(unused_imports)]
|
||||
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
use super::*;
|
||||
use crate::rpcs::blockchain::Web3ProxyBlock;
|
||||
use crate::rpcs::consensus::ConsensusFinder;
|
||||
@ -1387,6 +1385,7 @@ mod tests {
|
||||
use latency::PeakEwmaLatency;
|
||||
use moka::future::CacheBuilder;
|
||||
use parking_lot::RwLock;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use tracing::trace;
|
||||
|
||||
#[cfg(test)]
|
||||
@ -1394,6 +1393,7 @@ mod tests {
|
||||
PeakEwmaLatency::spawn(Duration::from_secs(1), 4, Duration::from_secs(1))
|
||||
}
|
||||
|
||||
// TODO: logging
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn test_sort_connections_by_sync_status() {
|
||||
// TODO: how should we do test logging setup with tracing?
|
||||
|
Loading…
Reference in New Issue
Block a user