diff --git a/config/example.toml b/config/example.toml index 2958ddf7..f7770934 100644 --- a/config/example.toml +++ b/config/example.toml @@ -17,6 +17,7 @@ redirect_user_url = "https://llamanodes.com/dashboard/keys?key={rpc_key_id}" sentry_url = "https://SENTRY_KEY_A.ingest.sentry.io/SENTRY_KEY_B" +# public limits are when no key is used. these are instead grouped by ip # 0 = block all public requests public_max_concurrent_requests = 3 # 0 = block all public requests diff --git a/redis-rate-limiter/src/lib.rs b/redis-rate-limiter/src/lib.rs index 6f9341f4..b8c83e0c 100644 --- a/redis-rate-limiter/src/lib.rs +++ b/redis-rate-limiter/src/lib.rs @@ -22,7 +22,9 @@ pub struct RedisRateLimiter { } pub enum RedisRateLimitResult { + /// TODO: what is the inner value? Allowed(u64), + /// TODO: what is the inner value? RetryAt(Instant, u64), RetryNever, } diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 6c6511f1..70dc3d0c 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -3,7 +3,7 @@ use crate::app_stats::{ProxyResponseStat, StatEmitter, Web3ProxyStat}; use crate::block_number::block_needed; use crate::config::{AppConfig, TopConfig}; -use crate::frontend::authorization::{AuthorizedRequest, RequestMetadata}; +use crate::frontend::authorization::{Authorization, RequestMetadata}; use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcForwardedResponseEnum; use crate::jsonrpc::JsonRpcRequest; @@ -46,7 +46,7 @@ use tracing::{error, info, instrument, trace, warn}; use ulid::Ulid; // TODO: make this customizable? -static APP_USER_AGENT: &str = concat!( +pub static APP_USER_AGENT: &str = concat!( "satoshiandkin/", env!("CARGO_PKG_NAME"), "/", @@ -62,10 +62,12 @@ type ResponseCache = pub type AnyhowJoinHandle = JoinHandle>; #[derive(Clone, Debug, Default, From)] -pub struct UserKeyData { - /// database id of the primary user +pub struct AuthorizationChecks { + /// database id of the primary user. + /// TODO: do we need this? its on the authorization so probably not pub user_id: u64, /// database id of the rpc key + /// if this is 0, then this request is being rate limited by ip pub rpc_key_id: u64, /// if None, allow unlimited queries. inherited from the user_tier pub max_requests_per_period: Option, @@ -109,7 +111,8 @@ pub struct Web3ProxyApp { pub login_rate_limiter: Option, pub vredis_pool: Option, // TODO: this key should be our RpcSecretKey class, not Ulid - pub rpc_secret_key_cache: Cache, + pub rpc_secret_key_cache: + Cache, pub rpc_key_semaphores: Cache, hashbrown::hash_map::DefaultHashBuilder>, pub ip_semaphores: Cache, hashbrown::hash_map::DefaultHashBuilder>, pub bearer_token_semaphores: @@ -193,7 +196,7 @@ impl Web3ProxyApp { shutdown_receiver: broadcast::Receiver<()>, ) -> anyhow::Result { // safety checks on the config - if let Some(redirect) = &top_config.app.redirect_user_url { + if let Some(redirect) = &top_config.app.redirect_rpc_key_url { assert!( redirect.contains("{rpc_key_id}"), "redirect_user_url user url must contain \"{{rpc_key_id}}\"" @@ -330,6 +333,7 @@ impl Web3ProxyApp { // connect to the load balanced rpcs let (balanced_rpcs, balanced_handle) = Web3Connections::spawn( top_config.app.chain_id, + db_conn.clone(), balanced_rpcs, http_client.clone(), vredis_pool.clone(), @@ -356,6 +360,7 @@ impl Web3ProxyApp { } else { let (private_rpcs, private_handle) = Web3Connections::spawn( top_config.app.chain_id, + db_conn.clone(), private_rpcs, http_client.clone(), vredis_pool.clone(), @@ -522,7 +527,7 @@ impl Web3ProxyApp { #[instrument(level = "trace")] pub async fn eth_subscribe<'a>( self: &'a Arc, - authorized_request: Arc, + authorization: Arc, payload: JsonRpcRequest, subscription_count: &'a AtomicUsize, // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now @@ -719,7 +724,7 @@ impl Web3ProxyApp { #[instrument(level = "trace")] pub async fn proxy_web3_rpc( self: &Arc, - authorized_request: Arc, + authorization: Arc, request: JsonRpcRequestEnum, ) -> anyhow::Result { // TODO: this should probably be trace level @@ -734,14 +739,14 @@ impl Web3ProxyApp { JsonRpcRequestEnum::Single(request) => JsonRpcForwardedResponseEnum::Single( timeout( max_time, - self.proxy_web3_rpc_request(authorized_request, request), + self.proxy_web3_rpc_request(&authorization, request), ) .await??, ), JsonRpcRequestEnum::Batch(requests) => JsonRpcForwardedResponseEnum::Batch( timeout( max_time, - self.proxy_web3_rpc_requests(authorized_request, requests), + self.proxy_web3_rpc_requests(&authorization, requests), ) .await??, ), @@ -758,27 +763,24 @@ impl Web3ProxyApp { #[instrument(level = "trace")] async fn proxy_web3_rpc_requests( self: &Arc, - authorized_request: Arc, + authorization: &Arc, requests: Vec, ) -> anyhow::Result> { // TODO: we should probably change ethers-rs to support this directly let num_requests = requests.len(); + // TODO: spawn so the requests go in parallel + // TODO: i think we will need to flatten let responses = join_all( requests .into_iter() - .map(|request| { - let authorized_request = authorized_request.clone(); - - // TODO: spawn so the requests go in parallel - // TODO: i think we will need to flatten - self.proxy_web3_rpc_request(authorized_request, request) - }) + .map(|request| self.proxy_web3_rpc_request(authorization, request)) .collect::>(), ) .await; - // TODO: i'm sure this could be done better with iterators + // TODO: i'm sure this could be done better with iterators. we could return the error earlier then, too + // TODO: stream the response? let mut collected: Vec = Vec::with_capacity(num_requests); for response in responses { collected.push(response?); @@ -809,7 +811,7 @@ impl Web3ProxyApp { #[instrument(level = "trace")] async fn proxy_web3_rpc_request( self: &Arc, - authorized_request: Arc, + authorization: &Arc, mut request: JsonRpcRequest, ) -> anyhow::Result { trace!("Received request: {:?}", request); @@ -818,7 +820,7 @@ impl Web3ProxyApp { let request_metadata = Arc::new(RequestMetadata::new(60, &request)?); // save the id so we can attach it to the response - // TODO: instead of cloning, take the id out + // TODO: instead of cloning, take the id out? let request_id = request.id.clone(); // TODO: if eth_chainId or net_version, serve those without querying the backend @@ -947,7 +949,7 @@ impl Web3ProxyApp { return rpcs .try_send_all_upstream_servers( - Some(&authorized_request), + authorization, request, Some(request_metadata), None, @@ -1013,6 +1015,7 @@ impl Web3ProxyApp { // we do this check before checking caches because it might modify the request params // TODO: add a stat for archive vs full since they should probably cost different let request_block_id = if let Some(request_block_needed) = block_needed( + authorization, method, request.params.as_mut(), head_block_id.num, @@ -1021,8 +1024,10 @@ impl Web3ProxyApp { .await? { // TODO: maybe this should be on the app and not on balanced_rpcs - let (request_block_hash, archive_needed) = - self.balanced_rpcs.block_hash(&request_block_needed).await?; + let (request_block_hash, archive_needed) = self + .balanced_rpcs + .block_hash(authorization, &request_block_needed) + .await?; if archive_needed { request_metadata @@ -1049,7 +1054,7 @@ impl Web3ProxyApp { let mut response = { let request_metadata = request_metadata.clone(); - let authorized_request = authorized_request.clone(); + let authorization = authorization.clone(); self.response_cache .try_get_with(cache_key, async move { @@ -1059,7 +1064,7 @@ impl Web3ProxyApp { let mut response = self .balanced_rpcs .try_send_best_upstream_server( - Some(&authorized_request), + &authorization, request, Some(&request_metadata), Some(&request_block_id.num), @@ -1085,14 +1090,10 @@ impl Web3ProxyApp { // replace the id with our request's id. response.id = request_id; - // DRY this up by just returning the partial result (or error) here - if let (Some(stat_sender), Ok(AuthorizedRequest::User(Some(_), authorized_key))) = ( - self.stat_sender.as_ref(), - Arc::try_unwrap(authorized_request), - ) { + if let Some(stat_sender) = self.stat_sender.as_ref() { let response_stat = ProxyResponseStat::new( method.to_string(), - authorized_key, + authorization.clone(), request_metadata, &response, ); @@ -1109,12 +1110,13 @@ impl Web3ProxyApp { let response = JsonRpcForwardedResponse::from_value(partial_response, request_id); - if let (Some(stat_sender), Ok(AuthorizedRequest::User(Some(_), authorized_key))) = ( - self.stat_sender.as_ref(), - Arc::try_unwrap(authorized_request), - ) { - let response_stat = - ProxyResponseStat::new(request.method, authorized_key, request_metadata, &response); + if let Some(stat_sender) = self.stat_sender.as_ref() { + let response_stat = ProxyResponseStat::new( + request.method, + authorization.clone(), + request_metadata, + &response, + ); stat_sender .send_async(response_stat.into()) diff --git a/web3_proxy/src/app_stats.rs b/web3_proxy/src/app_stats.rs index a3b0d3de..cd8f77f7 100644 --- a/web3_proxy/src/app_stats.rs +++ b/web3_proxy/src/app_stats.rs @@ -1,4 +1,4 @@ -use crate::frontend::authorization::{AuthorizedKey, RequestMetadata}; +use crate::frontend::authorization::{Authorization, RequestMetadata}; use crate::jsonrpc::JsonRpcForwardedResponse; use chrono::{TimeZone, Utc}; use derive_more::From; @@ -18,17 +18,30 @@ use tracing::{error, info}; /// TODO: can we use something inside sea_orm instead? #[derive(Debug)] pub struct ProxyResponseStat { - rpc_key_id: u64, + authorization: Arc, method: String, archive_request: bool, + error_response: bool, request_bytes: u64, /// if backend_requests is 0, there was a cache_hit backend_requests: u64, - error_response: bool, response_bytes: u64, response_millis: u64, } +impl ProxyResponseStat { + /// TODO: think more about this. probably rename it + fn key(&self) -> ProxyResponseAggregateKey { + ProxyResponseAggregateKey { + rpc_key_id: self.authorization.checks.rpc_key_id, + // TODO: include Origin here? + method: self.method.clone(), + archive_request: self.archive_request, + error_response: self.error_response, + } + } +} + pub struct ProxyResponseHistograms { request_bytes: Histogram, response_bytes: Histogram, @@ -50,6 +63,7 @@ impl Default for ProxyResponseHistograms { } } +// TODO: think more about if we should include IP address in this #[derive(Clone, From, Hash, PartialEq, Eq)] struct ProxyResponseAggregateKey { rpc_key_id: u64, @@ -62,9 +76,9 @@ struct ProxyResponseAggregateKey { pub struct ProxyResponseAggregate { frontend_requests: u64, backend_requests: u64, - // TODO: related to backend_requests. get this level of detail out + // TODO: related to backend_requests // backend_retries: u64, - // TODO: related to backend_requests. get this level of detail out + // TODO: related to backend_requests // no_servers: u64, cache_misses: u64, cache_hits: u64, @@ -164,9 +178,10 @@ impl ProxyResponseAggregate { let p99_response_bytes = response_bytes.value_at_quantile(0.99); let max_response_bytes = response_bytes.max(); + // TODO: Set origin and maybe other things on this model. probably not the ip though let aggregated_stat_model = rpc_accounting::ActiveModel { id: sea_orm::NotSet, - + // origin: sea_orm::Set(key.authorization.origin.to_string()), rpc_key_id: sea_orm::Set(key.rpc_key_id), chain_id: sea_orm::Set(chain_id), method: sea_orm::Set(key.method), @@ -215,7 +230,7 @@ impl ProxyResponseStat { // TODO: should RequestMetadata be in an arc? or can we handle refs here? pub fn new( method: String, - authorized_key: AuthorizedKey, + authorization: Arc, metadata: Arc, response: &JsonRpcForwardedResponse, ) -> Self { @@ -236,7 +251,7 @@ impl ProxyResponseStat { let response_millis = metadata.start_instant.elapsed().as_millis() as u64; Self { - rpc_key_id: authorized_key.rpc_key_id, + authorization, archive_request, method, backend_requests, @@ -246,15 +261,6 @@ impl ProxyResponseStat { response_millis, } } - - fn key(&self) -> ProxyResponseAggregateKey { - ProxyResponseAggregateKey { - rpc_key_id: self.rpc_key_id, - method: self.method.clone(), - error_response: self.error_response, - archive_request: self.archive_request, - } - } } impl StatEmitter { diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs index 8a56865b..625f1e98 100644 --- a/web3_proxy/src/bin/web3_proxy.rs +++ b/web3_proxy/src/bin/web3_proxy.rs @@ -283,7 +283,7 @@ mod tests { public_requests_per_period: Some(1_000_000), response_cache_max_bytes: 10_usize.pow(7), redirect_public_url: Some("example.com/".to_string()), - redirect_user_url: Some("example.com/{{rpc_key_id}}".to_string()), + redirect_rpc_key_url: Some("example.com/{rpc_key_id}".to_string()), ..Default::default() }, balanced_rpcs: HashMap::from([ diff --git a/web3_proxy/src/bin/web3_proxy_cli/check_config.rs b/web3_proxy/src/bin/web3_proxy_cli/check_config.rs index ae781eb5..a05e055f 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/check_config.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/check_config.rs @@ -66,7 +66,7 @@ impl CheckConfigSubCommand { } // TODO: also check that it contains rpc_key_id! - match top_config.app.redirect_user_url { + match top_config.app.redirect_rpc_key_url { None => { warn!("app.redirect_user_url is None. Registered users will get an error page instead of a redirect") } diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index 6d361040..be854c79 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -4,9 +4,10 @@ use ethers::{ prelude::{BlockNumber, U64}, types::H256, }; +use std::sync::Arc; use tracing::{instrument, warn}; -use crate::rpcs::connections::Web3Connections; +use crate::{frontend::authorization::Authorization, rpcs::connections::Web3Connections}; pub fn block_num_to_u64(block_num: BlockNumber, latest_block: U64) -> U64 { match block_num { @@ -40,6 +41,7 @@ pub fn block_num_to_u64(block_num: BlockNumber, latest_block: U64) -> U64 { /// modify params to always have a block number and not "latest" #[instrument(level = "trace")] pub async fn clean_block_number( + authorization: &Arc, params: &mut serde_json::Value, block_param_id: usize, latest_block: U64, @@ -70,7 +72,7 @@ pub async fn clean_block_number( let block_hash: H256 = serde_json::from_value(block_hash).context("decoding blockHash")?; - let block = rpcs.block(None, &block_hash, None).await?; + let block = rpcs.block(authorization, &block_hash, None).await?; block .number @@ -98,6 +100,7 @@ pub async fn clean_block_number( // TODO: change this to also return the hash needed? #[instrument(level = "trace")] pub async fn block_needed( + authorization: &Arc, method: &str, params: Option<&mut serde_json::Value>, head_block_num: U64, @@ -203,7 +206,7 @@ pub async fn block_needed( } }; - match clean_block_number(params, block_param_id, head_block_num, rpcs).await { + match clean_block_number(authorization, params, block_param_id, head_block_num, rpcs).await { Ok(block) => Ok(Some(block)), Err(err) => { // TODO: seems unlikely that we will get here diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index b4a8c4d1..914e9b71 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -6,6 +6,7 @@ use argh::FromArgs; use derive_more::Constructor; use ethers::prelude::TxHash; use hashbrown::HashMap; +use sea_orm::DatabaseConnection; use serde::Deserialize; use std::sync::Arc; use tokio::sync::broadcast; @@ -118,7 +119,7 @@ pub struct AppConfig { pub redirect_public_url: Option, /// the stats page url for a logged in user. if set, must contain "{rpc_key_id}" - pub redirect_user_url: Option, + pub redirect_rpc_key_url: Option, /// Optionally send errors to pub sentry_url: Option, @@ -199,6 +200,7 @@ impl Web3ConnectionConfig { pub async fn spawn( self, name: String, + db_conn: Option, redis_pool: Option, chain_id: u64, http_client: Option, @@ -228,6 +230,7 @@ impl Web3ConnectionConfig { Web3Connection::spawn( name, chain_id, + db_conn, self.url, http_client, http_interval_sender, diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 619ef4be..42b45973 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -1,16 +1,16 @@ //! Utilities for authorization of logged in and anonymous users. use super::errors::FrontendErrorResponse; -use crate::app::{UserKeyData, Web3ProxyApp}; +use crate::app::{AuthorizationChecks, Web3ProxyApp, APP_USER_AGENT}; use crate::jsonrpc::JsonRpcRequest; use crate::user_token::UserBearerToken; use anyhow::Context; use axum::headers::authorization::Bearer; use axum::headers::{Header, Origin, Referer, UserAgent}; -use axum::TypedHeader; use chrono::Utc; use deferred_rate_limiter::DeferredRateLimitResult; use entities::{rpc_key, user, user_tier}; +use hashbrown::HashMap; use http::HeaderValue; use ipnet::IpNet; use redis_rate_limiter::redis::AsyncCommands; @@ -33,29 +33,28 @@ pub enum RpcSecretKey { Uuid(Uuid), } +/// TODO: should this have IpAddr and Origin or AuthorizationChecks? #[derive(Debug)] pub enum RateLimitResult { - /// contains the IP of the anonymous user - /// TODO: option inside or outside the arc? - AllowedIp(IpAddr, Option), - /// contains the rpc_key_id of an authenticated user - AllowedUser(UserKeyData, Option), - /// contains the IP and retry_at of the anonymous user - RateLimitedIp(IpAddr, Option), - /// contains the rpc_key_id and retry_at of an authenticated user key - RateLimitedUser(UserKeyData, Option), + Allowed(Authorization, Option), + RateLimited( + Authorization, + /// when their rate limit resets and they can try more requests + Option, + ), /// This key is not in our database. Deny access! UnknownKey, } +/// TODO: include the authorization checks in this? #[derive(Clone, Debug)] -pub struct AuthorizedKey { +pub struct Authorization { + pub checks: AuthorizationChecks, + pub db_conn: Option, pub ip: IpAddr, pub origin: Option, - pub user_id: u64, - pub rpc_key_id: u64, - // TODO: just use an f32? even an f16 is probably fine - pub log_revert_chance: f64, + pub referer: Option, + pub user_agent: Option, } #[derive(Debug)] @@ -65,6 +64,7 @@ pub struct RequestMetadata { // TODO: better name for this pub period_seconds: u64, pub request_bytes: u64, + // TODO: do we need atomics? seems like we should be able to pass a &mut around // TODO: "archive" isn't really a boolean. pub archive_request: AtomicBool, /// if this is 0, there was a cache_hit @@ -75,16 +75,6 @@ pub struct RequestMetadata { pub response_millis: AtomicU64, } -#[derive(Clone, Debug)] -pub enum AuthorizedRequest { - /// Request from this app - Internal, - /// Request from an anonymous IP address - Ip(IpAddr, Option), - /// Request from an authenticated and authorized user - User(Option, AuthorizedKey), -} - impl RequestMetadata { pub fn new(period_seconds: u64, request: &JsonRpcRequest) -> anyhow::Result { // TODO: how can we do this without turning it into a string first. this is going to slow us down! @@ -176,16 +166,65 @@ impl From for Uuid { } } -impl AuthorizedKey { - pub fn try_new( +impl Authorization { + pub fn local(db_conn: Option) -> anyhow::Result { + let authorization_checks = AuthorizationChecks { + // any error logs on a local (internal) query are likely problems. log them all + log_revert_chance: 1.0, + // default for everything else should be fine. we don't have a user_id or ip to give + ..Default::default() + }; + + let ip: IpAddr = "127.0.0.1".parse().expect("localhost should always parse"); + let user_agent = UserAgent::from_str(APP_USER_AGENT).ok(); + + Self::try_new(authorization_checks, db_conn, ip, None, None, user_agent) + } + + pub fn public( + allowed_origin_requests_per_period: &HashMap, + db_conn: Option, + ip: IpAddr, + origin: Option, + referer: Option, + user_agent: Option, + ) -> anyhow::Result { + // some origins can override max_requests_per_period for anon users + let max_requests_per_period = origin + .as_ref() + .map(|origin| { + allowed_origin_requests_per_period + .get(&origin.to_string()) + .cloned() + }) + .unwrap_or_default(); + + // TODO: default or None? + let authorization_checks = AuthorizationChecks { + max_requests_per_period, + ..Default::default() + }; + + Self::try_new( + authorization_checks, + db_conn, + ip, + origin, + referer, + user_agent, + ) + } + + pub fn try_new( + authorization_checks: AuthorizationChecks, + db_conn: Option, ip: IpAddr, origin: Option, referer: Option, user_agent: Option, - rpc_key_data: UserKeyData, ) -> anyhow::Result { // check ip - match &rpc_key_data.allowed_ips { + match &authorization_checks.allowed_ips { None => {} Some(allowed_ips) => { if !allowed_ips.iter().any(|x| x.contains(&ip)) { @@ -195,7 +234,7 @@ impl AuthorizedKey { } // check origin - match (&origin, &rpc_key_data.allowed_origins) { + match (&origin, &authorization_checks.allowed_origins) { (None, None) => {} (Some(_), None) => {} (None, Some(_)) => return Err(anyhow::anyhow!("Origin required")), @@ -207,97 +246,82 @@ impl AuthorizedKey { } // check referer - match (referer, &rpc_key_data.allowed_referers) { + match (&referer, &authorization_checks.allowed_referers) { (None, None) => {} (Some(_), None) => {} (None, Some(_)) => return Err(anyhow::anyhow!("Referer required")), (Some(referer), Some(allowed_referers)) => { - if !allowed_referers.contains(&referer) { + if !allowed_referers.contains(referer) { return Err(anyhow::anyhow!("Referer is not allowed!")); } } } // check user_agent - match (user_agent, &rpc_key_data.allowed_user_agents) { + match (&user_agent, &authorization_checks.allowed_user_agents) { (None, None) => {} (Some(_), None) => {} (None, Some(_)) => return Err(anyhow::anyhow!("User agent required")), (Some(user_agent), Some(allowed_user_agents)) => { - if !allowed_user_agents.contains(&user_agent) { + if !allowed_user_agents.contains(user_agent) { return Err(anyhow::anyhow!("User agent is not allowed!")); } } } Ok(Self { + checks: authorization_checks, + db_conn, ip, origin, - user_id: rpc_key_data.user_id, - rpc_key_id: rpc_key_data.rpc_key_id, - log_revert_chance: rpc_key_data.log_revert_chance, + referer, + user_agent, }) } } -impl AuthorizedRequest { - /// Only User has a database connection in case it needs to save a revert to the database. - pub fn db_conn(&self) -> Option<&DatabaseConnection> { - match self { - Self::User(x, _) => x.as_ref(), - _ => None, - } - } -} - -impl Display for &AuthorizedRequest { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - AuthorizedRequest::Internal => f.write_str("int"), - AuthorizedRequest::Ip(x, _) => f.write_str(&format!("ip-{}", x)), - AuthorizedRequest::User(_, x) => f.write_str(&format!("uk-{}", x.rpc_key_id)), - } - } -} - +/// rate limit logins only by ip. +/// we want all origins and referers and user agents to count together pub async fn login_is_authorized( app: &Web3ProxyApp, ip: IpAddr, -) -> Result { - let (ip, _semaphore) = match app.rate_limit_login(ip).await? { - RateLimitResult::AllowedIp(x, semaphore) => (x, semaphore), - RateLimitResult::RateLimitedIp(x, retry_at) => { - return Err(FrontendErrorResponse::RateLimitedIp(x, retry_at)); +) -> Result { + let authorization = match app.rate_limit_login(ip).await? { + RateLimitResult::Allowed(authorization, None) => authorization, + RateLimitResult::RateLimited(authorization, retry_at) => { + return Err(FrontendErrorResponse::RateLimited(authorization, retry_at)); } // TODO: don't panic. give the user an error x => unimplemented!("rate_limit_login shouldn't ever see these: {:?}", x), }; - Ok(AuthorizedRequest::Ip(ip, None)) + Ok(authorization) } +/// semaphore won't ever be None, but its easier if key auth and ip auth work the same way pub async fn ip_is_authorized( app: &Web3ProxyApp, ip: IpAddr, - origin: Option>, -) -> Result<(AuthorizedRequest, Option), FrontendErrorResponse> { - let origin = origin.map(|x| x.0); - + origin: Option, +) -> Result<(Authorization, Option), FrontendErrorResponse> { // TODO: i think we could write an `impl From` for this // TODO: move this to an AuthorizedUser extrator - let (ip, semaphore) = match app.rate_limit_by_ip(ip, origin.as_ref()).await? { - RateLimitResult::AllowedIp(ip, semaphore) => (ip, semaphore), - RateLimitResult::RateLimitedIp(x, retry_at) => { - return Err(FrontendErrorResponse::RateLimitedIp(x, retry_at)); + let (authorization, semaphore) = match app + .rate_limit_by_ip(&app.config.allowed_origin_requests_per_period, ip, origin) + .await? + { + RateLimitResult::Allowed(authorization, semaphore) => (authorization, semaphore), + RateLimitResult::RateLimited(authorization, retry_at) => { + return Err(FrontendErrorResponse::RateLimited(authorization, retry_at)); } // TODO: don't panic. give the user an error x => unimplemented!("rate_limit_by_ip shouldn't ever see these: {:?}", x), }; - // semaphore won't ever be None, but its easier if key auth and ip auth work the same way - Ok((AuthorizedRequest::Ip(ip, origin), semaphore)) + Ok((authorization, semaphore)) } +/// like app.rate_limit_by_rpc_key but converts to a FrontendErrorResponse; pub async fn key_is_authorized( app: &Web3ProxyApp, rpc_key: RpcSecretKey, @@ -305,23 +329,21 @@ pub async fn key_is_authorized( origin: Option, referer: Option, user_agent: Option, -) -> Result<(AuthorizedRequest, Option), FrontendErrorResponse> { +) -> Result<(Authorization, Option), FrontendErrorResponse> { // check the rate limits. error if over the limit - let (user_data, semaphore) = match app.rate_limit_by_key(rpc_key).await? { - RateLimitResult::AllowedUser(x, semaphore) => (x, semaphore), - RateLimitResult::RateLimitedUser(x, retry_at) => { - return Err(FrontendErrorResponse::RateLimitedUser(x, retry_at)); + // TODO: i think this should be in an "impl From" or "impl Into" + let (authorization, semaphore) = match app + .rate_limit_by_rpc_key(ip, origin, referer, rpc_key, user_agent) + .await? + { + RateLimitResult::Allowed(authorization, semaphore) => (authorization, semaphore), + RateLimitResult::RateLimited(authorization, retry_at) => { + return Err(FrontendErrorResponse::RateLimited(authorization, retry_at)); } RateLimitResult::UnknownKey => return Err(FrontendErrorResponse::UnknownKey), - // TODO: don't panic. give the user an error - x => unimplemented!("rate_limit_by_key shouldn't ever see these: {:?}", x), }; - let authorized_user = AuthorizedKey::try_new(ip, origin, referer, user_agent, user_data)?; - - let db_conn = app.db_conn.clone(); - - Ok((AuthorizedRequest::User(db_conn, authorized_user), semaphore)) + Ok((authorization, semaphore)) } impl Web3ProxyApp { @@ -353,16 +375,19 @@ impl Web3ProxyApp { /// Limit the number of concurrent requests from the given key address. #[instrument(level = "trace")] - pub async fn user_rpc_key_semaphore( + pub async fn authorization_checks_semaphore( &self, - rpc_key_data: &UserKeyData, + authorization_checks: &AuthorizationChecks, ) -> anyhow::Result> { - if let Some(max_concurrent_requests) = rpc_key_data.max_concurrent_requests { + if let Some(max_concurrent_requests) = authorization_checks.max_concurrent_requests { let semaphore = self .rpc_key_semaphores - .get_with(rpc_key_data.rpc_key_id, async move { + .get_with(authorization_checks.rpc_key_id, async move { let s = Semaphore::new(max_concurrent_requests as usize); - trace!("new semaphore for rpc_key_id {}", rpc_key_data.rpc_key_id); + trace!( + "new semaphore for rpc_key_id {}", + authorization_checks.rpc_key_id + ); Arc::new(s) }) .await; @@ -423,93 +448,121 @@ impl Web3ProxyApp { #[instrument(level = "trace")] pub async fn rate_limit_login(&self, ip: IpAddr) -> anyhow::Result { - // TODO: dry this up with rate_limit_by_key - // TODO: do we want a semaphore here? + // TODO: dry this up with rate_limit_by_rpc_key? + + // we don't care about user agent or origin or referer + let authorization = Authorization::public( + &self.config.allowed_origin_requests_per_period, + self.db_conn(), + ip, + None, + None, + None, + )?; + + // no semaphore is needed here because login rate limits are low + // TODO: are we sure do we want a semaphore here? + let semaphore = None; + if let Some(rate_limiter) = &self.login_rate_limiter { match rate_limiter.throttle_label(&ip.to_string(), None, 1).await { - Ok(RedisRateLimitResult::Allowed(_)) => Ok(RateLimitResult::AllowedIp(ip, None)), + Ok(RedisRateLimitResult::Allowed(_)) => { + Ok(RateLimitResult::Allowed(authorization, semaphore)) + } Ok(RedisRateLimitResult::RetryAt(retry_at, _)) => { // TODO: set headers so they know when they can retry // TODO: debug or trace? // this is too verbose, but a stat might be good trace!(?ip, "login rate limit exceeded until {:?}", retry_at); - Ok(RateLimitResult::RateLimitedIp(ip, Some(retry_at))) + + Ok(RateLimitResult::RateLimited(authorization, Some(retry_at))) } Ok(RedisRateLimitResult::RetryNever) => { // TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely trace!(?ip, "login rate limit is 0"); - Ok(RateLimitResult::RateLimitedIp(ip, None)) + Ok(RateLimitResult::RateLimited(authorization, None)) } Err(err) => { // internal error, not rate limit being hit // TODO: i really want axum to do this for us in a single place. error!(?err, "login rate limiter is unhappy. allowing ip"); - Ok(RateLimitResult::AllowedIp(ip, None)) + Ok(RateLimitResult::Allowed(authorization, None)) } } } else { // TODO: if no redis, rate limit with a local cache? "warn!" probably isn't right - Ok(RateLimitResult::AllowedIp(ip, None)) + Ok(RateLimitResult::Allowed(authorization, None)) } } + /// origin is included because it can override the default rate limits #[instrument(level = "trace")] pub async fn rate_limit_by_ip( &self, + allowed_origin_requests_per_period: &HashMap, ip: IpAddr, - origin: Option<&Origin>, + origin: Option, ) -> anyhow::Result { - // TODO: dry this up with rate_limit_by_key - let semaphore = self.ip_semaphore(ip).await?; + // ip rate limits don't check referer or user agent + // the do check + let authorization = Authorization::public( + allowed_origin_requests_per_period, + self.db_conn.clone(), + ip, + origin, + None, + None, + )?; if let Some(rate_limiter) = &self.frontend_ip_rate_limiter { - let max_requests_per_period = origin - .map(|origin| { - self.config - .allowed_origin_requests_per_period - .get(&origin.to_string()) - .cloned() - }) - .unwrap_or_default(); - - match rate_limiter.throttle(ip, max_requests_per_period, 1).await { + match rate_limiter + .throttle(ip, authorization.checks.max_requests_per_period, 1) + .await + { Ok(DeferredRateLimitResult::Allowed) => { - Ok(RateLimitResult::AllowedIp(ip, semaphore)) + // rate limit allowed us. check concurrent request limits + let semaphore = self.ip_semaphore(ip).await?; + + Ok(RateLimitResult::Allowed(authorization, semaphore)) } Ok(DeferredRateLimitResult::RetryAt(retry_at)) => { // TODO: set headers so they know when they can retry - // TODO: debug or trace? - // this is too verbose, but a stat might be good trace!(?ip, "rate limit exceeded until {:?}", retry_at); - Ok(RateLimitResult::RateLimitedIp(ip, Some(retry_at))) + Ok(RateLimitResult::RateLimited(authorization, Some(retry_at))) } Ok(DeferredRateLimitResult::RetryNever) => { // TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely trace!(?ip, "rate limit is 0"); - Ok(RateLimitResult::RateLimitedIp(ip, None)) + Ok(RateLimitResult::RateLimited(authorization, None)) } Err(err) => { - // internal error, not rate limit being hit + // this an internal error of some kind, not the rate limit being hit // TODO: i really want axum to do this for us in a single place. error!(?err, "rate limiter is unhappy. allowing ip"); - Ok(RateLimitResult::AllowedIp(ip, semaphore)) + // at least we can still check the semaphore + let semaphore = self.ip_semaphore(ip).await?; + + Ok(RateLimitResult::Allowed(authorization, semaphore)) } } } else { + // no redis, but we can still check the ip semaphore + let semaphore = self.ip_semaphore(ip).await?; + // TODO: if no redis, rate limit with a local cache? "warn!" probably isn't right - Ok(RateLimitResult::AllowedIp(ip, semaphore)) + Ok(RateLimitResult::Allowed(authorization, semaphore)) } } // check the local cache for user data, or query the database #[instrument(level = "trace")] - pub(crate) async fn user_data( + pub(crate) async fn authorization_checks( &self, rpc_secret_key: RpcSecretKey, - ) -> anyhow::Result { - let user_data: Result<_, Arc> = self + ) -> anyhow::Result { + let authorization_checks: Result<_, Arc> = self .rpc_secret_key_cache .try_get_with(rpc_secret_key.into(), async move { trace!(?rpc_secret_key, "user cache miss"); @@ -592,9 +645,7 @@ impl Web3ProxyApp { None }; - // let user_tier_model = user_tier - - Ok(UserKeyData { + Ok(AuthorizationChecks { user_id: rpc_key_model.user_id, rpc_key_id: rpc_key_model.id, allowed_ips, @@ -606,31 +657,50 @@ impl Web3ProxyApp { max_requests_per_period: user_tier_model.max_requests_per_period, }) } - None => Ok(UserKeyData::default()), + None => Ok(AuthorizationChecks::default()), } }) .await; // TODO: what's the best way to handle this arc? try_unwrap will not work - user_data.map_err(|err| anyhow::anyhow!(err)) + authorization_checks.map_err(|err| anyhow::anyhow!(err)) } + /// Authorized the ip/origin/referer/useragent and rate limit and concurrency #[instrument(level = "trace")] - pub async fn rate_limit_by_key( + pub async fn rate_limit_by_rpc_key( &self, + ip: IpAddr, + origin: Option, + referer: Option, rpc_key: RpcSecretKey, + user_agent: Option, ) -> anyhow::Result { - let user_data = self.user_data(rpc_key).await?; + let authorization_checks = self.authorization_checks(rpc_key).await?; - if user_data.rpc_key_id == 0 { + // if no rpc_key_id matching the given rpc was found, then we can't rate limit by key + if authorization_checks.rpc_key_id == 0 { return Ok(RateLimitResult::UnknownKey); } - let semaphore = self.user_rpc_key_semaphore(&user_data).await?; + // only allow this rpc_key to run a limited amount of concurrent requests + // TODO: rate limit should be BEFORE the semaphore! + let semaphore = self + .authorization_checks_semaphore(&authorization_checks) + .await?; - let user_max_requests_per_period = match user_data.max_requests_per_period { + let authorization = Authorization::try_new( + authorization_checks, + self.db_conn(), + ip, + origin, + referer, + user_agent, + )?; + + let user_max_requests_per_period = match authorization.checks.max_requests_per_period { None => { - return Ok(RateLimitResult::AllowedUser(user_data, semaphore)); + return Ok(RateLimitResult::Allowed(authorization, semaphore)); } Some(x) => x, }; @@ -642,7 +712,7 @@ impl Web3ProxyApp { .await { Ok(DeferredRateLimitResult::Allowed) => { - Ok(RateLimitResult::AllowedUser(user_data, semaphore)) + Ok(RateLimitResult::Allowed(authorization, semaphore)) } Ok(DeferredRateLimitResult::RetryAt(retry_at)) => { // TODO: set headers so they know when they can retry @@ -651,25 +721,25 @@ impl Web3ProxyApp { // TODO: keys are secrets! use the id instead // TODO: emit a stat trace!(?rpc_key, "rate limit exceeded until {:?}", retry_at); - Ok(RateLimitResult::RateLimitedUser(user_data, Some(retry_at))) + Ok(RateLimitResult::RateLimited(authorization, Some(retry_at))) } Ok(DeferredRateLimitResult::RetryNever) => { // TODO: keys are secret. don't log them! trace!(?rpc_key, "rate limit is 0"); // TODO: emit a stat - Ok(RateLimitResult::RateLimitedUser(user_data, None)) + Ok(RateLimitResult::RateLimited(authorization, None)) } Err(err) => { // internal error, not rate limit being hit // TODO: i really want axum to do this for us in a single place. error!(?err, "rate limiter is unhappy. allowing ip"); - Ok(RateLimitResult::AllowedUser(user_data, semaphore)) + Ok(RateLimitResult::Allowed(authorization, semaphore)) } } } else { // TODO: if no redis, rate limit with just a local cache? - Ok(RateLimitResult::AllowedUser(user_data, semaphore)) + Ok(RateLimitResult::Allowed(authorization, semaphore)) } } } diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index 9912aee3..81be1404 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -1,6 +1,7 @@ //! Utlities for logging errors for admins and displaying errors to users. -use crate::{app::UserKeyData, jsonrpc::JsonRpcForwardedResponse}; +use super::authorization::Authorization; +use crate::jsonrpc::JsonRpcForwardedResponse; use axum::{ headers, http::StatusCode, @@ -13,7 +14,7 @@ use ipnet::AddrParseError; use redis_rate_limiter::redis::RedisError; use reqwest::header::ToStrError; use sea_orm::DbErr; -use std::{error::Error, net::IpAddr}; +use std::error::Error; use tokio::{task::JoinError, time::Instant}; use tracing::{instrument, trace, warn}; @@ -32,12 +33,11 @@ pub enum FrontendErrorResponse { IpAddrParse(AddrParseError), JoinError(JoinError), NotFound, - RateLimitedUser(UserKeyData, Option), - RateLimitedIp(IpAddr, Option), + RateLimited(Authorization, Option), Redis(RedisError), Response(Response), /// simple way to return an error message to the user and an anyhow to our logs - StatusCode(StatusCode, String, anyhow::Error), + StatusCode(StatusCode, String, Option), UlidDecodeError(ulid::DecodeError), UnknownKey, } @@ -138,28 +138,32 @@ impl IntoResponse for FrontendErrorResponse { ), ) } - Self::RateLimitedIp(ip, _retry_at) => { - // TODO: emit a stat - // TODO: include retry_at in the error - // TODO: if retry_at is None, give an unauthorized status code? - ( - StatusCode::TOO_MANY_REQUESTS, - JsonRpcForwardedResponse::from_string( - format!("too many requests from ip {}!", ip), - Some(StatusCode::TOO_MANY_REQUESTS.as_u16().into()), - None, - ), - ) - } // TODO: this should actually by the id of the key. multiple users might control one key - Self::RateLimitedUser(user_data, _retry_at) => { + Self::RateLimited(authorization, retry_at) => { // TODO: emit a stat - // TODO: include retry_at in the error + + let retry_msg = if let Some(retry_at) = retry_at { + let retry_in = retry_at.duration_since(Instant::now()).as_secs(); + + format!(" Retry in {} seconds", retry_in) + } else { + "".to_string() + }; + + // create a string with either the IP or the rpc_key_id + let msg = if authorization.checks.rpc_key_id == 0 { + format!("too many requests from {}.{}", authorization.ip, retry_msg) + } else { + format!( + "too many requests from rpc key #{}.{}", + authorization.checks.rpc_key_id, retry_msg + ) + }; + ( StatusCode::TOO_MANY_REQUESTS, JsonRpcForwardedResponse::from_string( - // TODO: better error - format!("too many requests from {:?}!", user_data), + msg, Some(StatusCode::TOO_MANY_REQUESTS.as_u16().into()), None, ), diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index 877ccfbc..77e10eb1 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -25,17 +25,20 @@ pub async fn proxy_web3_rpc( ) -> FrontendResult { let request_span = error_span!("request", %ip); - let (authorized_request, _semaphore) = ip_is_authorized(&app, ip, origin) + // TODO: do we care about keeping the TypedHeader wrapper? + let origin = origin.map(|x| x.0); + + let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin) .instrument(request_span) .await?; - let request_span = error_span!("request", ?authorized_request); + let request_span = error_span!("request", ?authorization); - let authorized_request = Arc::new(authorized_request); + let authorization = Arc::new(authorization); // TODO: spawn earlier? i think we want ip_is_authorized in this future let f = tokio::spawn(async move { - app.proxy_web3_rpc(authorized_request, payload) + app.proxy_web3_rpc(authorization, payload) .instrument(request_span) .await }); @@ -64,7 +67,8 @@ pub async fn proxy_web3_rpc_with_key( let request_span = error_span!("request", %ip, ?referer, ?user_agent); - let (authorized_request, _semaphore) = key_is_authorized( + // keep the semaphore until the end of the response + let (authorization, _semaphore) = key_is_authorized( &app, rpc_key, ip, @@ -75,14 +79,14 @@ pub async fn proxy_web3_rpc_with_key( .instrument(request_span.clone()) .await?; - let request_span = error_span!("request", ?authorized_request); + let request_span = error_span!("request", ?authorization); - let authorized_request = Arc::new(authorized_request); + let authorization = Arc::new(authorization); // the request can take a while, so we spawn so that we can start serving another request // TODO: spawn even earlier? let f = tokio::spawn(async move { - app.proxy_web3_rpc(authorized_request, payload) + app.proxy_web3_rpc(authorization, payload) .instrument(request_span) .await }); diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 4291bd11..4c2eb75b 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -2,8 +2,8 @@ //! //! WebSockets are the preferred method of receiving requests, but not all clients have good support. -use super::authorization::{ip_is_authorized, key_is_authorized, AuthorizedRequest}; -use super::errors::FrontendResult; +use super::authorization::{ip_is_authorized, key_is_authorized, Authorization}; +use super::errors::{FrontendErrorResponse, FrontendResult}; use axum::headers::{Origin, Referer, UserAgent}; use axum::{ extract::ws::{Message, WebSocket, WebSocketUpgrade}, @@ -20,6 +20,7 @@ use futures::{ }; use handlebars::Handlebars; use hashbrown::HashMap; +use http::StatusCode; use serde_json::{json, value::RawValue}; use std::sync::Arc; use std::{str::from_utf8_mut, sync::atomic::AtomicUsize}; @@ -43,18 +44,20 @@ pub async fn websocket_handler( // TODO: i don't like logging ips. move this to trace level? let request_span = error_span!("request", %ip, ?origin); - let (authorized_request, _semaphore) = ip_is_authorized(&app, ip, origin) + let origin = origin.map(|x| x.0); + + let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin) .instrument(request_span) .await?; - let request_span = error_span!("request", ?authorized_request); + let request_span = error_span!("request", ?authorization); - let authorized_request = Arc::new(authorized_request); + let authorization = Arc::new(authorization); match ws_upgrade { Some(ws) => Ok(ws .on_upgrade(|socket| { - proxy_web3_socket(app, authorized_request, socket).instrument(request_span) + proxy_web3_socket(app, authorization, socket).instrument(request_span) }) .into_response()), None => { @@ -90,7 +93,7 @@ pub async fn websocket_handler_with_key( let request_span = error_span!("request", %ip, ?referer, ?user_agent); - let (authorized_request, _semaphore) = key_is_authorized( + let (authorization, _semaphore) = key_is_authorized( &app, rpc_key, ip, @@ -102,39 +105,52 @@ pub async fn websocket_handler_with_key( .await?; // TODO: type that wraps Address and have it censor? would protect us from accidently logging addresses or other user info - let request_span = error_span!("request", ?authorized_request); + let request_span = error_span!("request", ?authorization); - let authorized_request = Arc::new(authorized_request); + let authorization = Arc::new(authorization); match ws_upgrade { Some(ws_upgrade) => Ok(ws_upgrade.on_upgrade(move |socket| { - proxy_web3_socket(app, authorized_request, socket).instrument(request_span) + proxy_web3_socket(app, authorization, socket).instrument(request_span) })), None => { // if no websocket upgrade, this is probably a user loading the url with their browser - if let Some(redirect) = &app.config.redirect_user_url { - // TODO: store this on the app and use register_template? - let reg = Handlebars::new(); - - // TODO: show the user's address, not their id (remember to update the checks for {{user_id}}} in app.rs) - // TODO: query to get the user's address. expose that instead of user_id - if let AuthorizedRequest::User(_, authorized_key) = authorized_request.as_ref() { - let user_url = reg - .render_template( - redirect, - &json!({ "rpc_key_id": authorized_key.rpc_key_id }), - ) - .expect("templating should always work"); - - // this is not a websocket. redirect to a page for this user - Ok(Redirect::to(&user_url).into_response()) - } else { - // TODO: i think this is impossible - Err(anyhow::anyhow!("this page is for rpcs").into()) + match ( + &app.config.redirect_public_url, + &app.config.redirect_rpc_key_url, + authorization.checks.rpc_key_id, + ) { + (None, None, _) => Err(anyhow::anyhow!( + "redirect_rpc_key_url not set. only websockets work here" + ) + .into()), + (Some(redirect_public_url), _, 0) => { + Ok(Redirect::to(redirect_public_url).into_response()) } - } else { - // TODO: do not use an anyhow error. send the user a 400 - Err(anyhow::anyhow!("redirect_user_url not set. only websockets work here").into()) + (_, Some(redirect_rpc_key_url), rpc_key_id) => { + let reg = Handlebars::new(); + + if authorization.checks.rpc_key_id == 0 { + // TODO: i think this is impossible + Err(anyhow::anyhow!("this page is for rpcs").into()) + } else { + let redirect_rpc_key_url = reg + .render_template( + redirect_rpc_key_url, + &json!({ "rpc_key_id": rpc_key_id }), + ) + .expect("templating should always work"); + + // this is not a websocket. redirect to a page for this user + Ok(Redirect::to(&redirect_rpc_key_url).into_response()) + } + } + // any other combinations get a simple error + _ => Err(FrontendErrorResponse::StatusCode( + StatusCode::BAD_REQUEST, + "this page is for rpcs".to_string(), + None, + )), } } } @@ -143,7 +159,7 @@ pub async fn websocket_handler_with_key( #[instrument(level = "trace")] async fn proxy_web3_socket( app: Arc, - authorized_request: Arc, + authorization: Arc, socket: WebSocket, ) { // split the websocket so we can read and write concurrently @@ -153,19 +169,14 @@ async fn proxy_web3_socket( let (response_sender, response_receiver) = flume::unbounded::(); tokio::spawn(write_web3_socket(response_receiver, ws_tx)); - tokio::spawn(read_web3_socket( - app, - authorized_request, - ws_rx, - response_sender, - )); + tokio::spawn(read_web3_socket(app, authorization, ws_rx, response_sender)); } /// websockets support a few more methods than http clients #[instrument(level = "trace")] async fn handle_socket_payload( app: Arc, - authorized_request: Arc, + authorization: &Arc, payload: &str, response_sender: &flume::Sender, subscription_count: &AtomicUsize, @@ -173,19 +184,21 @@ async fn handle_socket_payload( ) -> Message { // TODO: do any clients send batches over websockets? let (id, response) = match serde_json::from_str::(payload) { - Ok(payload) => { + Ok(json_request) => { // TODO: should we use this id for the subscription id? it should be unique and means we dont need an atomic - let id = payload.id.clone(); + let id = json_request.id.clone(); - let response: anyhow::Result = match &payload.method[..] { + let response: anyhow::Result = match &json_request.method + [..] + { "eth_subscribe" => { // TODO: what should go in this span? let span = error_span!("eth_subscribe"); let response = app .eth_subscribe( - authorized_request, - payload, + authorization.clone(), + json_request, subscription_count, response_sender.clone(), ) @@ -213,7 +226,7 @@ async fn handle_socket_payload( "eth_unsubscribe" => { // TODO: how should handle rate limits and stats on this? // TODO: handle invalid params - let subscription_id = payload.params.unwrap().to_string(); + let subscription_id = json_request.params.unwrap().to_string(); let partial_response = match subscriptions.remove(&subscription_id) { None => false, @@ -228,7 +241,10 @@ async fn handle_socket_payload( Ok(response.into()) } - _ => app.proxy_web3_rpc(authorized_request, payload.into()).await, + _ => { + app.proxy_web3_rpc(authorization.clone(), json_request.into()) + .await + } }; (id, response) @@ -256,7 +272,7 @@ async fn handle_socket_payload( #[instrument(level = "trace")] async fn read_web3_socket( app: Arc, - authorized_request: Arc, + authorization: Arc, mut ws_rx: SplitStream, response_sender: flume::Sender, ) { @@ -269,7 +285,7 @@ async fn read_web3_socket( Message::Text(payload) => { handle_socket_payload( app.clone(), - authorized_request.clone(), + &authorization, &payload, &response_sender, &subscription_count, @@ -292,7 +308,7 @@ async fn read_web3_socket( handle_socket_payload( app.clone(), - authorized_request.clone(), + &authorization, payload, &response_sender, &subscription_count, diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index c88cc19f..6ab91c2d 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -2,7 +2,7 @@ use super::connection::Web3Connection; use super::connections::Web3Connections; use super::transactions::TxStatus; -use crate::frontend::authorization::AuthorizedRequest; +use crate::frontend::authorization::Authorization; use crate::{ config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::SyncedConnections, }; @@ -88,7 +88,7 @@ impl Web3Connections { #[instrument(level = "trace")] pub async fn block( &self, - authorized_request: Option<&Arc>, + authorization: &Arc, hash: &H256, rpc: Option<&Arc>, ) -> anyhow::Result { @@ -103,7 +103,7 @@ impl Web3Connections { // TODO: if error, retry? let block: Block = match rpc { Some(rpc) => { - rpc.wait_for_request_handle(authorized_request, Duration::from_secs(30)) + rpc.wait_for_request_handle(authorization, Duration::from_secs(30)) .await? .request( "eth_getBlockByHash", @@ -118,9 +118,9 @@ impl Web3Connections { let request = json!({ "id": "1", "method": "eth_getBlockByHash", "params": get_block_params }); let request: JsonRpcRequest = serde_json::from_value(request)?; - // TODO: request_metadata? maybe we should put it in the authorized_request? + // TODO: request_metadata? maybe we should put it in the authorization? let response = self - .try_send_best_upstream_server(authorized_request, request, None, None) + .try_send_best_upstream_server(authorization, request, None, None) .await?; let block = response.result.unwrap(); @@ -139,8 +139,12 @@ impl Web3Connections { } /// Convenience method to get the cannonical block at a given block height. - pub async fn block_hash(&self, num: &U64) -> anyhow::Result<(H256, bool)> { - let (block, is_archive_block) = self.cannonical_block(num).await?; + pub async fn block_hash( + &self, + authorization: &Arc, + num: &U64, + ) -> anyhow::Result<(H256, bool)> { + let (block, is_archive_block) = self.cannonical_block(authorization, num).await?; let hash = block.hash.expect("Saved blocks should always have hashes"); @@ -148,7 +152,11 @@ impl Web3Connections { } /// Get the heaviest chain's block from cache or backend rpc - pub async fn cannonical_block(&self, num: &U64) -> anyhow::Result<(ArcBlock, bool)> { + pub async fn cannonical_block( + &self, + authorization: &Arc, + num: &U64, + ) -> anyhow::Result<(ArcBlock, bool)> { // we only have blocks by hash now // maybe save them during save_block in a blocks_by_number Cache> // if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations) @@ -174,8 +182,8 @@ impl Web3Connections { // deref to not keep the lock open if let Some(block_hash) = self.block_numbers.get(num) { // TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set - // TODO: pass authorized_request through here? - let block = self.block(None, &block_hash, None).await?; + // TODO: pass authorization through here? + let block = self.block(authorization, &block_hash, None).await?; return Ok((block, archive_needed)); } @@ -186,9 +194,9 @@ impl Web3Connections { let request: JsonRpcRequest = serde_json::from_value(request)?; // TODO: if error, retry? - // TODO: request_metadata or authorized_request? + // TODO: request_metadata or authorization? let response = self - .try_send_best_upstream_server(None, request, None, Some(num)) + .try_send_best_upstream_server(authorization, request, None, Some(num)) .await?; let raw_block = response.result.context("no block result")?; @@ -205,6 +213,7 @@ impl Web3Connections { pub(super) async fn process_incoming_blocks( &self, + authorization: &Arc, block_receiver: flume::Receiver, // TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed // Geth's subscriptions have the same potential for skipping blocks. @@ -219,6 +228,7 @@ impl Web3Connections { let rpc_name = rpc.name.clone(); if let Err(err) = self .process_block_from_rpc( + authorization, &mut connection_heads, new_block, rpc, @@ -242,6 +252,7 @@ impl Web3Connections { /// TODO: return something? async fn process_block_from_rpc( &self, + authorization: &Arc, connection_heads: &mut HashMap, rpc_head_block: Option, rpc: Arc, @@ -305,7 +316,10 @@ impl Web3Connections { // this option should always be populated let conn_rpc = self.conns.get(conn_name); - match self.block(None, connection_head_hash, conn_rpc).await { + match self + .block(authorization, connection_head_hash, conn_rpc) + .await + { Ok(block) => block, Err(err) => { warn!(%connection_head_hash, %conn_name, %rpc, ?err, "Failed fetching connection_head_block for block_hashes"); diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 23e76721..2094b27d 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -4,7 +4,7 @@ use super::provider::Web3Provider; use super::request::{OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult}; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::BlockAndRpc; -use crate::frontend::authorization::AuthorizedRequest; +use crate::frontend::authorization::Authorization; use anyhow::Context; use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64}; use futures::future::try_join_all; @@ -12,6 +12,7 @@ use futures::StreamExt; use parking_lot::RwLock; use rand::Rng; use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter}; +use sea_orm::DatabaseConnection; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; @@ -63,6 +64,7 @@ impl Web3Connection { pub async fn spawn( name: String, chain_id: u64, + db_conn: Option, url_str: String, // optional because this is only used for http providers. websocket providers don't use it http_client: Option, @@ -111,12 +113,14 @@ impl Web3Connection { .retrying_reconnect(block_sender.as_ref(), false) .await?; + let authorization = Arc::new(Authorization::local(db_conn)?); + // check the server's chain_id here // TODO: move this outside the `new` function and into a `start` function or something. that way we can do retries from there // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error // TODO: what should the timeout be? let found_chain_id: Result = new_connection - .wait_for_request_handle(None, Duration::from_secs(30)) + .wait_for_request_handle(&authorization, Duration::from_secs(30)) .await? .request( "eth_chainId", @@ -149,9 +153,11 @@ impl Web3Connection { // TODO: make transaction subscription optional (just pass None for tx_id_sender) let handle = { let new_connection = new_connection.clone(); + let authorization = authorization.clone(); tokio::spawn(async move { new_connection .subscribe( + &authorization, http_interval_sender, block_map, block_sender, @@ -174,13 +180,19 @@ impl Web3Connection { // TODO: i think instead of atomics, we could maybe use a watch channel sleep(Duration::from_millis(250)).await; - new_connection.check_block_data_limit().await?; + new_connection + .check_block_data_limit(&authorization) + .await?; } Ok((new_connection, handle)) } - async fn check_block_data_limit(self: &Arc) -> anyhow::Result> { + /// TODO: should check_block_data_limit take authorization? + async fn check_block_data_limit( + self: &Arc, + authorization: &Arc, + ) -> anyhow::Result> { let mut limit = None; for block_data_limit in [u64::MAX, 90_000, 128, 64, 32] { @@ -206,7 +218,7 @@ impl Web3Connection { // TODO: wait for the handle BEFORE we check the current block number. it might be delayed too! // TODO: what should the request be? let archive_result: Result = self - .wait_for_request_handle(None, Duration::from_secs(30)) + .wait_for_request_handle(authorization, Duration::from_secs(30)) .await? .request( "eth_getCode", @@ -453,6 +465,7 @@ impl Web3Connection { /// subscribe to blocks and transactions with automatic reconnects async fn subscribe( self: Arc, + authorization: &Arc, http_interval_sender: Option>>, block_map: BlockHashesCache, block_sender: Option>, @@ -468,6 +481,7 @@ impl Web3Connection { if let Some(block_sender) = &block_sender { let f = self.clone().subscribe_new_heads( + authorization.clone(), http_interval_receiver, block_sender.clone(), block_map.clone(), @@ -479,7 +493,7 @@ impl Web3Connection { if let Some(tx_id_sender) = &tx_id_sender { let f = self .clone() - .subscribe_pending_transactions(tx_id_sender.clone()); + .subscribe_pending_transactions(authorization.clone(), tx_id_sender.clone()); futures.push(flatten_handle(tokio::spawn(f))); } @@ -540,6 +554,7 @@ impl Web3Connection { /// Subscribe to new blocks. If `reconnect` is true, this runs forever. async fn subscribe_new_heads( self: Arc, + authorization: Arc, http_interval_receiver: Option>, block_sender: flume::Sender, block_map: BlockHashesCache, @@ -560,7 +575,7 @@ impl Web3Connection { loop { // TODO: what should the max_wait be? match self - .wait_for_request_handle(None, Duration::from_secs(30)) + .wait_for_request_handle(&authorization, Duration::from_secs(30)) .await { Ok(active_request_handle) => { @@ -648,7 +663,7 @@ impl Web3Connection { Web3Provider::Ws(provider) => { // todo: move subscribe_blocks onto the request handle? let active_request_handle = self - .wait_for_request_handle(None, Duration::from_secs(30)) + .wait_for_request_handle(&authorization, Duration::from_secs(30)) .await; let mut stream = provider.subscribe_blocks().await?; drop(active_request_handle); @@ -658,7 +673,7 @@ impl Web3Connection { // all it does is print "new block" for the same block as current block // TODO: how does this get wrapped in an arc? does ethers handle that? let block: Result, _> = self - .wait_for_request_handle(None, Duration::from_secs(30)) + .wait_for_request_handle(&authorization, Duration::from_secs(30)) .await? .request( "eth_getBlockByNumber", @@ -715,6 +730,7 @@ impl Web3Connection { async fn subscribe_pending_transactions( self: Arc, + authorization: Arc, tx_id_sender: flume::Sender<(TxHash, Arc)>, ) -> anyhow::Result<()> { info!(%self, "watching pending transactions"); @@ -752,7 +768,7 @@ impl Web3Connection { Web3Provider::Ws(provider) => { // TODO: maybe the subscribe_pending_txs function should be on the active_request_handle let active_request_handle = self - .wait_for_request_handle(None, Duration::from_secs(30)) + .wait_for_request_handle(&authorization, Duration::from_secs(30)) .await; let mut stream = provider.subscribe_pending_txs().await?; @@ -783,13 +799,13 @@ impl Web3Connection { #[instrument] pub async fn wait_for_request_handle( self: &Arc, - authorized_request: Option<&Arc>, + authorization: &Arc, max_wait: Duration, ) -> anyhow::Result { let max_wait = Instant::now() + max_wait; loop { - let x = self.try_request_handle(authorized_request).await; + let x = self.try_request_handle(authorization).await; trace!(?x, "try_request_handle"); @@ -819,7 +835,7 @@ impl Web3Connection { #[instrument] pub async fn try_request_handle( self: &Arc, - authorized_request: Option<&Arc>, + authorization: &Arc, ) -> anyhow::Result { // check that we are connected if !self.has_provider().await { @@ -850,7 +866,7 @@ impl Web3Connection { } }; - let handle = OpenRequestHandle::new(self.clone(), authorized_request.cloned()); + let handle = OpenRequestHandle::new(authorization.clone(), self.clone()); Ok(OpenRequestResult::Handle(handle)) } diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index dd9b9ea6..c725414c 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -7,7 +7,7 @@ use super::request::{ use super::synced_connections::SyncedConnections; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::{BlockAndRpc, TxHashAndRpc, Web3ConnectionConfig}; -use crate::frontend::authorization::{AuthorizedRequest, RequestMetadata}; +use crate::frontend::authorization::{Authorization, RequestMetadata}; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; use crate::rpcs::transactions::TxStatus; use anyhow::Context; @@ -21,6 +21,7 @@ use futures::StreamExt; use hashbrown::HashMap; use moka::future::{Cache, ConcurrentCacheExt}; use petgraph::graphmap::DiGraphMap; +use sea_orm::DatabaseConnection; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; @@ -61,6 +62,7 @@ impl Web3Connections { #[allow(clippy::too_many_arguments)] pub async fn spawn( chain_id: u64, + db_conn: Option, server_configs: HashMap, http_client: Option, redis_pool: Option, @@ -119,6 +121,7 @@ impl Web3Connections { return None; } + let db_conn = db_conn.clone(); let http_client = http_client.clone(); let redis_pool = redis_pool.clone(); let http_interval_sender = http_interval_sender.clone(); @@ -137,6 +140,7 @@ impl Web3Connections { server_config .spawn( server_name, + db_conn, redis_pool, chain_id, http_client, @@ -212,6 +216,8 @@ impl Web3Connections { min_synced_rpcs, }); + let authorization = Arc::new(Authorization::local(db_conn.clone())?); + let handle = { let connections = connections.clone(); @@ -219,6 +225,7 @@ impl Web3Connections { // TODO: try_join_all with the other handles here connections .subscribe( + authorization, pending_tx_id_receiver, block_receiver, head_block_sender, @@ -240,6 +247,7 @@ impl Web3Connections { /// transaction ids from all the `Web3Connection`s are deduplicated and forwarded to `pending_tx_sender` async fn subscribe( self: Arc, + authorization: Arc, pending_tx_id_receiver: flume::Receiver, block_receiver: flume::Receiver, head_block_sender: Option>, @@ -253,10 +261,12 @@ impl Web3Connections { // forwards new transacitons to pending_tx_receipt_sender if let Some(pending_tx_sender) = pending_tx_sender.clone() { let clone = self.clone(); + let authorization = authorization.clone(); let handle = task::spawn(async move { // TODO: set up this future the same as the block funnel while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv_async().await { let f = clone.clone().process_incoming_tx_id( + authorization.clone(), rpc, pending_tx_id, pending_tx_sender.clone(), @@ -274,11 +284,13 @@ impl Web3Connections { if let Some(head_block_sender) = head_block_sender { let connections = Arc::clone(&self); let pending_tx_sender = pending_tx_sender.clone(); + let handle = task::Builder::default() .name("process_incoming_blocks") .spawn(async move { connections .process_incoming_blocks( + &authorization, block_receiver, head_block_sender, pending_tx_sender, @@ -373,7 +385,7 @@ impl Web3Connections { /// get the best available rpc server pub async fn next_upstream_server( &self, - authorized_request: Option<&Arc>, + authorization: &Arc, request_metadata: Option<&Arc>, skip: &[Arc], min_block_needed: Option<&U64>, @@ -432,7 +444,7 @@ impl Web3Connections { // now that the rpcs are sorted, try to get an active request handle for one of them for rpc in synced_rpcs.into_iter() { // increment our connection counter - match rpc.try_request_handle(authorized_request).await { + match rpc.try_request_handle(authorization).await { Ok(OpenRequestResult::Handle(handle)) => { trace!("next server on {:?}: {:?}", self, rpc); return Ok(OpenRequestResult::Handle(handle)); @@ -476,7 +488,7 @@ impl Web3Connections { // TODO: better type on this that can return an anyhow::Result pub async fn upstream_servers( &self, - authorized_request: Option<&Arc>, + authorization: &Arc, block_needed: Option<&U64>, ) -> Result, Option> { let mut earliest_retry_at = None; @@ -491,7 +503,7 @@ impl Web3Connections { } // check rate limits and increment our connection counter - match connection.try_request_handle(authorized_request).await { + match connection.try_request_handle(authorization).await { Ok(OpenRequestResult::RetryAt(retry_at)) => { // this rpc is not available. skip it earliest_retry_at = earliest_retry_at.min(Some(retry_at)); @@ -517,7 +529,7 @@ impl Web3Connections { /// be sure there is a timeout on this or it might loop forever pub async fn try_send_best_upstream_server( &self, - authorized_request: Option<&Arc>, + authorization: &Arc, request: JsonRpcRequest, request_metadata: Option<&Arc>, min_block_needed: Option<&U64>, @@ -532,7 +544,7 @@ impl Web3Connections { } match self .next_upstream_server( - authorized_request, + authorization, request_metadata, &skip_rpcs, min_block_needed, @@ -655,16 +667,13 @@ impl Web3Connections { #[instrument] pub async fn try_send_all_upstream_servers( &self, - authorized_request: Option<&Arc>, + authorization: &Arc, request: JsonRpcRequest, request_metadata: Option>, block_needed: Option<&U64>, ) -> anyhow::Result { loop { - match self - .upstream_servers(authorized_request, block_needed) - .await - { + match self.upstream_servers(authorization, block_needed).await { Ok(active_request_handles) => { // TODO: benchmark this compared to waiting on unbounded futures // TODO: do something with this handle? diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 0212bf79..95bdbbb5 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -1,6 +1,6 @@ use super::connection::Web3Connection; use super::provider::Web3Provider; -use crate::frontend::authorization::AuthorizedRequest; +use crate::frontend::authorization::Authorization; use crate::metered::{JsonRpcErrorCount, ProviderErrorCount}; use anyhow::Context; use chrono::Utc; @@ -13,8 +13,8 @@ use metered::HitCount; use metered::ResponseTime; use metered::Throughput; use rand::Rng; -use sea_orm::ActiveEnum; use sea_orm::ActiveModelTrait; +use sea_orm::{ActiveEnum}; use serde_json::json; use std::fmt; use std::sync::atomic::{self, AtomicBool, Ordering}; @@ -35,7 +35,7 @@ pub enum OpenRequestResult { /// Make RPC requests through this handle and drop it when you are done. #[derive(Debug)] pub struct OpenRequestHandle { - authorized_request: Arc, + authorization: Arc, conn: Arc, // TODO: this is the same metrics on the conn. use a reference? metrics: Arc, @@ -75,41 +75,43 @@ impl From for RequestErrorHandler { } } -impl AuthorizedRequest { +impl Authorization { /// Save a RPC call that return "execution reverted" to the database. async fn save_revert( self: Arc, method: Method, params: EthCallFirstParams, ) -> anyhow::Result<()> { - if let Self::User(Some(db_conn), authorized_request) = &*self { - // TODO: should the database set the timestamp? - let timestamp = Utc::now(); - let to: Vec = params - .to - .as_bytes() - .try_into() - .expect("address should always convert to a Vec"); - let call_data = params.data.map(|x| format!("{}", x)); + let db_conn = self.db_conn.as_ref().context("no database connection")?; - let rl = revert_log::ActiveModel { - rpc_key_id: sea_orm::Set(authorized_request.rpc_key_id), - method: sea_orm::Set(method), - to: sea_orm::Set(to), - call_data: sea_orm::Set(call_data), - timestamp: sea_orm::Set(timestamp), - ..Default::default() - }; + // TODO: should the database set the timestamp? + // we intentionally use "now" and not the time the request started + // why? because we aggregate stats and setting one in the past could cause confusion + let timestamp = Utc::now(); + let to: Vec = params + .to + .as_bytes() + .try_into() + .expect("address should always convert to a Vec"); + let call_data = params.data.map(|x| format!("{}", x)); - let rl = rl - .save(db_conn) - .await - .context("Failed saving new revert log")?; + let rl = revert_log::ActiveModel { + rpc_key_id: sea_orm::Set(self.checks.rpc_key_id), + method: sea_orm::Set(method), + to: sea_orm::Set(to), + call_data: sea_orm::Set(call_data), + timestamp: sea_orm::Set(timestamp), + ..Default::default() + }; - // TODO: what log level? - // TODO: better format - trace!(?rl); - } + let rl = rl + .save(db_conn) + .await + .context("Failed saving new revert log")?; + + // TODO: what log level? + // TODO: better format + trace!(?rl); // TODO: return something useful Ok(()) @@ -118,10 +120,7 @@ impl AuthorizedRequest { #[metered(registry = OpenRequestHandleMetrics, visibility = pub)] impl OpenRequestHandle { - pub fn new( - conn: Arc, - authorized_request: Option>, - ) -> Self { + pub fn new(authorization: Arc, conn: Arc) -> Self { // TODO: take request_id as an argument? // TODO: attach a unique id to this? customer requests have one, but not internal queries // TODO: what ordering?! @@ -136,11 +135,8 @@ impl OpenRequestHandle { let metrics = conn.open_request_handle_metrics.clone(); let used = false.into(); - let authorized_request = - authorized_request.unwrap_or_else(|| Arc::new(AuthorizedRequest::Internal)); - Self { - authorized_request, + authorization, conn, metrics, used, @@ -176,7 +172,7 @@ impl OpenRequestHandle { // TODO: use tracing spans // TODO: requests from customers have request ids, but we should add // TODO: including params in this is way too verbose - // the authorized_request field is already on a parent span + // the authorization field is already on a parent span trace!(rpc=%self.conn, %method, "request"); let mut provider = None; @@ -209,33 +205,25 @@ impl OpenRequestHandle { if !["eth_call", "eth_estimateGas"].contains(&method) { trace!(%method, "skipping save on revert"); RequestErrorHandler::DebugLevel - } else if self.authorized_request.db_conn().is_none() { - trace!(%method, "no database. skipping save on revert"); - RequestErrorHandler::DebugLevel - } else if let AuthorizedRequest::User(db_conn, y) = self.authorized_request.as_ref() - { - if db_conn.is_none() { - trace!(%method, "no database. skipping save on revert"); + } else if self.authorization.db_conn.is_some() { + let log_revert_chance = self.authorization.checks.log_revert_chance; + + if log_revert_chance == 0.0 { + trace!(%method, "no chance. skipping save on revert"); + RequestErrorHandler::DebugLevel + } else if log_revert_chance == 1.0 { + trace!(%method, "gaurenteed chance. SAVING on revert"); + error_handler + } else if rand::thread_rng().gen_range(0.0f64..=1.0) < log_revert_chance { + trace!(%method, "missed chance. skipping save on revert"); RequestErrorHandler::DebugLevel } else { - let log_revert_chance = y.log_revert_chance; - - if log_revert_chance == 0.0 { - trace!(%method, "no chance. skipping save on revert"); - RequestErrorHandler::DebugLevel - } else if log_revert_chance == 1.0 { - trace!(%method, "gaurenteed chance. SAVING on revert"); - error_handler - } else if rand::thread_rng().gen_range(0.0f64..=1.0) > log_revert_chance { - trace!(%method, "missed chance. skipping save on revert"); - RequestErrorHandler::DebugLevel - } else { - trace!("Saving on revert"); - // TODO: is always logging at debug level fine? - error_handler - } + trace!("Saving on revert"); + // TODO: is always logging at debug level fine? + error_handler } } else { + trace!(%method, "no database. skipping save on revert"); RequestErrorHandler::DebugLevel } } else { @@ -298,10 +286,7 @@ impl OpenRequestHandle { .unwrap(); // spawn saving to the database so we don't slow down the request - let f = self - .authorized_request - .clone() - .save_revert(method, params.0 .0); + let f = self.authorization.clone().save_revert(method, params.0 .0); tokio::spawn(f); } diff --git a/web3_proxy/src/rpcs/transactions.rs b/web3_proxy/src/rpcs/transactions.rs index a458b1b3..e5c0fb52 100644 --- a/web3_proxy/src/rpcs/transactions.rs +++ b/web3_proxy/src/rpcs/transactions.rs @@ -1,3 +1,5 @@ +use crate::frontend::authorization::Authorization; + ///! Load balanced communication with a group of web3 providers use super::connection::Web3Connection; use super::connections::Web3Connections; @@ -18,6 +20,7 @@ pub enum TxStatus { impl Web3Connections { async fn query_transaction_status( &self, + authorization: &Arc, rpc: Arc, pending_tx_id: TxHash, ) -> Result, ProviderError> { @@ -25,7 +28,7 @@ impl Web3Connections { // TODO: might not be a race. might be a nonce thats higher than the current account nonce. geth discards chains // TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself // TODO: if one rpc fails, try another? - let tx: Transaction = match rpc.try_request_handle(None).await { + let tx: Transaction = match rpc.try_request_handle(authorization).await { Ok(OpenRequestResult::Handle(handle)) => { handle .request( @@ -62,6 +65,7 @@ impl Web3Connections { /// dedupe transaction and send them to any listening clients pub(super) async fn process_incoming_tx_id( self: Arc, + authorization: Arc, rpc: Arc, pending_tx_id: TxHash, pending_tx_sender: broadcast::Sender, @@ -84,7 +88,7 @@ impl Web3Connections { // query the rpc for this transaction // it is possible that another rpc is also being queried. thats fine. we want the fastest response match self - .query_transaction_status(rpc.clone(), pending_tx_id) + .query_transaction_status(&authorization, rpc.clone(), pending_tx_id) .await { Ok(Some(tx_state)) => { diff --git a/web3_proxy/src/user_queries.rs b/web3_proxy/src/user_queries.rs index 90f166cc..8ae6a473 100644 --- a/web3_proxy/src/user_queries.rs +++ b/web3_proxy/src/user_queries.rs @@ -159,7 +159,7 @@ pub fn get_query_window_seconds_from_params( FrontendErrorResponse::StatusCode( StatusCode::BAD_REQUEST, "Unable to parse rpc_key_id".to_string(), - e.into(), + Some(e.into()), ) }) }, @@ -290,6 +290,7 @@ pub async fn query_user_stats<'a>( let user_id = get_user_id_from_params(redis_conn, bearer, params).await?; let (condition, q) = if user_id == 0 { // 0 means everyone. don't filter on user + // TODO: 0 or None? (condition, q) } else { let q = q.left_join(rpc_key::Entity); @@ -302,36 +303,33 @@ pub async fn query_user_stats<'a>( }; // filter on rpc_key_id + // if rpc_key_id, all the requests without a key will be loaded // TODO: move getting the param and checking the bearer token into a helper function let (condition, q) = if let Some(rpc_key_id) = params.get("rpc_key_id") { let rpc_key_id = rpc_key_id.parse::().map_err(|e| { FrontendErrorResponse::StatusCode( StatusCode::BAD_REQUEST, "Unable to parse rpc_key_id".to_string(), - e.into(), + Some(e.into()), ) })?; - if rpc_key_id == 0 { + response.insert("rpc_key_id", serde_json::Value::Number(rpc_key_id.into())); + + let condition = condition.add(rpc_accounting::Column::RpcKeyId.eq(rpc_key_id)); + + let q = q.group_by(rpc_accounting::Column::RpcKeyId); + + if user_id == 0 { + // no user id, we did not join above + let q = q.left_join(rpc_key::Entity); + (condition, q) } else { - response.insert("rpc_key_id", serde_json::Value::Number(rpc_key_id.into())); + // user_id added a join on rpc_key already. only filter on user_id + let condition = condition.add(rpc_key::Column::UserId.eq(user_id)); - let condition = condition.add(rpc_accounting::Column::RpcKeyId.eq(rpc_key_id)); - - let q = q.group_by(rpc_accounting::Column::RpcKeyId); - - if user_id == 0 { - // no user id, we did not join above - let q = q.left_join(rpc_key::Entity); - - (condition, q) - } else { - // user_id added a join on rpc_key already. only filter on user_id - let condition = condition.add(rpc_key::Column::UserId.eq(user_id)); - - (condition, q) - } + (condition, q) } } else { (condition, q)