diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 36be936c..b76f06fe 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -19,6 +19,7 @@ use crate::rpcs::blockchain::Web3ProxyBlock; use crate::rpcs::consensus::ConsensusWeb3Rpcs; use crate::rpcs::many::Web3Rpcs; use crate::rpcs::one::Web3Rpc; +use crate::rpcs::provider::{connect_http, EthersHttpProvider}; use crate::rpcs::transactions::TxStatus; use crate::stats::{AppStat, StatBuffer}; use crate::user_token::UserBearerToken; @@ -140,6 +141,7 @@ pub struct Web3ProxyApp { /// Optional read-only database for users and accounting pub db_replica: Option, pub hostname: Option, + pub internal_provider: Arc, /// store pending transactions that we've seen so that we don't send duplicates to subscribers /// TODO: think about this more. might be worth storing if we sent the transaction or not and using this for automatic retries pub pending_transactions: Arc>, @@ -212,6 +214,7 @@ pub struct Web3ProxyAppSpawn { impl Web3ProxyApp { /// The main entrypoint. pub async fn spawn( + app_frontend_port: u16, top_config: TopConfig, num_workers: usize, shutdown_sender: broadcast::Sender<()>, @@ -604,30 +607,45 @@ impl Web3ProxyApp { .ok() .and_then(|x| x.to_str().map(|x| x.to_string())); + // TODO: i'm sure theres much better ways to do this, but i don't want to spend time fighting traits right now + // TODO: what interval? i don't think we use it + // i tried and failed to `impl JsonRpcClient for Web3ProxyApi` + // i tried and failed to set up ipc. http is already running, so lets just use that + let internal_provider = connect_http( + format!("http://127.0.0.1:{}", app_frontend_port) + .parse() + .unwrap(), + http_client.clone(), + Duration::from_secs(10), + )?; + + let internal_provider = Arc::new(internal_provider); + let app = Self { - config: top_config.app.clone(), balanced_rpcs, + bearer_token_semaphores, bundler_4337_rpcs, - http_client, - kafka_producer, - private_rpcs, - jsonrpc_response_cache: response_cache, - watch_consensus_head_receiver, - pending_tx_sender, - pending_transactions, - frontend_ip_rate_limiter, - frontend_registered_user_rate_limiter, - login_rate_limiter, + config: top_config.app.clone(), db_conn, db_replica, - influxdb_client, + frontend_ip_rate_limiter, + frontend_registered_user_rate_limiter, hostname, - vredis_pool, - rpc_secret_key_cache, - bearer_token_semaphores, + http_client, + influxdb_client, + internal_provider, ip_semaphores, - user_semaphores, + jsonrpc_response_cache: response_cache, + kafka_producer, + login_rate_limiter, + pending_transactions, + pending_tx_sender, + private_rpcs, + rpc_secret_key_cache, stat_sender, + user_semaphores, + vredis_pool, + watch_consensus_head_receiver, }; let app = Arc::new(app); diff --git a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs index 37edcafb..a8ca67a0 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs @@ -75,59 +75,17 @@ async fn run( broadcast::channel(1); // start the main app - // let mut spawned_app = Web3ProxyApp::spawn(top_config, num_workers, app_shutdown_sender.clone()).await?; - let mut spawned_app = - Web3ProxyApp::spawn(top_config.clone(), num_workers, app_shutdown_sender.clone()).await?; + let mut spawned_app = Web3ProxyApp::spawn( + app_frontend_port, + top_config.clone(), + num_workers, + app_shutdown_sender.clone(), + ) + .await?; // start thread for watching config if let Some(top_config_path) = top_config_path { let config_sender = spawned_app.new_top_config_sender; - /* - #[cfg(feature = "inotify")] - { - let mut inotify = Inotify::init().expect("Failed to initialize inotify"); - - inotify - .add_watch(top_config_path.clone(), WatchMask::MODIFY) - .expect("Failed to add inotify watch on config"); - - let mut buffer = [0u8; 4096]; - - // TODO: exit the app if this handle exits - thread::spawn(move || loop { - // TODO: debounce - - let events = inotify - .read_events_blocking(&mut buffer) - .expect("Failed to read inotify events"); - - for event in events { - if event.mask.contains(EventMask::MODIFY) { - info!("config changed"); - match fs::read_to_string(&top_config_path) { - Ok(top_config) => match toml::from_str(&top_config) { - Ok(top_config) => { - config_sender.send(top_config).unwrap(); - } - Err(err) => { - // TODO: panic? - error!("Unable to parse config! {:#?}", err); - } - }, - Err(err) => { - // TODO: panic? - error!("Unable to read config! {:#?}", err); - } - }; - } else { - // TODO: is "MODIFY" enough, or do we want CLOSE_WRITE? - unimplemented!(); - } - } - }); - } - */ - // #[cfg(not(feature = "inotify"))] { thread::spawn(move || loop { match fs::read_to_string(&top_config_path) { @@ -154,35 +112,6 @@ async fn run( } } - // start thread for watching config - // if let Some(top_config_path) = top_config_path { - // let config_sender = spawned_app.new_top_config_sender; - // { - // thread::spawn(move || loop { - // match fs::read_to_string(&top_config_path) { - // Ok(new_top_config) => match toml::from_str(&new_top_config) { - // Ok(new_top_config) => { - // if new_top_config != top_config { - // top_config = new_top_config; - // config_sender.send(top_config.clone()).unwrap(); - // } - // } - // Err(err) => { - // // TODO: panic? - // error!("Unable to parse config! {:#?}", err); - // } - // }, - // Err(err) => { - // // TODO: panic? - // error!("Unable to read config! {:#?}", err); - // } - // } - // - // thread::sleep(Duration::from_secs(10)); - // }); - // } - // } - // start the prometheus metrics port let prometheus_handle = tokio::spawn(prometheus::serve( spawned_app.app.clone(), diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 0a5a0ea4..cf5da22e 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -8,7 +8,7 @@ use crate::rpcs::one::Web3Rpc; use crate::stats::{AppStat, BackendRequests, RpcQueryStats}; use crate::user_token::UserBearerToken; use anyhow::Context; -use axum::headers::authorization::Bearer; +use axum::headers::authorization::{self, Bearer}; use axum::headers::{Header, Origin, Referer, UserAgent}; use chrono::Utc; use core::fmt; @@ -1049,6 +1049,13 @@ impl Web3ProxyApp { origin: Option, proxy_mode: ProxyMode, ) -> Web3ProxyResult { + if ip.is_loopback() { + // TODO: localhost being unlimited should be optional + let authorization = Authorization::internal(self.db_conn())?; + + return Ok(RateLimitResult::Allowed(authorization, None)); + } + // ip rate limits don't check referer or user agent // they do check origin because we can override rate limits for some origins let authorization = Authorization::external( diff --git a/web3_proxy/src/ipc.rs b/web3_proxy/src/ipc.rs deleted file mode 100644 index c898d77c..00000000 --- a/web3_proxy/src/ipc.rs +++ /dev/null @@ -1,16 +0,0 @@ -use std::path::PathBuf; -use std::sync::Arc; -use tokio::sync::broadcast; - -use crate::app::Web3ProxyApp; -use crate::errors::Web3ProxyResult; - -/// Start an ipc server that has no rate limits -pub async fn serve( - socket_path: PathBuf, - proxy_app: Arc, - mut shutdown_receiver: broadcast::Receiver<()>, - shutdown_complete_sender: broadcast::Sender<()>, -) -> Web3ProxyResult<()> { - todo!(); -} diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs index 13ec65f2..e345c7ed 100644 --- a/web3_proxy/src/lib.rs +++ b/web3_proxy/src/lib.rs @@ -7,7 +7,6 @@ pub mod config; pub mod errors; pub mod frontend; pub mod http_params; -pub mod ipc; pub mod jsonrpc; pub mod pagerduty; pub mod prometheus;