unlimited localhost

This commit is contained in:
Bryan Stitt 2023-05-30 22:31:35 -07:00
parent fcc1843af0
commit f1636d3b85
5 changed files with 49 additions and 112 deletions

@ -19,6 +19,7 @@ use crate::rpcs::blockchain::Web3ProxyBlock;
use crate::rpcs::consensus::ConsensusWeb3Rpcs;
use crate::rpcs::many::Web3Rpcs;
use crate::rpcs::one::Web3Rpc;
use crate::rpcs::provider::{connect_http, EthersHttpProvider};
use crate::rpcs::transactions::TxStatus;
use crate::stats::{AppStat, StatBuffer};
use crate::user_token::UserBearerToken;
@ -140,6 +141,7 @@ pub struct Web3ProxyApp {
/// Optional read-only database for users and accounting
pub db_replica: Option<DatabaseReplica>,
pub hostname: Option<String>,
pub internal_provider: Arc<EthersHttpProvider>,
/// store pending transactions that we've seen so that we don't send duplicates to subscribers
/// TODO: think about this more. might be worth storing if we sent the transaction or not and using this for automatic retries
pub pending_transactions: Arc<CacheWithTTL<TxHash, TxStatus>>,
@ -212,6 +214,7 @@ pub struct Web3ProxyAppSpawn {
impl Web3ProxyApp {
/// The main entrypoint.
pub async fn spawn(
app_frontend_port: u16,
top_config: TopConfig,
num_workers: usize,
shutdown_sender: broadcast::Sender<()>,
@ -604,30 +607,45 @@ impl Web3ProxyApp {
.ok()
.and_then(|x| x.to_str().map(|x| x.to_string()));
// TODO: i'm sure theres much better ways to do this, but i don't want to spend time fighting traits right now
// TODO: what interval? i don't think we use it
// i tried and failed to `impl JsonRpcClient for Web3ProxyApi`
// i tried and failed to set up ipc. http is already running, so lets just use that
let internal_provider = connect_http(
format!("http://127.0.0.1:{}", app_frontend_port)
.parse()
.unwrap(),
http_client.clone(),
Duration::from_secs(10),
)?;
let internal_provider = Arc::new(internal_provider);
let app = Self {
config: top_config.app.clone(),
balanced_rpcs,
bearer_token_semaphores,
bundler_4337_rpcs,
http_client,
kafka_producer,
private_rpcs,
jsonrpc_response_cache: response_cache,
watch_consensus_head_receiver,
pending_tx_sender,
pending_transactions,
frontend_ip_rate_limiter,
frontend_registered_user_rate_limiter,
login_rate_limiter,
config: top_config.app.clone(),
db_conn,
db_replica,
influxdb_client,
frontend_ip_rate_limiter,
frontend_registered_user_rate_limiter,
hostname,
vredis_pool,
rpc_secret_key_cache,
bearer_token_semaphores,
http_client,
influxdb_client,
internal_provider,
ip_semaphores,
user_semaphores,
jsonrpc_response_cache: response_cache,
kafka_producer,
login_rate_limiter,
pending_transactions,
pending_tx_sender,
private_rpcs,
rpc_secret_key_cache,
stat_sender,
user_semaphores,
vredis_pool,
watch_consensus_head_receiver,
};
let app = Arc::new(app);

@ -75,59 +75,17 @@ async fn run(
broadcast::channel(1);
// start the main app
// let mut spawned_app = Web3ProxyApp::spawn(top_config, num_workers, app_shutdown_sender.clone()).await?;
let mut spawned_app =
Web3ProxyApp::spawn(top_config.clone(), num_workers, app_shutdown_sender.clone()).await?;
let mut spawned_app = Web3ProxyApp::spawn(
app_frontend_port,
top_config.clone(),
num_workers,
app_shutdown_sender.clone(),
)
.await?;
// start thread for watching config
if let Some(top_config_path) = top_config_path {
let config_sender = spawned_app.new_top_config_sender;
/*
#[cfg(feature = "inotify")]
{
let mut inotify = Inotify::init().expect("Failed to initialize inotify");
inotify
.add_watch(top_config_path.clone(), WatchMask::MODIFY)
.expect("Failed to add inotify watch on config");
let mut buffer = [0u8; 4096];
// TODO: exit the app if this handle exits
thread::spawn(move || loop {
// TODO: debounce
let events = inotify
.read_events_blocking(&mut buffer)
.expect("Failed to read inotify events");
for event in events {
if event.mask.contains(EventMask::MODIFY) {
info!("config changed");
match fs::read_to_string(&top_config_path) {
Ok(top_config) => match toml::from_str(&top_config) {
Ok(top_config) => {
config_sender.send(top_config).unwrap();
}
Err(err) => {
// TODO: panic?
error!("Unable to parse config! {:#?}", err);
}
},
Err(err) => {
// TODO: panic?
error!("Unable to read config! {:#?}", err);
}
};
} else {
// TODO: is "MODIFY" enough, or do we want CLOSE_WRITE?
unimplemented!();
}
}
});
}
*/
// #[cfg(not(feature = "inotify"))]
{
thread::spawn(move || loop {
match fs::read_to_string(&top_config_path) {
@ -154,35 +112,6 @@ async fn run(
}
}
// start thread for watching config
// if let Some(top_config_path) = top_config_path {
// let config_sender = spawned_app.new_top_config_sender;
// {
// thread::spawn(move || loop {
// match fs::read_to_string(&top_config_path) {
// Ok(new_top_config) => match toml::from_str(&new_top_config) {
// Ok(new_top_config) => {
// if new_top_config != top_config {
// top_config = new_top_config;
// config_sender.send(top_config.clone()).unwrap();
// }
// }
// Err(err) => {
// // TODO: panic?
// error!("Unable to parse config! {:#?}", err);
// }
// },
// Err(err) => {
// // TODO: panic?
// error!("Unable to read config! {:#?}", err);
// }
// }
//
// thread::sleep(Duration::from_secs(10));
// });
// }
// }
// start the prometheus metrics port
let prometheus_handle = tokio::spawn(prometheus::serve(
spawned_app.app.clone(),

@ -8,7 +8,7 @@ use crate::rpcs::one::Web3Rpc;
use crate::stats::{AppStat, BackendRequests, RpcQueryStats};
use crate::user_token::UserBearerToken;
use anyhow::Context;
use axum::headers::authorization::Bearer;
use axum::headers::authorization::{self, Bearer};
use axum::headers::{Header, Origin, Referer, UserAgent};
use chrono::Utc;
use core::fmt;
@ -1049,6 +1049,13 @@ impl Web3ProxyApp {
origin: Option<Origin>,
proxy_mode: ProxyMode,
) -> Web3ProxyResult<RateLimitResult> {
if ip.is_loopback() {
// TODO: localhost being unlimited should be optional
let authorization = Authorization::internal(self.db_conn())?;
return Ok(RateLimitResult::Allowed(authorization, None));
}
// ip rate limits don't check referer or user agent
// they do check origin because we can override rate limits for some origins
let authorization = Authorization::external(

@ -1,16 +0,0 @@
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::broadcast;
use crate::app::Web3ProxyApp;
use crate::errors::Web3ProxyResult;
/// Start an ipc server that has no rate limits
pub async fn serve(
socket_path: PathBuf,
proxy_app: Arc<Web3ProxyApp>,
mut shutdown_receiver: broadcast::Receiver<()>,
shutdown_complete_sender: broadcast::Sender<()>,
) -> Web3ProxyResult<()> {
todo!();
}

@ -7,7 +7,6 @@ pub mod config;
pub mod errors;
pub mod frontend;
pub mod http_params;
pub mod ipc;
pub mod jsonrpc;
pub mod pagerduty;
pub mod prometheus;