simplify authorization types so we can pass them deeper easily

This commit is contained in:
Bryan Stitt 2022-11-08 19:58:11 +00:00
parent 2c4586302d
commit c33342d9dd
18 changed files with 541 additions and 404 deletions

@ -17,6 +17,7 @@ redirect_user_url = "https://llamanodes.com/dashboard/keys?key={rpc_key_id}"
sentry_url = "https://SENTRY_KEY_A.ingest.sentry.io/SENTRY_KEY_B"
# public limits are when no key is used. these are instead grouped by ip
# 0 = block all public requests
public_max_concurrent_requests = 3
# 0 = block all public requests

@ -22,7 +22,9 @@ pub struct RedisRateLimiter {
}
pub enum RedisRateLimitResult {
/// TODO: what is the inner value?
Allowed(u64),
/// TODO: what is the inner value?
RetryAt(Instant, u64),
RetryNever,
}

@ -3,7 +3,7 @@
use crate::app_stats::{ProxyResponseStat, StatEmitter, Web3ProxyStat};
use crate::block_number::block_needed;
use crate::config::{AppConfig, TopConfig};
use crate::frontend::authorization::{AuthorizedRequest, RequestMetadata};
use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::jsonrpc::JsonRpcForwardedResponse;
use crate::jsonrpc::JsonRpcForwardedResponseEnum;
use crate::jsonrpc::JsonRpcRequest;
@ -46,7 +46,7 @@ use tracing::{error, info, instrument, trace, warn};
use ulid::Ulid;
// TODO: make this customizable?
static APP_USER_AGENT: &str = concat!(
pub static APP_USER_AGENT: &str = concat!(
"satoshiandkin/",
env!("CARGO_PKG_NAME"),
"/",
@ -62,10 +62,12 @@ type ResponseCache =
pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;
#[derive(Clone, Debug, Default, From)]
pub struct UserKeyData {
/// database id of the primary user
pub struct AuthorizationChecks {
/// database id of the primary user.
/// TODO: do we need this? its on the authorization so probably not
pub user_id: u64,
/// database id of the rpc key
/// if this is 0, then this request is being rate limited by ip
pub rpc_key_id: u64,
/// if None, allow unlimited queries. inherited from the user_tier
pub max_requests_per_period: Option<u64>,
@ -109,7 +111,8 @@ pub struct Web3ProxyApp {
pub login_rate_limiter: Option<RedisRateLimiter>,
pub vredis_pool: Option<RedisPool>,
// TODO: this key should be our RpcSecretKey class, not Ulid
pub rpc_secret_key_cache: Cache<Ulid, UserKeyData, hashbrown::hash_map::DefaultHashBuilder>,
pub rpc_secret_key_cache:
Cache<Ulid, AuthorizationChecks, hashbrown::hash_map::DefaultHashBuilder>,
pub rpc_key_semaphores: Cache<u64, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub bearer_token_semaphores:
@ -193,7 +196,7 @@ impl Web3ProxyApp {
shutdown_receiver: broadcast::Receiver<()>,
) -> anyhow::Result<Web3ProxyAppSpawn> {
// safety checks on the config
if let Some(redirect) = &top_config.app.redirect_user_url {
if let Some(redirect) = &top_config.app.redirect_rpc_key_url {
assert!(
redirect.contains("{rpc_key_id}"),
"redirect_user_url user url must contain \"{{rpc_key_id}}\""
@ -330,6 +333,7 @@ impl Web3ProxyApp {
// connect to the load balanced rpcs
let (balanced_rpcs, balanced_handle) = Web3Connections::spawn(
top_config.app.chain_id,
db_conn.clone(),
balanced_rpcs,
http_client.clone(),
vredis_pool.clone(),
@ -356,6 +360,7 @@ impl Web3ProxyApp {
} else {
let (private_rpcs, private_handle) = Web3Connections::spawn(
top_config.app.chain_id,
db_conn.clone(),
private_rpcs,
http_client.clone(),
vredis_pool.clone(),
@ -522,7 +527,7 @@ impl Web3ProxyApp {
#[instrument(level = "trace")]
pub async fn eth_subscribe<'a>(
self: &'a Arc<Self>,
authorized_request: Arc<AuthorizedRequest>,
authorization: Arc<Authorization>,
payload: JsonRpcRequest,
subscription_count: &'a AtomicUsize,
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now
@ -719,7 +724,7 @@ impl Web3ProxyApp {
#[instrument(level = "trace")]
pub async fn proxy_web3_rpc(
self: &Arc<Self>,
authorized_request: Arc<AuthorizedRequest>,
authorization: Arc<Authorization>,
request: JsonRpcRequestEnum,
) -> anyhow::Result<JsonRpcForwardedResponseEnum> {
// TODO: this should probably be trace level
@ -734,14 +739,14 @@ impl Web3ProxyApp {
JsonRpcRequestEnum::Single(request) => JsonRpcForwardedResponseEnum::Single(
timeout(
max_time,
self.proxy_web3_rpc_request(authorized_request, request),
self.proxy_web3_rpc_request(&authorization, request),
)
.await??,
),
JsonRpcRequestEnum::Batch(requests) => JsonRpcForwardedResponseEnum::Batch(
timeout(
max_time,
self.proxy_web3_rpc_requests(authorized_request, requests),
self.proxy_web3_rpc_requests(&authorization, requests),
)
.await??,
),
@ -758,27 +763,24 @@ impl Web3ProxyApp {
#[instrument(level = "trace")]
async fn proxy_web3_rpc_requests(
self: &Arc<Self>,
authorized_request: Arc<AuthorizedRequest>,
authorization: &Arc<Authorization>,
requests: Vec<JsonRpcRequest>,
) -> anyhow::Result<Vec<JsonRpcForwardedResponse>> {
// TODO: we should probably change ethers-rs to support this directly
let num_requests = requests.len();
// TODO: spawn so the requests go in parallel
// TODO: i think we will need to flatten
let responses = join_all(
requests
.into_iter()
.map(|request| {
let authorized_request = authorized_request.clone();
// TODO: spawn so the requests go in parallel
// TODO: i think we will need to flatten
self.proxy_web3_rpc_request(authorized_request, request)
})
.map(|request| self.proxy_web3_rpc_request(authorization, request))
.collect::<Vec<_>>(),
)
.await;
// TODO: i'm sure this could be done better with iterators
// TODO: i'm sure this could be done better with iterators. we could return the error earlier then, too
// TODO: stream the response?
let mut collected: Vec<JsonRpcForwardedResponse> = Vec::with_capacity(num_requests);
for response in responses {
collected.push(response?);
@ -809,7 +811,7 @@ impl Web3ProxyApp {
#[instrument(level = "trace")]
async fn proxy_web3_rpc_request(
self: &Arc<Self>,
authorized_request: Arc<AuthorizedRequest>,
authorization: &Arc<Authorization>,
mut request: JsonRpcRequest,
) -> anyhow::Result<JsonRpcForwardedResponse> {
trace!("Received request: {:?}", request);
@ -818,7 +820,7 @@ impl Web3ProxyApp {
let request_metadata = Arc::new(RequestMetadata::new(60, &request)?);
// save the id so we can attach it to the response
// TODO: instead of cloning, take the id out
// TODO: instead of cloning, take the id out?
let request_id = request.id.clone();
// TODO: if eth_chainId or net_version, serve those without querying the backend
@ -947,7 +949,7 @@ impl Web3ProxyApp {
return rpcs
.try_send_all_upstream_servers(
Some(&authorized_request),
authorization,
request,
Some(request_metadata),
None,
@ -1013,6 +1015,7 @@ impl Web3ProxyApp {
// we do this check before checking caches because it might modify the request params
// TODO: add a stat for archive vs full since they should probably cost different
let request_block_id = if let Some(request_block_needed) = block_needed(
authorization,
method,
request.params.as_mut(),
head_block_id.num,
@ -1021,8 +1024,10 @@ impl Web3ProxyApp {
.await?
{
// TODO: maybe this should be on the app and not on balanced_rpcs
let (request_block_hash, archive_needed) =
self.balanced_rpcs.block_hash(&request_block_needed).await?;
let (request_block_hash, archive_needed) = self
.balanced_rpcs
.block_hash(authorization, &request_block_needed)
.await?;
if archive_needed {
request_metadata
@ -1049,7 +1054,7 @@ impl Web3ProxyApp {
let mut response = {
let request_metadata = request_metadata.clone();
let authorized_request = authorized_request.clone();
let authorization = authorization.clone();
self.response_cache
.try_get_with(cache_key, async move {
@ -1059,7 +1064,7 @@ impl Web3ProxyApp {
let mut response = self
.balanced_rpcs
.try_send_best_upstream_server(
Some(&authorized_request),
&authorization,
request,
Some(&request_metadata),
Some(&request_block_id.num),
@ -1085,14 +1090,10 @@ impl Web3ProxyApp {
// replace the id with our request's id.
response.id = request_id;
// DRY this up by just returning the partial result (or error) here
if let (Some(stat_sender), Ok(AuthorizedRequest::User(Some(_), authorized_key))) = (
self.stat_sender.as_ref(),
Arc::try_unwrap(authorized_request),
) {
if let Some(stat_sender) = self.stat_sender.as_ref() {
let response_stat = ProxyResponseStat::new(
method.to_string(),
authorized_key,
authorization.clone(),
request_metadata,
&response,
);
@ -1109,12 +1110,13 @@ impl Web3ProxyApp {
let response = JsonRpcForwardedResponse::from_value(partial_response, request_id);
if let (Some(stat_sender), Ok(AuthorizedRequest::User(Some(_), authorized_key))) = (
self.stat_sender.as_ref(),
Arc::try_unwrap(authorized_request),
) {
let response_stat =
ProxyResponseStat::new(request.method, authorized_key, request_metadata, &response);
if let Some(stat_sender) = self.stat_sender.as_ref() {
let response_stat = ProxyResponseStat::new(
request.method,
authorization.clone(),
request_metadata,
&response,
);
stat_sender
.send_async(response_stat.into())

@ -1,4 +1,4 @@
use crate::frontend::authorization::{AuthorizedKey, RequestMetadata};
use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::jsonrpc::JsonRpcForwardedResponse;
use chrono::{TimeZone, Utc};
use derive_more::From;
@ -18,17 +18,30 @@ use tracing::{error, info};
/// TODO: can we use something inside sea_orm instead?
#[derive(Debug)]
pub struct ProxyResponseStat {
rpc_key_id: u64,
authorization: Arc<Authorization>,
method: String,
archive_request: bool,
error_response: bool,
request_bytes: u64,
/// if backend_requests is 0, there was a cache_hit
backend_requests: u64,
error_response: bool,
response_bytes: u64,
response_millis: u64,
}
impl ProxyResponseStat {
/// TODO: think more about this. probably rename it
fn key(&self) -> ProxyResponseAggregateKey {
ProxyResponseAggregateKey {
rpc_key_id: self.authorization.checks.rpc_key_id,
// TODO: include Origin here?
method: self.method.clone(),
archive_request: self.archive_request,
error_response: self.error_response,
}
}
}
pub struct ProxyResponseHistograms {
request_bytes: Histogram<u64>,
response_bytes: Histogram<u64>,
@ -50,6 +63,7 @@ impl Default for ProxyResponseHistograms {
}
}
// TODO: think more about if we should include IP address in this
#[derive(Clone, From, Hash, PartialEq, Eq)]
struct ProxyResponseAggregateKey {
rpc_key_id: u64,
@ -62,9 +76,9 @@ struct ProxyResponseAggregateKey {
pub struct ProxyResponseAggregate {
frontend_requests: u64,
backend_requests: u64,
// TODO: related to backend_requests. get this level of detail out
// TODO: related to backend_requests
// backend_retries: u64,
// TODO: related to backend_requests. get this level of detail out
// TODO: related to backend_requests
// no_servers: u64,
cache_misses: u64,
cache_hits: u64,
@ -164,9 +178,10 @@ impl ProxyResponseAggregate {
let p99_response_bytes = response_bytes.value_at_quantile(0.99);
let max_response_bytes = response_bytes.max();
// TODO: Set origin and maybe other things on this model. probably not the ip though
let aggregated_stat_model = rpc_accounting::ActiveModel {
id: sea_orm::NotSet,
// origin: sea_orm::Set(key.authorization.origin.to_string()),
rpc_key_id: sea_orm::Set(key.rpc_key_id),
chain_id: sea_orm::Set(chain_id),
method: sea_orm::Set(key.method),
@ -215,7 +230,7 @@ impl ProxyResponseStat {
// TODO: should RequestMetadata be in an arc? or can we handle refs here?
pub fn new(
method: String,
authorized_key: AuthorizedKey,
authorization: Arc<Authorization>,
metadata: Arc<RequestMetadata>,
response: &JsonRpcForwardedResponse,
) -> Self {
@ -236,7 +251,7 @@ impl ProxyResponseStat {
let response_millis = metadata.start_instant.elapsed().as_millis() as u64;
Self {
rpc_key_id: authorized_key.rpc_key_id,
authorization,
archive_request,
method,
backend_requests,
@ -246,15 +261,6 @@ impl ProxyResponseStat {
response_millis,
}
}
fn key(&self) -> ProxyResponseAggregateKey {
ProxyResponseAggregateKey {
rpc_key_id: self.rpc_key_id,
method: self.method.clone(),
error_response: self.error_response,
archive_request: self.archive_request,
}
}
}
impl StatEmitter {

@ -283,7 +283,7 @@ mod tests {
public_requests_per_period: Some(1_000_000),
response_cache_max_bytes: 10_usize.pow(7),
redirect_public_url: Some("example.com/".to_string()),
redirect_user_url: Some("example.com/{{rpc_key_id}}".to_string()),
redirect_rpc_key_url: Some("example.com/{rpc_key_id}".to_string()),
..Default::default()
},
balanced_rpcs: HashMap::from([

@ -66,7 +66,7 @@ impl CheckConfigSubCommand {
}
// TODO: also check that it contains rpc_key_id!
match top_config.app.redirect_user_url {
match top_config.app.redirect_rpc_key_url {
None => {
warn!("app.redirect_user_url is None. Registered users will get an error page instead of a redirect")
}

@ -4,9 +4,10 @@ use ethers::{
prelude::{BlockNumber, U64},
types::H256,
};
use std::sync::Arc;
use tracing::{instrument, warn};
use crate::rpcs::connections::Web3Connections;
use crate::{frontend::authorization::Authorization, rpcs::connections::Web3Connections};
pub fn block_num_to_u64(block_num: BlockNumber, latest_block: U64) -> U64 {
match block_num {
@ -40,6 +41,7 @@ pub fn block_num_to_u64(block_num: BlockNumber, latest_block: U64) -> U64 {
/// modify params to always have a block number and not "latest"
#[instrument(level = "trace")]
pub async fn clean_block_number(
authorization: &Arc<Authorization>,
params: &mut serde_json::Value,
block_param_id: usize,
latest_block: U64,
@ -70,7 +72,7 @@ pub async fn clean_block_number(
let block_hash: H256 =
serde_json::from_value(block_hash).context("decoding blockHash")?;
let block = rpcs.block(None, &block_hash, None).await?;
let block = rpcs.block(authorization, &block_hash, None).await?;
block
.number
@ -98,6 +100,7 @@ pub async fn clean_block_number(
// TODO: change this to also return the hash needed?
#[instrument(level = "trace")]
pub async fn block_needed(
authorization: &Arc<Authorization>,
method: &str,
params: Option<&mut serde_json::Value>,
head_block_num: U64,
@ -203,7 +206,7 @@ pub async fn block_needed(
}
};
match clean_block_number(params, block_param_id, head_block_num, rpcs).await {
match clean_block_number(authorization, params, block_param_id, head_block_num, rpcs).await {
Ok(block) => Ok(Some(block)),
Err(err) => {
// TODO: seems unlikely that we will get here

@ -6,6 +6,7 @@ use argh::FromArgs;
use derive_more::Constructor;
use ethers::prelude::TxHash;
use hashbrown::HashMap;
use sea_orm::DatabaseConnection;
use serde::Deserialize;
use std::sync::Arc;
use tokio::sync::broadcast;
@ -118,7 +119,7 @@ pub struct AppConfig {
pub redirect_public_url: Option<String>,
/// the stats page url for a logged in user. if set, must contain "{rpc_key_id}"
pub redirect_user_url: Option<String>,
pub redirect_rpc_key_url: Option<String>,
/// Optionally send errors to <https://sentry.io>
pub sentry_url: Option<String>,
@ -199,6 +200,7 @@ impl Web3ConnectionConfig {
pub async fn spawn(
self,
name: String,
db_conn: Option<DatabaseConnection>,
redis_pool: Option<redis_rate_limiter::RedisPool>,
chain_id: u64,
http_client: Option<reqwest::Client>,
@ -228,6 +230,7 @@ impl Web3ConnectionConfig {
Web3Connection::spawn(
name,
chain_id,
db_conn,
self.url,
http_client,
http_interval_sender,

@ -1,16 +1,16 @@
//! Utilities for authorization of logged in and anonymous users.
use super::errors::FrontendErrorResponse;
use crate::app::{UserKeyData, Web3ProxyApp};
use crate::app::{AuthorizationChecks, Web3ProxyApp, APP_USER_AGENT};
use crate::jsonrpc::JsonRpcRequest;
use crate::user_token::UserBearerToken;
use anyhow::Context;
use axum::headers::authorization::Bearer;
use axum::headers::{Header, Origin, Referer, UserAgent};
use axum::TypedHeader;
use chrono::Utc;
use deferred_rate_limiter::DeferredRateLimitResult;
use entities::{rpc_key, user, user_tier};
use hashbrown::HashMap;
use http::HeaderValue;
use ipnet::IpNet;
use redis_rate_limiter::redis::AsyncCommands;
@ -33,29 +33,28 @@ pub enum RpcSecretKey {
Uuid(Uuid),
}
/// TODO: should this have IpAddr and Origin or AuthorizationChecks?
#[derive(Debug)]
pub enum RateLimitResult {
/// contains the IP of the anonymous user
/// TODO: option inside or outside the arc?
AllowedIp(IpAddr, Option<OwnedSemaphorePermit>),
/// contains the rpc_key_id of an authenticated user
AllowedUser(UserKeyData, Option<OwnedSemaphorePermit>),
/// contains the IP and retry_at of the anonymous user
RateLimitedIp(IpAddr, Option<Instant>),
/// contains the rpc_key_id and retry_at of an authenticated user key
RateLimitedUser(UserKeyData, Option<Instant>),
Allowed(Authorization, Option<OwnedSemaphorePermit>),
RateLimited(
Authorization,
/// when their rate limit resets and they can try more requests
Option<Instant>,
),
/// This key is not in our database. Deny access!
UnknownKey,
}
/// TODO: include the authorization checks in this?
#[derive(Clone, Debug)]
pub struct AuthorizedKey {
pub struct Authorization {
pub checks: AuthorizationChecks,
pub db_conn: Option<DatabaseConnection>,
pub ip: IpAddr,
pub origin: Option<Origin>,
pub user_id: u64,
pub rpc_key_id: u64,
// TODO: just use an f32? even an f16 is probably fine
pub log_revert_chance: f64,
pub referer: Option<Referer>,
pub user_agent: Option<UserAgent>,
}
#[derive(Debug)]
@ -65,6 +64,7 @@ pub struct RequestMetadata {
// TODO: better name for this
pub period_seconds: u64,
pub request_bytes: u64,
// TODO: do we need atomics? seems like we should be able to pass a &mut around
// TODO: "archive" isn't really a boolean.
pub archive_request: AtomicBool,
/// if this is 0, there was a cache_hit
@ -75,16 +75,6 @@ pub struct RequestMetadata {
pub response_millis: AtomicU64,
}
#[derive(Clone, Debug)]
pub enum AuthorizedRequest {
/// Request from this app
Internal,
/// Request from an anonymous IP address
Ip(IpAddr, Option<Origin>),
/// Request from an authenticated and authorized user
User(Option<DatabaseConnection>, AuthorizedKey),
}
impl RequestMetadata {
pub fn new(period_seconds: u64, request: &JsonRpcRequest) -> anyhow::Result<Self> {
// TODO: how can we do this without turning it into a string first. this is going to slow us down!
@ -176,16 +166,65 @@ impl From<RpcSecretKey> for Uuid {
}
}
impl AuthorizedKey {
pub fn try_new(
impl Authorization {
pub fn local(db_conn: Option<DatabaseConnection>) -> anyhow::Result<Self> {
let authorization_checks = AuthorizationChecks {
// any error logs on a local (internal) query are likely problems. log them all
log_revert_chance: 1.0,
// default for everything else should be fine. we don't have a user_id or ip to give
..Default::default()
};
let ip: IpAddr = "127.0.0.1".parse().expect("localhost should always parse");
let user_agent = UserAgent::from_str(APP_USER_AGENT).ok();
Self::try_new(authorization_checks, db_conn, ip, None, None, user_agent)
}
pub fn public(
allowed_origin_requests_per_period: &HashMap<String, u64>,
db_conn: Option<DatabaseConnection>,
ip: IpAddr,
origin: Option<Origin>,
referer: Option<Referer>,
user_agent: Option<UserAgent>,
) -> anyhow::Result<Self> {
// some origins can override max_requests_per_period for anon users
let max_requests_per_period = origin
.as_ref()
.map(|origin| {
allowed_origin_requests_per_period
.get(&origin.to_string())
.cloned()
})
.unwrap_or_default();
// TODO: default or None?
let authorization_checks = AuthorizationChecks {
max_requests_per_period,
..Default::default()
};
Self::try_new(
authorization_checks,
db_conn,
ip,
origin,
referer,
user_agent,
)
}
pub fn try_new(
authorization_checks: AuthorizationChecks,
db_conn: Option<DatabaseConnection>,
ip: IpAddr,
origin: Option<Origin>,
referer: Option<Referer>,
user_agent: Option<UserAgent>,
rpc_key_data: UserKeyData,
) -> anyhow::Result<Self> {
// check ip
match &rpc_key_data.allowed_ips {
match &authorization_checks.allowed_ips {
None => {}
Some(allowed_ips) => {
if !allowed_ips.iter().any(|x| x.contains(&ip)) {
@ -195,7 +234,7 @@ impl AuthorizedKey {
}
// check origin
match (&origin, &rpc_key_data.allowed_origins) {
match (&origin, &authorization_checks.allowed_origins) {
(None, None) => {}
(Some(_), None) => {}
(None, Some(_)) => return Err(anyhow::anyhow!("Origin required")),
@ -207,97 +246,82 @@ impl AuthorizedKey {
}
// check referer
match (referer, &rpc_key_data.allowed_referers) {
match (&referer, &authorization_checks.allowed_referers) {
(None, None) => {}
(Some(_), None) => {}
(None, Some(_)) => return Err(anyhow::anyhow!("Referer required")),
(Some(referer), Some(allowed_referers)) => {
if !allowed_referers.contains(&referer) {
if !allowed_referers.contains(referer) {
return Err(anyhow::anyhow!("Referer is not allowed!"));
}
}
}
// check user_agent
match (user_agent, &rpc_key_data.allowed_user_agents) {
match (&user_agent, &authorization_checks.allowed_user_agents) {
(None, None) => {}
(Some(_), None) => {}
(None, Some(_)) => return Err(anyhow::anyhow!("User agent required")),
(Some(user_agent), Some(allowed_user_agents)) => {
if !allowed_user_agents.contains(&user_agent) {
if !allowed_user_agents.contains(user_agent) {
return Err(anyhow::anyhow!("User agent is not allowed!"));
}
}
}
Ok(Self {
checks: authorization_checks,
db_conn,
ip,
origin,
user_id: rpc_key_data.user_id,
rpc_key_id: rpc_key_data.rpc_key_id,
log_revert_chance: rpc_key_data.log_revert_chance,
referer,
user_agent,
})
}
}
impl AuthorizedRequest {
/// Only User has a database connection in case it needs to save a revert to the database.
pub fn db_conn(&self) -> Option<&DatabaseConnection> {
match self {
Self::User(x, _) => x.as_ref(),
_ => None,
}
}
}
impl Display for &AuthorizedRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AuthorizedRequest::Internal => f.write_str("int"),
AuthorizedRequest::Ip(x, _) => f.write_str(&format!("ip-{}", x)),
AuthorizedRequest::User(_, x) => f.write_str(&format!("uk-{}", x.rpc_key_id)),
}
}
}
/// rate limit logins only by ip.
/// we want all origins and referers and user agents to count together
pub async fn login_is_authorized(
app: &Web3ProxyApp,
ip: IpAddr,
) -> Result<AuthorizedRequest, FrontendErrorResponse> {
let (ip, _semaphore) = match app.rate_limit_login(ip).await? {
RateLimitResult::AllowedIp(x, semaphore) => (x, semaphore),
RateLimitResult::RateLimitedIp(x, retry_at) => {
return Err(FrontendErrorResponse::RateLimitedIp(x, retry_at));
) -> Result<Authorization, FrontendErrorResponse> {
let authorization = match app.rate_limit_login(ip).await? {
RateLimitResult::Allowed(authorization, None) => authorization,
RateLimitResult::RateLimited(authorization, retry_at) => {
return Err(FrontendErrorResponse::RateLimited(authorization, retry_at));
}
// TODO: don't panic. give the user an error
x => unimplemented!("rate_limit_login shouldn't ever see these: {:?}", x),
};
Ok(AuthorizedRequest::Ip(ip, None))
Ok(authorization)
}
/// semaphore won't ever be None, but its easier if key auth and ip auth work the same way
pub async fn ip_is_authorized(
app: &Web3ProxyApp,
ip: IpAddr,
origin: Option<TypedHeader<Origin>>,
) -> Result<(AuthorizedRequest, Option<OwnedSemaphorePermit>), FrontendErrorResponse> {
let origin = origin.map(|x| x.0);
origin: Option<Origin>,
) -> Result<(Authorization, Option<OwnedSemaphorePermit>), FrontendErrorResponse> {
// TODO: i think we could write an `impl From` for this
// TODO: move this to an AuthorizedUser extrator
let (ip, semaphore) = match app.rate_limit_by_ip(ip, origin.as_ref()).await? {
RateLimitResult::AllowedIp(ip, semaphore) => (ip, semaphore),
RateLimitResult::RateLimitedIp(x, retry_at) => {
return Err(FrontendErrorResponse::RateLimitedIp(x, retry_at));
let (authorization, semaphore) = match app
.rate_limit_by_ip(&app.config.allowed_origin_requests_per_period, ip, origin)
.await?
{
RateLimitResult::Allowed(authorization, semaphore) => (authorization, semaphore),
RateLimitResult::RateLimited(authorization, retry_at) => {
return Err(FrontendErrorResponse::RateLimited(authorization, retry_at));
}
// TODO: don't panic. give the user an error
x => unimplemented!("rate_limit_by_ip shouldn't ever see these: {:?}", x),
};
// semaphore won't ever be None, but its easier if key auth and ip auth work the same way
Ok((AuthorizedRequest::Ip(ip, origin), semaphore))
Ok((authorization, semaphore))
}
/// like app.rate_limit_by_rpc_key but converts to a FrontendErrorResponse;
pub async fn key_is_authorized(
app: &Web3ProxyApp,
rpc_key: RpcSecretKey,
@ -305,23 +329,21 @@ pub async fn key_is_authorized(
origin: Option<Origin>,
referer: Option<Referer>,
user_agent: Option<UserAgent>,
) -> Result<(AuthorizedRequest, Option<OwnedSemaphorePermit>), FrontendErrorResponse> {
) -> Result<(Authorization, Option<OwnedSemaphorePermit>), FrontendErrorResponse> {
// check the rate limits. error if over the limit
let (user_data, semaphore) = match app.rate_limit_by_key(rpc_key).await? {
RateLimitResult::AllowedUser(x, semaphore) => (x, semaphore),
RateLimitResult::RateLimitedUser(x, retry_at) => {
return Err(FrontendErrorResponse::RateLimitedUser(x, retry_at));
// TODO: i think this should be in an "impl From" or "impl Into"
let (authorization, semaphore) = match app
.rate_limit_by_rpc_key(ip, origin, referer, rpc_key, user_agent)
.await?
{
RateLimitResult::Allowed(authorization, semaphore) => (authorization, semaphore),
RateLimitResult::RateLimited(authorization, retry_at) => {
return Err(FrontendErrorResponse::RateLimited(authorization, retry_at));
}
RateLimitResult::UnknownKey => return Err(FrontendErrorResponse::UnknownKey),
// TODO: don't panic. give the user an error
x => unimplemented!("rate_limit_by_key shouldn't ever see these: {:?}", x),
};
let authorized_user = AuthorizedKey::try_new(ip, origin, referer, user_agent, user_data)?;
let db_conn = app.db_conn.clone();
Ok((AuthorizedRequest::User(db_conn, authorized_user), semaphore))
Ok((authorization, semaphore))
}
impl Web3ProxyApp {
@ -353,16 +375,19 @@ impl Web3ProxyApp {
/// Limit the number of concurrent requests from the given key address.
#[instrument(level = "trace")]
pub async fn user_rpc_key_semaphore(
pub async fn authorization_checks_semaphore(
&self,
rpc_key_data: &UserKeyData,
authorization_checks: &AuthorizationChecks,
) -> anyhow::Result<Option<OwnedSemaphorePermit>> {
if let Some(max_concurrent_requests) = rpc_key_data.max_concurrent_requests {
if let Some(max_concurrent_requests) = authorization_checks.max_concurrent_requests {
let semaphore = self
.rpc_key_semaphores
.get_with(rpc_key_data.rpc_key_id, async move {
.get_with(authorization_checks.rpc_key_id, async move {
let s = Semaphore::new(max_concurrent_requests as usize);
trace!("new semaphore for rpc_key_id {}", rpc_key_data.rpc_key_id);
trace!(
"new semaphore for rpc_key_id {}",
authorization_checks.rpc_key_id
);
Arc::new(s)
})
.await;
@ -423,93 +448,121 @@ impl Web3ProxyApp {
#[instrument(level = "trace")]
pub async fn rate_limit_login(&self, ip: IpAddr) -> anyhow::Result<RateLimitResult> {
// TODO: dry this up with rate_limit_by_key
// TODO: do we want a semaphore here?
// TODO: dry this up with rate_limit_by_rpc_key?
// we don't care about user agent or origin or referer
let authorization = Authorization::public(
&self.config.allowed_origin_requests_per_period,
self.db_conn(),
ip,
None,
None,
None,
)?;
// no semaphore is needed here because login rate limits are low
// TODO: are we sure do we want a semaphore here?
let semaphore = None;
if let Some(rate_limiter) = &self.login_rate_limiter {
match rate_limiter.throttle_label(&ip.to_string(), None, 1).await {
Ok(RedisRateLimitResult::Allowed(_)) => Ok(RateLimitResult::AllowedIp(ip, None)),
Ok(RedisRateLimitResult::Allowed(_)) => {
Ok(RateLimitResult::Allowed(authorization, semaphore))
}
Ok(RedisRateLimitResult::RetryAt(retry_at, _)) => {
// TODO: set headers so they know when they can retry
// TODO: debug or trace?
// this is too verbose, but a stat might be good
trace!(?ip, "login rate limit exceeded until {:?}", retry_at);
Ok(RateLimitResult::RateLimitedIp(ip, Some(retry_at)))
Ok(RateLimitResult::RateLimited(authorization, Some(retry_at)))
}
Ok(RedisRateLimitResult::RetryNever) => {
// TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely
trace!(?ip, "login rate limit is 0");
Ok(RateLimitResult::RateLimitedIp(ip, None))
Ok(RateLimitResult::RateLimited(authorization, None))
}
Err(err) => {
// internal error, not rate limit being hit
// TODO: i really want axum to do this for us in a single place.
error!(?err, "login rate limiter is unhappy. allowing ip");
Ok(RateLimitResult::AllowedIp(ip, None))
Ok(RateLimitResult::Allowed(authorization, None))
}
}
} else {
// TODO: if no redis, rate limit with a local cache? "warn!" probably isn't right
Ok(RateLimitResult::AllowedIp(ip, None))
Ok(RateLimitResult::Allowed(authorization, None))
}
}
/// origin is included because it can override the default rate limits
#[instrument(level = "trace")]
pub async fn rate_limit_by_ip(
&self,
allowed_origin_requests_per_period: &HashMap<String, u64>,
ip: IpAddr,
origin: Option<&Origin>,
origin: Option<Origin>,
) -> anyhow::Result<RateLimitResult> {
// TODO: dry this up with rate_limit_by_key
let semaphore = self.ip_semaphore(ip).await?;
// ip rate limits don't check referer or user agent
// the do check
let authorization = Authorization::public(
allowed_origin_requests_per_period,
self.db_conn.clone(),
ip,
origin,
None,
None,
)?;
if let Some(rate_limiter) = &self.frontend_ip_rate_limiter {
let max_requests_per_period = origin
.map(|origin| {
self.config
.allowed_origin_requests_per_period
.get(&origin.to_string())
.cloned()
})
.unwrap_or_default();
match rate_limiter.throttle(ip, max_requests_per_period, 1).await {
match rate_limiter
.throttle(ip, authorization.checks.max_requests_per_period, 1)
.await
{
Ok(DeferredRateLimitResult::Allowed) => {
Ok(RateLimitResult::AllowedIp(ip, semaphore))
// rate limit allowed us. check concurrent request limits
let semaphore = self.ip_semaphore(ip).await?;
Ok(RateLimitResult::Allowed(authorization, semaphore))
}
Ok(DeferredRateLimitResult::RetryAt(retry_at)) => {
// TODO: set headers so they know when they can retry
// TODO: debug or trace?
// this is too verbose, but a stat might be good
trace!(?ip, "rate limit exceeded until {:?}", retry_at);
Ok(RateLimitResult::RateLimitedIp(ip, Some(retry_at)))
Ok(RateLimitResult::RateLimited(authorization, Some(retry_at)))
}
Ok(DeferredRateLimitResult::RetryNever) => {
// TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely
trace!(?ip, "rate limit is 0");
Ok(RateLimitResult::RateLimitedIp(ip, None))
Ok(RateLimitResult::RateLimited(authorization, None))
}
Err(err) => {
// internal error, not rate limit being hit
// this an internal error of some kind, not the rate limit being hit
// TODO: i really want axum to do this for us in a single place.
error!(?err, "rate limiter is unhappy. allowing ip");
Ok(RateLimitResult::AllowedIp(ip, semaphore))
// at least we can still check the semaphore
let semaphore = self.ip_semaphore(ip).await?;
Ok(RateLimitResult::Allowed(authorization, semaphore))
}
}
} else {
// no redis, but we can still check the ip semaphore
let semaphore = self.ip_semaphore(ip).await?;
// TODO: if no redis, rate limit with a local cache? "warn!" probably isn't right
Ok(RateLimitResult::AllowedIp(ip, semaphore))
Ok(RateLimitResult::Allowed(authorization, semaphore))
}
}
// check the local cache for user data, or query the database
#[instrument(level = "trace")]
pub(crate) async fn user_data(
pub(crate) async fn authorization_checks(
&self,
rpc_secret_key: RpcSecretKey,
) -> anyhow::Result<UserKeyData> {
let user_data: Result<_, Arc<anyhow::Error>> = self
) -> anyhow::Result<AuthorizationChecks> {
let authorization_checks: Result<_, Arc<anyhow::Error>> = self
.rpc_secret_key_cache
.try_get_with(rpc_secret_key.into(), async move {
trace!(?rpc_secret_key, "user cache miss");
@ -592,9 +645,7 @@ impl Web3ProxyApp {
None
};
// let user_tier_model = user_tier
Ok(UserKeyData {
Ok(AuthorizationChecks {
user_id: rpc_key_model.user_id,
rpc_key_id: rpc_key_model.id,
allowed_ips,
@ -606,31 +657,50 @@ impl Web3ProxyApp {
max_requests_per_period: user_tier_model.max_requests_per_period,
})
}
None => Ok(UserKeyData::default()),
None => Ok(AuthorizationChecks::default()),
}
})
.await;
// TODO: what's the best way to handle this arc? try_unwrap will not work
user_data.map_err(|err| anyhow::anyhow!(err))
authorization_checks.map_err(|err| anyhow::anyhow!(err))
}
/// Authorized the ip/origin/referer/useragent and rate limit and concurrency
#[instrument(level = "trace")]
pub async fn rate_limit_by_key(
pub async fn rate_limit_by_rpc_key(
&self,
ip: IpAddr,
origin: Option<Origin>,
referer: Option<Referer>,
rpc_key: RpcSecretKey,
user_agent: Option<UserAgent>,
) -> anyhow::Result<RateLimitResult> {
let user_data = self.user_data(rpc_key).await?;
let authorization_checks = self.authorization_checks(rpc_key).await?;
if user_data.rpc_key_id == 0 {
// if no rpc_key_id matching the given rpc was found, then we can't rate limit by key
if authorization_checks.rpc_key_id == 0 {
return Ok(RateLimitResult::UnknownKey);
}
let semaphore = self.user_rpc_key_semaphore(&user_data).await?;
// only allow this rpc_key to run a limited amount of concurrent requests
// TODO: rate limit should be BEFORE the semaphore!
let semaphore = self
.authorization_checks_semaphore(&authorization_checks)
.await?;
let user_max_requests_per_period = match user_data.max_requests_per_period {
let authorization = Authorization::try_new(
authorization_checks,
self.db_conn(),
ip,
origin,
referer,
user_agent,
)?;
let user_max_requests_per_period = match authorization.checks.max_requests_per_period {
None => {
return Ok(RateLimitResult::AllowedUser(user_data, semaphore));
return Ok(RateLimitResult::Allowed(authorization, semaphore));
}
Some(x) => x,
};
@ -642,7 +712,7 @@ impl Web3ProxyApp {
.await
{
Ok(DeferredRateLimitResult::Allowed) => {
Ok(RateLimitResult::AllowedUser(user_data, semaphore))
Ok(RateLimitResult::Allowed(authorization, semaphore))
}
Ok(DeferredRateLimitResult::RetryAt(retry_at)) => {
// TODO: set headers so they know when they can retry
@ -651,25 +721,25 @@ impl Web3ProxyApp {
// TODO: keys are secrets! use the id instead
// TODO: emit a stat
trace!(?rpc_key, "rate limit exceeded until {:?}", retry_at);
Ok(RateLimitResult::RateLimitedUser(user_data, Some(retry_at)))
Ok(RateLimitResult::RateLimited(authorization, Some(retry_at)))
}
Ok(DeferredRateLimitResult::RetryNever) => {
// TODO: keys are secret. don't log them!
trace!(?rpc_key, "rate limit is 0");
// TODO: emit a stat
Ok(RateLimitResult::RateLimitedUser(user_data, None))
Ok(RateLimitResult::RateLimited(authorization, None))
}
Err(err) => {
// internal error, not rate limit being hit
// TODO: i really want axum to do this for us in a single place.
error!(?err, "rate limiter is unhappy. allowing ip");
Ok(RateLimitResult::AllowedUser(user_data, semaphore))
Ok(RateLimitResult::Allowed(authorization, semaphore))
}
}
} else {
// TODO: if no redis, rate limit with just a local cache?
Ok(RateLimitResult::AllowedUser(user_data, semaphore))
Ok(RateLimitResult::Allowed(authorization, semaphore))
}
}
}

@ -1,6 +1,7 @@
//! Utlities for logging errors for admins and displaying errors to users.
use crate::{app::UserKeyData, jsonrpc::JsonRpcForwardedResponse};
use super::authorization::Authorization;
use crate::jsonrpc::JsonRpcForwardedResponse;
use axum::{
headers,
http::StatusCode,
@ -13,7 +14,7 @@ use ipnet::AddrParseError;
use redis_rate_limiter::redis::RedisError;
use reqwest::header::ToStrError;
use sea_orm::DbErr;
use std::{error::Error, net::IpAddr};
use std::error::Error;
use tokio::{task::JoinError, time::Instant};
use tracing::{instrument, trace, warn};
@ -32,12 +33,11 @@ pub enum FrontendErrorResponse {
IpAddrParse(AddrParseError),
JoinError(JoinError),
NotFound,
RateLimitedUser(UserKeyData, Option<Instant>),
RateLimitedIp(IpAddr, Option<Instant>),
RateLimited(Authorization, Option<Instant>),
Redis(RedisError),
Response(Response),
/// simple way to return an error message to the user and an anyhow to our logs
StatusCode(StatusCode, String, anyhow::Error),
StatusCode(StatusCode, String, Option<anyhow::Error>),
UlidDecodeError(ulid::DecodeError),
UnknownKey,
}
@ -138,28 +138,32 @@ impl IntoResponse for FrontendErrorResponse {
),
)
}
Self::RateLimitedIp(ip, _retry_at) => {
// TODO: emit a stat
// TODO: include retry_at in the error
// TODO: if retry_at is None, give an unauthorized status code?
(
StatusCode::TOO_MANY_REQUESTS,
JsonRpcForwardedResponse::from_string(
format!("too many requests from ip {}!", ip),
Some(StatusCode::TOO_MANY_REQUESTS.as_u16().into()),
None,
),
)
}
// TODO: this should actually by the id of the key. multiple users might control one key
Self::RateLimitedUser(user_data, _retry_at) => {
Self::RateLimited(authorization, retry_at) => {
// TODO: emit a stat
// TODO: include retry_at in the error
let retry_msg = if let Some(retry_at) = retry_at {
let retry_in = retry_at.duration_since(Instant::now()).as_secs();
format!(" Retry in {} seconds", retry_in)
} else {
"".to_string()
};
// create a string with either the IP or the rpc_key_id
let msg = if authorization.checks.rpc_key_id == 0 {
format!("too many requests from {}.{}", authorization.ip, retry_msg)
} else {
format!(
"too many requests from rpc key #{}.{}",
authorization.checks.rpc_key_id, retry_msg
)
};
(
StatusCode::TOO_MANY_REQUESTS,
JsonRpcForwardedResponse::from_string(
// TODO: better error
format!("too many requests from {:?}!", user_data),
msg,
Some(StatusCode::TOO_MANY_REQUESTS.as_u16().into()),
None,
),

@ -25,17 +25,20 @@ pub async fn proxy_web3_rpc(
) -> FrontendResult {
let request_span = error_span!("request", %ip);
let (authorized_request, _semaphore) = ip_is_authorized(&app, ip, origin)
// TODO: do we care about keeping the TypedHeader wrapper?
let origin = origin.map(|x| x.0);
let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin)
.instrument(request_span)
.await?;
let request_span = error_span!("request", ?authorized_request);
let request_span = error_span!("request", ?authorization);
let authorized_request = Arc::new(authorized_request);
let authorization = Arc::new(authorization);
// TODO: spawn earlier? i think we want ip_is_authorized in this future
let f = tokio::spawn(async move {
app.proxy_web3_rpc(authorized_request, payload)
app.proxy_web3_rpc(authorization, payload)
.instrument(request_span)
.await
});
@ -64,7 +67,8 @@ pub async fn proxy_web3_rpc_with_key(
let request_span = error_span!("request", %ip, ?referer, ?user_agent);
let (authorized_request, _semaphore) = key_is_authorized(
// keep the semaphore until the end of the response
let (authorization, _semaphore) = key_is_authorized(
&app,
rpc_key,
ip,
@ -75,14 +79,14 @@ pub async fn proxy_web3_rpc_with_key(
.instrument(request_span.clone())
.await?;
let request_span = error_span!("request", ?authorized_request);
let request_span = error_span!("request", ?authorization);
let authorized_request = Arc::new(authorized_request);
let authorization = Arc::new(authorization);
// the request can take a while, so we spawn so that we can start serving another request
// TODO: spawn even earlier?
let f = tokio::spawn(async move {
app.proxy_web3_rpc(authorized_request, payload)
app.proxy_web3_rpc(authorization, payload)
.instrument(request_span)
.await
});

@ -2,8 +2,8 @@
//!
//! WebSockets are the preferred method of receiving requests, but not all clients have good support.
use super::authorization::{ip_is_authorized, key_is_authorized, AuthorizedRequest};
use super::errors::FrontendResult;
use super::authorization::{ip_is_authorized, key_is_authorized, Authorization};
use super::errors::{FrontendErrorResponse, FrontendResult};
use axum::headers::{Origin, Referer, UserAgent};
use axum::{
extract::ws::{Message, WebSocket, WebSocketUpgrade},
@ -20,6 +20,7 @@ use futures::{
};
use handlebars::Handlebars;
use hashbrown::HashMap;
use http::StatusCode;
use serde_json::{json, value::RawValue};
use std::sync::Arc;
use std::{str::from_utf8_mut, sync::atomic::AtomicUsize};
@ -43,18 +44,20 @@ pub async fn websocket_handler(
// TODO: i don't like logging ips. move this to trace level?
let request_span = error_span!("request", %ip, ?origin);
let (authorized_request, _semaphore) = ip_is_authorized(&app, ip, origin)
let origin = origin.map(|x| x.0);
let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin)
.instrument(request_span)
.await?;
let request_span = error_span!("request", ?authorized_request);
let request_span = error_span!("request", ?authorization);
let authorized_request = Arc::new(authorized_request);
let authorization = Arc::new(authorization);
match ws_upgrade {
Some(ws) => Ok(ws
.on_upgrade(|socket| {
proxy_web3_socket(app, authorized_request, socket).instrument(request_span)
proxy_web3_socket(app, authorization, socket).instrument(request_span)
})
.into_response()),
None => {
@ -90,7 +93,7 @@ pub async fn websocket_handler_with_key(
let request_span = error_span!("request", %ip, ?referer, ?user_agent);
let (authorized_request, _semaphore) = key_is_authorized(
let (authorization, _semaphore) = key_is_authorized(
&app,
rpc_key,
ip,
@ -102,39 +105,52 @@ pub async fn websocket_handler_with_key(
.await?;
// TODO: type that wraps Address and have it censor? would protect us from accidently logging addresses or other user info
let request_span = error_span!("request", ?authorized_request);
let request_span = error_span!("request", ?authorization);
let authorized_request = Arc::new(authorized_request);
let authorization = Arc::new(authorization);
match ws_upgrade {
Some(ws_upgrade) => Ok(ws_upgrade.on_upgrade(move |socket| {
proxy_web3_socket(app, authorized_request, socket).instrument(request_span)
proxy_web3_socket(app, authorization, socket).instrument(request_span)
})),
None => {
// if no websocket upgrade, this is probably a user loading the url with their browser
if let Some(redirect) = &app.config.redirect_user_url {
// TODO: store this on the app and use register_template?
let reg = Handlebars::new();
// TODO: show the user's address, not their id (remember to update the checks for {{user_id}}} in app.rs)
// TODO: query to get the user's address. expose that instead of user_id
if let AuthorizedRequest::User(_, authorized_key) = authorized_request.as_ref() {
let user_url = reg
.render_template(
redirect,
&json!({ "rpc_key_id": authorized_key.rpc_key_id }),
)
.expect("templating should always work");
// this is not a websocket. redirect to a page for this user
Ok(Redirect::to(&user_url).into_response())
} else {
// TODO: i think this is impossible
Err(anyhow::anyhow!("this page is for rpcs").into())
match (
&app.config.redirect_public_url,
&app.config.redirect_rpc_key_url,
authorization.checks.rpc_key_id,
) {
(None, None, _) => Err(anyhow::anyhow!(
"redirect_rpc_key_url not set. only websockets work here"
)
.into()),
(Some(redirect_public_url), _, 0) => {
Ok(Redirect::to(redirect_public_url).into_response())
}
} else {
// TODO: do not use an anyhow error. send the user a 400
Err(anyhow::anyhow!("redirect_user_url not set. only websockets work here").into())
(_, Some(redirect_rpc_key_url), rpc_key_id) => {
let reg = Handlebars::new();
if authorization.checks.rpc_key_id == 0 {
// TODO: i think this is impossible
Err(anyhow::anyhow!("this page is for rpcs").into())
} else {
let redirect_rpc_key_url = reg
.render_template(
redirect_rpc_key_url,
&json!({ "rpc_key_id": rpc_key_id }),
)
.expect("templating should always work");
// this is not a websocket. redirect to a page for this user
Ok(Redirect::to(&redirect_rpc_key_url).into_response())
}
}
// any other combinations get a simple error
_ => Err(FrontendErrorResponse::StatusCode(
StatusCode::BAD_REQUEST,
"this page is for rpcs".to_string(),
None,
)),
}
}
}
@ -143,7 +159,7 @@ pub async fn websocket_handler_with_key(
#[instrument(level = "trace")]
async fn proxy_web3_socket(
app: Arc<Web3ProxyApp>,
authorized_request: Arc<AuthorizedRequest>,
authorization: Arc<Authorization>,
socket: WebSocket,
) {
// split the websocket so we can read and write concurrently
@ -153,19 +169,14 @@ async fn proxy_web3_socket(
let (response_sender, response_receiver) = flume::unbounded::<Message>();
tokio::spawn(write_web3_socket(response_receiver, ws_tx));
tokio::spawn(read_web3_socket(
app,
authorized_request,
ws_rx,
response_sender,
));
tokio::spawn(read_web3_socket(app, authorization, ws_rx, response_sender));
}
/// websockets support a few more methods than http clients
#[instrument(level = "trace")]
async fn handle_socket_payload(
app: Arc<Web3ProxyApp>,
authorized_request: Arc<AuthorizedRequest>,
authorization: &Arc<Authorization>,
payload: &str,
response_sender: &flume::Sender<Message>,
subscription_count: &AtomicUsize,
@ -173,19 +184,21 @@ async fn handle_socket_payload(
) -> Message {
// TODO: do any clients send batches over websockets?
let (id, response) = match serde_json::from_str::<JsonRpcRequest>(payload) {
Ok(payload) => {
Ok(json_request) => {
// TODO: should we use this id for the subscription id? it should be unique and means we dont need an atomic
let id = payload.id.clone();
let id = json_request.id.clone();
let response: anyhow::Result<JsonRpcForwardedResponseEnum> = match &payload.method[..] {
let response: anyhow::Result<JsonRpcForwardedResponseEnum> = match &json_request.method
[..]
{
"eth_subscribe" => {
// TODO: what should go in this span?
let span = error_span!("eth_subscribe");
let response = app
.eth_subscribe(
authorized_request,
payload,
authorization.clone(),
json_request,
subscription_count,
response_sender.clone(),
)
@ -213,7 +226,7 @@ async fn handle_socket_payload(
"eth_unsubscribe" => {
// TODO: how should handle rate limits and stats on this?
// TODO: handle invalid params
let subscription_id = payload.params.unwrap().to_string();
let subscription_id = json_request.params.unwrap().to_string();
let partial_response = match subscriptions.remove(&subscription_id) {
None => false,
@ -228,7 +241,10 @@ async fn handle_socket_payload(
Ok(response.into())
}
_ => app.proxy_web3_rpc(authorized_request, payload.into()).await,
_ => {
app.proxy_web3_rpc(authorization.clone(), json_request.into())
.await
}
};
(id, response)
@ -256,7 +272,7 @@ async fn handle_socket_payload(
#[instrument(level = "trace")]
async fn read_web3_socket(
app: Arc<Web3ProxyApp>,
authorized_request: Arc<AuthorizedRequest>,
authorization: Arc<Authorization>,
mut ws_rx: SplitStream<WebSocket>,
response_sender: flume::Sender<Message>,
) {
@ -269,7 +285,7 @@ async fn read_web3_socket(
Message::Text(payload) => {
handle_socket_payload(
app.clone(),
authorized_request.clone(),
&authorization,
&payload,
&response_sender,
&subscription_count,
@ -292,7 +308,7 @@ async fn read_web3_socket(
handle_socket_payload(
app.clone(),
authorized_request.clone(),
&authorization,
payload,
&response_sender,
&subscription_count,

@ -2,7 +2,7 @@
use super::connection::Web3Connection;
use super::connections::Web3Connections;
use super::transactions::TxStatus;
use crate::frontend::authorization::AuthorizedRequest;
use crate::frontend::authorization::Authorization;
use crate::{
config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::SyncedConnections,
};
@ -88,7 +88,7 @@ impl Web3Connections {
#[instrument(level = "trace")]
pub async fn block(
&self,
authorized_request: Option<&Arc<AuthorizedRequest>>,
authorization: &Arc<Authorization>,
hash: &H256,
rpc: Option<&Arc<Web3Connection>>,
) -> anyhow::Result<ArcBlock> {
@ -103,7 +103,7 @@ impl Web3Connections {
// TODO: if error, retry?
let block: Block<TxHash> = match rpc {
Some(rpc) => {
rpc.wait_for_request_handle(authorized_request, Duration::from_secs(30))
rpc.wait_for_request_handle(authorization, Duration::from_secs(30))
.await?
.request(
"eth_getBlockByHash",
@ -118,9 +118,9 @@ impl Web3Connections {
let request = json!({ "id": "1", "method": "eth_getBlockByHash", "params": get_block_params });
let request: JsonRpcRequest = serde_json::from_value(request)?;
// TODO: request_metadata? maybe we should put it in the authorized_request?
// TODO: request_metadata? maybe we should put it in the authorization?
let response = self
.try_send_best_upstream_server(authorized_request, request, None, None)
.try_send_best_upstream_server(authorization, request, None, None)
.await?;
let block = response.result.unwrap();
@ -139,8 +139,12 @@ impl Web3Connections {
}
/// Convenience method to get the cannonical block at a given block height.
pub async fn block_hash(&self, num: &U64) -> anyhow::Result<(H256, bool)> {
let (block, is_archive_block) = self.cannonical_block(num).await?;
pub async fn block_hash(
&self,
authorization: &Arc<Authorization>,
num: &U64,
) -> anyhow::Result<(H256, bool)> {
let (block, is_archive_block) = self.cannonical_block(authorization, num).await?;
let hash = block.hash.expect("Saved blocks should always have hashes");
@ -148,7 +152,11 @@ impl Web3Connections {
}
/// Get the heaviest chain's block from cache or backend rpc
pub async fn cannonical_block(&self, num: &U64) -> anyhow::Result<(ArcBlock, bool)> {
pub async fn cannonical_block(
&self,
authorization: &Arc<Authorization>,
num: &U64,
) -> anyhow::Result<(ArcBlock, bool)> {
// we only have blocks by hash now
// maybe save them during save_block in a blocks_by_number Cache<U64, Vec<ArcBlock>>
// if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations)
@ -174,8 +182,8 @@ impl Web3Connections {
// deref to not keep the lock open
if let Some(block_hash) = self.block_numbers.get(num) {
// TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set
// TODO: pass authorized_request through here?
let block = self.block(None, &block_hash, None).await?;
// TODO: pass authorization through here?
let block = self.block(authorization, &block_hash, None).await?;
return Ok((block, archive_needed));
}
@ -186,9 +194,9 @@ impl Web3Connections {
let request: JsonRpcRequest = serde_json::from_value(request)?;
// TODO: if error, retry?
// TODO: request_metadata or authorized_request?
// TODO: request_metadata or authorization?
let response = self
.try_send_best_upstream_server(None, request, None, Some(num))
.try_send_best_upstream_server(authorization, request, None, Some(num))
.await?;
let raw_block = response.result.context("no block result")?;
@ -205,6 +213,7 @@ impl Web3Connections {
pub(super) async fn process_incoming_blocks(
&self,
authorization: &Arc<Authorization>,
block_receiver: flume::Receiver<BlockAndRpc>,
// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
// Geth's subscriptions have the same potential for skipping blocks.
@ -219,6 +228,7 @@ impl Web3Connections {
let rpc_name = rpc.name.clone();
if let Err(err) = self
.process_block_from_rpc(
authorization,
&mut connection_heads,
new_block,
rpc,
@ -242,6 +252,7 @@ impl Web3Connections {
/// TODO: return something?
async fn process_block_from_rpc(
&self,
authorization: &Arc<Authorization>,
connection_heads: &mut HashMap<String, H256>,
rpc_head_block: Option<ArcBlock>,
rpc: Arc<Web3Connection>,
@ -305,7 +316,10 @@ impl Web3Connections {
// this option should always be populated
let conn_rpc = self.conns.get(conn_name);
match self.block(None, connection_head_hash, conn_rpc).await {
match self
.block(authorization, connection_head_hash, conn_rpc)
.await
{
Ok(block) => block,
Err(err) => {
warn!(%connection_head_hash, %conn_name, %rpc, ?err, "Failed fetching connection_head_block for block_hashes");

@ -4,7 +4,7 @@ use super::provider::Web3Provider;
use super::request::{OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult};
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::BlockAndRpc;
use crate::frontend::authorization::AuthorizedRequest;
use crate::frontend::authorization::Authorization;
use anyhow::Context;
use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
use futures::future::try_join_all;
@ -12,6 +12,7 @@ use futures::StreamExt;
use parking_lot::RwLock;
use rand::Rng;
use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter};
use sea_orm::DatabaseConnection;
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::json;
@ -63,6 +64,7 @@ impl Web3Connection {
pub async fn spawn(
name: String,
chain_id: u64,
db_conn: Option<DatabaseConnection>,
url_str: String,
// optional because this is only used for http providers. websocket providers don't use it
http_client: Option<reqwest::Client>,
@ -111,12 +113,14 @@ impl Web3Connection {
.retrying_reconnect(block_sender.as_ref(), false)
.await?;
let authorization = Arc::new(Authorization::local(db_conn)?);
// check the server's chain_id here
// TODO: move this outside the `new` function and into a `start` function or something. that way we can do retries from there
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
// TODO: what should the timeout be?
let found_chain_id: Result<U64, _> = new_connection
.wait_for_request_handle(None, Duration::from_secs(30))
.wait_for_request_handle(&authorization, Duration::from_secs(30))
.await?
.request(
"eth_chainId",
@ -149,9 +153,11 @@ impl Web3Connection {
// TODO: make transaction subscription optional (just pass None for tx_id_sender)
let handle = {
let new_connection = new_connection.clone();
let authorization = authorization.clone();
tokio::spawn(async move {
new_connection
.subscribe(
&authorization,
http_interval_sender,
block_map,
block_sender,
@ -174,13 +180,19 @@ impl Web3Connection {
// TODO: i think instead of atomics, we could maybe use a watch channel
sleep(Duration::from_millis(250)).await;
new_connection.check_block_data_limit().await?;
new_connection
.check_block_data_limit(&authorization)
.await?;
}
Ok((new_connection, handle))
}
async fn check_block_data_limit(self: &Arc<Self>) -> anyhow::Result<Option<u64>> {
/// TODO: should check_block_data_limit take authorization?
async fn check_block_data_limit(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
) -> anyhow::Result<Option<u64>> {
let mut limit = None;
for block_data_limit in [u64::MAX, 90_000, 128, 64, 32] {
@ -206,7 +218,7 @@ impl Web3Connection {
// TODO: wait for the handle BEFORE we check the current block number. it might be delayed too!
// TODO: what should the request be?
let archive_result: Result<Bytes, _> = self
.wait_for_request_handle(None, Duration::from_secs(30))
.wait_for_request_handle(authorization, Duration::from_secs(30))
.await?
.request(
"eth_getCode",
@ -453,6 +465,7 @@ impl Web3Connection {
/// subscribe to blocks and transactions with automatic reconnects
async fn subscribe(
self: Arc<Self>,
authorization: &Arc<Authorization>,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
block_map: BlockHashesCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
@ -468,6 +481,7 @@ impl Web3Connection {
if let Some(block_sender) = &block_sender {
let f = self.clone().subscribe_new_heads(
authorization.clone(),
http_interval_receiver,
block_sender.clone(),
block_map.clone(),
@ -479,7 +493,7 @@ impl Web3Connection {
if let Some(tx_id_sender) = &tx_id_sender {
let f = self
.clone()
.subscribe_pending_transactions(tx_id_sender.clone());
.subscribe_pending_transactions(authorization.clone(), tx_id_sender.clone());
futures.push(flatten_handle(tokio::spawn(f)));
}
@ -540,6 +554,7 @@ impl Web3Connection {
/// Subscribe to new blocks. If `reconnect` is true, this runs forever.
async fn subscribe_new_heads(
self: Arc<Self>,
authorization: Arc<Authorization>,
http_interval_receiver: Option<broadcast::Receiver<()>>,
block_sender: flume::Sender<BlockAndRpc>,
block_map: BlockHashesCache,
@ -560,7 +575,7 @@ impl Web3Connection {
loop {
// TODO: what should the max_wait be?
match self
.wait_for_request_handle(None, Duration::from_secs(30))
.wait_for_request_handle(&authorization, Duration::from_secs(30))
.await
{
Ok(active_request_handle) => {
@ -648,7 +663,7 @@ impl Web3Connection {
Web3Provider::Ws(provider) => {
// todo: move subscribe_blocks onto the request handle?
let active_request_handle = self
.wait_for_request_handle(None, Duration::from_secs(30))
.wait_for_request_handle(&authorization, Duration::from_secs(30))
.await;
let mut stream = provider.subscribe_blocks().await?;
drop(active_request_handle);
@ -658,7 +673,7 @@ impl Web3Connection {
// all it does is print "new block" for the same block as current block
// TODO: how does this get wrapped in an arc? does ethers handle that?
let block: Result<Option<ArcBlock>, _> = self
.wait_for_request_handle(None, Duration::from_secs(30))
.wait_for_request_handle(&authorization, Duration::from_secs(30))
.await?
.request(
"eth_getBlockByNumber",
@ -715,6 +730,7 @@ impl Web3Connection {
async fn subscribe_pending_transactions(
self: Arc<Self>,
authorization: Arc<Authorization>,
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
) -> anyhow::Result<()> {
info!(%self, "watching pending transactions");
@ -752,7 +768,7 @@ impl Web3Connection {
Web3Provider::Ws(provider) => {
// TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
let active_request_handle = self
.wait_for_request_handle(None, Duration::from_secs(30))
.wait_for_request_handle(&authorization, Duration::from_secs(30))
.await;
let mut stream = provider.subscribe_pending_txs().await?;
@ -783,13 +799,13 @@ impl Web3Connection {
#[instrument]
pub async fn wait_for_request_handle(
self: &Arc<Self>,
authorized_request: Option<&Arc<AuthorizedRequest>>,
authorization: &Arc<Authorization>,
max_wait: Duration,
) -> anyhow::Result<OpenRequestHandle> {
let max_wait = Instant::now() + max_wait;
loop {
let x = self.try_request_handle(authorized_request).await;
let x = self.try_request_handle(authorization).await;
trace!(?x, "try_request_handle");
@ -819,7 +835,7 @@ impl Web3Connection {
#[instrument]
pub async fn try_request_handle(
self: &Arc<Self>,
authorized_request: Option<&Arc<AuthorizedRequest>>,
authorization: &Arc<Authorization>,
) -> anyhow::Result<OpenRequestResult> {
// check that we are connected
if !self.has_provider().await {
@ -850,7 +866,7 @@ impl Web3Connection {
}
};
let handle = OpenRequestHandle::new(self.clone(), authorized_request.cloned());
let handle = OpenRequestHandle::new(authorization.clone(), self.clone());
Ok(OpenRequestResult::Handle(handle))
}

@ -7,7 +7,7 @@ use super::request::{
use super::synced_connections::SyncedConnections;
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::{BlockAndRpc, TxHashAndRpc, Web3ConnectionConfig};
use crate::frontend::authorization::{AuthorizedRequest, RequestMetadata};
use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
use crate::rpcs::transactions::TxStatus;
use anyhow::Context;
@ -21,6 +21,7 @@ use futures::StreamExt;
use hashbrown::HashMap;
use moka::future::{Cache, ConcurrentCacheExt};
use petgraph::graphmap::DiGraphMap;
use sea_orm::DatabaseConnection;
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::json;
@ -61,6 +62,7 @@ impl Web3Connections {
#[allow(clippy::too_many_arguments)]
pub async fn spawn(
chain_id: u64,
db_conn: Option<DatabaseConnection>,
server_configs: HashMap<String, Web3ConnectionConfig>,
http_client: Option<reqwest::Client>,
redis_pool: Option<redis_rate_limiter::RedisPool>,
@ -119,6 +121,7 @@ impl Web3Connections {
return None;
}
let db_conn = db_conn.clone();
let http_client = http_client.clone();
let redis_pool = redis_pool.clone();
let http_interval_sender = http_interval_sender.clone();
@ -137,6 +140,7 @@ impl Web3Connections {
server_config
.spawn(
server_name,
db_conn,
redis_pool,
chain_id,
http_client,
@ -212,6 +216,8 @@ impl Web3Connections {
min_synced_rpcs,
});
let authorization = Arc::new(Authorization::local(db_conn.clone())?);
let handle = {
let connections = connections.clone();
@ -219,6 +225,7 @@ impl Web3Connections {
// TODO: try_join_all with the other handles here
connections
.subscribe(
authorization,
pending_tx_id_receiver,
block_receiver,
head_block_sender,
@ -240,6 +247,7 @@ impl Web3Connections {
/// transaction ids from all the `Web3Connection`s are deduplicated and forwarded to `pending_tx_sender`
async fn subscribe(
self: Arc<Self>,
authorization: Arc<Authorization>,
pending_tx_id_receiver: flume::Receiver<TxHashAndRpc>,
block_receiver: flume::Receiver<BlockAndRpc>,
head_block_sender: Option<watch::Sender<ArcBlock>>,
@ -253,10 +261,12 @@ impl Web3Connections {
// forwards new transacitons to pending_tx_receipt_sender
if let Some(pending_tx_sender) = pending_tx_sender.clone() {
let clone = self.clone();
let authorization = authorization.clone();
let handle = task::spawn(async move {
// TODO: set up this future the same as the block funnel
while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv_async().await {
let f = clone.clone().process_incoming_tx_id(
authorization.clone(),
rpc,
pending_tx_id,
pending_tx_sender.clone(),
@ -274,11 +284,13 @@ impl Web3Connections {
if let Some(head_block_sender) = head_block_sender {
let connections = Arc::clone(&self);
let pending_tx_sender = pending_tx_sender.clone();
let handle = task::Builder::default()
.name("process_incoming_blocks")
.spawn(async move {
connections
.process_incoming_blocks(
&authorization,
block_receiver,
head_block_sender,
pending_tx_sender,
@ -373,7 +385,7 @@ impl Web3Connections {
/// get the best available rpc server
pub async fn next_upstream_server(
&self,
authorized_request: Option<&Arc<AuthorizedRequest>>,
authorization: &Arc<Authorization>,
request_metadata: Option<&Arc<RequestMetadata>>,
skip: &[Arc<Web3Connection>],
min_block_needed: Option<&U64>,
@ -432,7 +444,7 @@ impl Web3Connections {
// now that the rpcs are sorted, try to get an active request handle for one of them
for rpc in synced_rpcs.into_iter() {
// increment our connection counter
match rpc.try_request_handle(authorized_request).await {
match rpc.try_request_handle(authorization).await {
Ok(OpenRequestResult::Handle(handle)) => {
trace!("next server on {:?}: {:?}", self, rpc);
return Ok(OpenRequestResult::Handle(handle));
@ -476,7 +488,7 @@ impl Web3Connections {
// TODO: better type on this that can return an anyhow::Result
pub async fn upstream_servers(
&self,
authorized_request: Option<&Arc<AuthorizedRequest>>,
authorization: &Arc<Authorization>,
block_needed: Option<&U64>,
) -> Result<Vec<OpenRequestHandle>, Option<Instant>> {
let mut earliest_retry_at = None;
@ -491,7 +503,7 @@ impl Web3Connections {
}
// check rate limits and increment our connection counter
match connection.try_request_handle(authorized_request).await {
match connection.try_request_handle(authorization).await {
Ok(OpenRequestResult::RetryAt(retry_at)) => {
// this rpc is not available. skip it
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
@ -517,7 +529,7 @@ impl Web3Connections {
/// be sure there is a timeout on this or it might loop forever
pub async fn try_send_best_upstream_server(
&self,
authorized_request: Option<&Arc<AuthorizedRequest>>,
authorization: &Arc<Authorization>,
request: JsonRpcRequest,
request_metadata: Option<&Arc<RequestMetadata>>,
min_block_needed: Option<&U64>,
@ -532,7 +544,7 @@ impl Web3Connections {
}
match self
.next_upstream_server(
authorized_request,
authorization,
request_metadata,
&skip_rpcs,
min_block_needed,
@ -655,16 +667,13 @@ impl Web3Connections {
#[instrument]
pub async fn try_send_all_upstream_servers(
&self,
authorized_request: Option<&Arc<AuthorizedRequest>>,
authorization: &Arc<Authorization>,
request: JsonRpcRequest,
request_metadata: Option<Arc<RequestMetadata>>,
block_needed: Option<&U64>,
) -> anyhow::Result<JsonRpcForwardedResponse> {
loop {
match self
.upstream_servers(authorized_request, block_needed)
.await
{
match self.upstream_servers(authorization, block_needed).await {
Ok(active_request_handles) => {
// TODO: benchmark this compared to waiting on unbounded futures
// TODO: do something with this handle?

@ -1,6 +1,6 @@
use super::connection::Web3Connection;
use super::provider::Web3Provider;
use crate::frontend::authorization::AuthorizedRequest;
use crate::frontend::authorization::Authorization;
use crate::metered::{JsonRpcErrorCount, ProviderErrorCount};
use anyhow::Context;
use chrono::Utc;
@ -13,8 +13,8 @@ use metered::HitCount;
use metered::ResponseTime;
use metered::Throughput;
use rand::Rng;
use sea_orm::ActiveEnum;
use sea_orm::ActiveModelTrait;
use sea_orm::{ActiveEnum};
use serde_json::json;
use std::fmt;
use std::sync::atomic::{self, AtomicBool, Ordering};
@ -35,7 +35,7 @@ pub enum OpenRequestResult {
/// Make RPC requests through this handle and drop it when you are done.
#[derive(Debug)]
pub struct OpenRequestHandle {
authorized_request: Arc<AuthorizedRequest>,
authorization: Arc<Authorization>,
conn: Arc<Web3Connection>,
// TODO: this is the same metrics on the conn. use a reference?
metrics: Arc<OpenRequestHandleMetrics>,
@ -75,41 +75,43 @@ impl From<Level> for RequestErrorHandler {
}
}
impl AuthorizedRequest {
impl Authorization {
/// Save a RPC call that return "execution reverted" to the database.
async fn save_revert(
self: Arc<Self>,
method: Method,
params: EthCallFirstParams,
) -> anyhow::Result<()> {
if let Self::User(Some(db_conn), authorized_request) = &*self {
// TODO: should the database set the timestamp?
let timestamp = Utc::now();
let to: Vec<u8> = params
.to
.as_bytes()
.try_into()
.expect("address should always convert to a Vec<u8>");
let call_data = params.data.map(|x| format!("{}", x));
let db_conn = self.db_conn.as_ref().context("no database connection")?;
let rl = revert_log::ActiveModel {
rpc_key_id: sea_orm::Set(authorized_request.rpc_key_id),
method: sea_orm::Set(method),
to: sea_orm::Set(to),
call_data: sea_orm::Set(call_data),
timestamp: sea_orm::Set(timestamp),
..Default::default()
};
// TODO: should the database set the timestamp?
// we intentionally use "now" and not the time the request started
// why? because we aggregate stats and setting one in the past could cause confusion
let timestamp = Utc::now();
let to: Vec<u8> = params
.to
.as_bytes()
.try_into()
.expect("address should always convert to a Vec<u8>");
let call_data = params.data.map(|x| format!("{}", x));
let rl = rl
.save(db_conn)
.await
.context("Failed saving new revert log")?;
let rl = revert_log::ActiveModel {
rpc_key_id: sea_orm::Set(self.checks.rpc_key_id),
method: sea_orm::Set(method),
to: sea_orm::Set(to),
call_data: sea_orm::Set(call_data),
timestamp: sea_orm::Set(timestamp),
..Default::default()
};
// TODO: what log level?
// TODO: better format
trace!(?rl);
}
let rl = rl
.save(db_conn)
.await
.context("Failed saving new revert log")?;
// TODO: what log level?
// TODO: better format
trace!(?rl);
// TODO: return something useful
Ok(())
@ -118,10 +120,7 @@ impl AuthorizedRequest {
#[metered(registry = OpenRequestHandleMetrics, visibility = pub)]
impl OpenRequestHandle {
pub fn new(
conn: Arc<Web3Connection>,
authorized_request: Option<Arc<AuthorizedRequest>>,
) -> Self {
pub fn new(authorization: Arc<Authorization>, conn: Arc<Web3Connection>) -> Self {
// TODO: take request_id as an argument?
// TODO: attach a unique id to this? customer requests have one, but not internal queries
// TODO: what ordering?!
@ -136,11 +135,8 @@ impl OpenRequestHandle {
let metrics = conn.open_request_handle_metrics.clone();
let used = false.into();
let authorized_request =
authorized_request.unwrap_or_else(|| Arc::new(AuthorizedRequest::Internal));
Self {
authorized_request,
authorization,
conn,
metrics,
used,
@ -176,7 +172,7 @@ impl OpenRequestHandle {
// TODO: use tracing spans
// TODO: requests from customers have request ids, but we should add
// TODO: including params in this is way too verbose
// the authorized_request field is already on a parent span
// the authorization field is already on a parent span
trace!(rpc=%self.conn, %method, "request");
let mut provider = None;
@ -209,33 +205,25 @@ impl OpenRequestHandle {
if !["eth_call", "eth_estimateGas"].contains(&method) {
trace!(%method, "skipping save on revert");
RequestErrorHandler::DebugLevel
} else if self.authorized_request.db_conn().is_none() {
trace!(%method, "no database. skipping save on revert");
RequestErrorHandler::DebugLevel
} else if let AuthorizedRequest::User(db_conn, y) = self.authorized_request.as_ref()
{
if db_conn.is_none() {
trace!(%method, "no database. skipping save on revert");
} else if self.authorization.db_conn.is_some() {
let log_revert_chance = self.authorization.checks.log_revert_chance;
if log_revert_chance == 0.0 {
trace!(%method, "no chance. skipping save on revert");
RequestErrorHandler::DebugLevel
} else if log_revert_chance == 1.0 {
trace!(%method, "gaurenteed chance. SAVING on revert");
error_handler
} else if rand::thread_rng().gen_range(0.0f64..=1.0) < log_revert_chance {
trace!(%method, "missed chance. skipping save on revert");
RequestErrorHandler::DebugLevel
} else {
let log_revert_chance = y.log_revert_chance;
if log_revert_chance == 0.0 {
trace!(%method, "no chance. skipping save on revert");
RequestErrorHandler::DebugLevel
} else if log_revert_chance == 1.0 {
trace!(%method, "gaurenteed chance. SAVING on revert");
error_handler
} else if rand::thread_rng().gen_range(0.0f64..=1.0) > log_revert_chance {
trace!(%method, "missed chance. skipping save on revert");
RequestErrorHandler::DebugLevel
} else {
trace!("Saving on revert");
// TODO: is always logging at debug level fine?
error_handler
}
trace!("Saving on revert");
// TODO: is always logging at debug level fine?
error_handler
}
} else {
trace!(%method, "no database. skipping save on revert");
RequestErrorHandler::DebugLevel
}
} else {
@ -298,10 +286,7 @@ impl OpenRequestHandle {
.unwrap();
// spawn saving to the database so we don't slow down the request
let f = self
.authorized_request
.clone()
.save_revert(method, params.0 .0);
let f = self.authorization.clone().save_revert(method, params.0 .0);
tokio::spawn(f);
}

@ -1,3 +1,5 @@
use crate::frontend::authorization::Authorization;
///! Load balanced communication with a group of web3 providers
use super::connection::Web3Connection;
use super::connections::Web3Connections;
@ -18,6 +20,7 @@ pub enum TxStatus {
impl Web3Connections {
async fn query_transaction_status(
&self,
authorization: &Arc<Authorization>,
rpc: Arc<Web3Connection>,
pending_tx_id: TxHash,
) -> Result<Option<TxStatus>, ProviderError> {
@ -25,7 +28,7 @@ impl Web3Connections {
// TODO: might not be a race. might be a nonce thats higher than the current account nonce. geth discards chains
// TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
// TODO: if one rpc fails, try another?
let tx: Transaction = match rpc.try_request_handle(None).await {
let tx: Transaction = match rpc.try_request_handle(authorization).await {
Ok(OpenRequestResult::Handle(handle)) => {
handle
.request(
@ -62,6 +65,7 @@ impl Web3Connections {
/// dedupe transaction and send them to any listening clients
pub(super) async fn process_incoming_tx_id(
self: Arc<Self>,
authorization: Arc<Authorization>,
rpc: Arc<Web3Connection>,
pending_tx_id: TxHash,
pending_tx_sender: broadcast::Sender<TxStatus>,
@ -84,7 +88,7 @@ impl Web3Connections {
// query the rpc for this transaction
// it is possible that another rpc is also being queried. thats fine. we want the fastest response
match self
.query_transaction_status(rpc.clone(), pending_tx_id)
.query_transaction_status(&authorization, rpc.clone(), pending_tx_id)
.await
{
Ok(Some(tx_state)) => {

@ -159,7 +159,7 @@ pub fn get_query_window_seconds_from_params(
FrontendErrorResponse::StatusCode(
StatusCode::BAD_REQUEST,
"Unable to parse rpc_key_id".to_string(),
e.into(),
Some(e.into()),
)
})
},
@ -290,6 +290,7 @@ pub async fn query_user_stats<'a>(
let user_id = get_user_id_from_params(redis_conn, bearer, params).await?;
let (condition, q) = if user_id == 0 {
// 0 means everyone. don't filter on user
// TODO: 0 or None?
(condition, q)
} else {
let q = q.left_join(rpc_key::Entity);
@ -302,36 +303,33 @@ pub async fn query_user_stats<'a>(
};
// filter on rpc_key_id
// if rpc_key_id, all the requests without a key will be loaded
// TODO: move getting the param and checking the bearer token into a helper function
let (condition, q) = if let Some(rpc_key_id) = params.get("rpc_key_id") {
let rpc_key_id = rpc_key_id.parse::<u64>().map_err(|e| {
FrontendErrorResponse::StatusCode(
StatusCode::BAD_REQUEST,
"Unable to parse rpc_key_id".to_string(),
e.into(),
Some(e.into()),
)
})?;
if rpc_key_id == 0 {
response.insert("rpc_key_id", serde_json::Value::Number(rpc_key_id.into()));
let condition = condition.add(rpc_accounting::Column::RpcKeyId.eq(rpc_key_id));
let q = q.group_by(rpc_accounting::Column::RpcKeyId);
if user_id == 0 {
// no user id, we did not join above
let q = q.left_join(rpc_key::Entity);
(condition, q)
} else {
response.insert("rpc_key_id", serde_json::Value::Number(rpc_key_id.into()));
// user_id added a join on rpc_key already. only filter on user_id
let condition = condition.add(rpc_key::Column::UserId.eq(user_id));
let condition = condition.add(rpc_accounting::Column::RpcKeyId.eq(rpc_key_id));
let q = q.group_by(rpc_accounting::Column::RpcKeyId);
if user_id == 0 {
// no user id, we did not join above
let q = q.left_join(rpc_key::Entity);
(condition, q)
} else {
// user_id added a join on rpc_key already. only filter on user_id
let condition = condition.add(rpc_key::Column::UserId.eq(user_id));
(condition, q)
}
(condition, q)
}
} else {
(condition, q)