ULID or UUID. Prefer ULID

This commit is contained in:
Bryan Stitt 2022-09-24 05:53:45 +00:00
parent b254cb7d26
commit 5df2469d53
14 changed files with 239 additions and 118 deletions

21
TODO.md

@ -166,6 +166,11 @@ These are roughly in order of completition
- for security, we want these limits low.
- [x] user login should return the bearer token and the user keys
- [x] use siwe messages and signatures for sign up and login
- [x] check for bearer token on /rpc
- [x] ip blocking logs a warn. we don't need that
- [x] Ulid instead of Uuid for user keys
- <https://discord.com/channels/873880840487206962/900758376164757555/1012942974608474142>
- since users are actively using our service, we will need to support both
- [ ] active requests per second per api key
- [ ] distribution of methods per api key (eth_call, eth_getLogs, etc.)
- [-] let users choose a % to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly
@ -174,13 +179,9 @@ These are roughly in order of completition
- [-] add configurable size limits to all the Caches
- [ ] endpoint for creating/modifying api keys and their advanced security features
- [ ] BUG: i think if all backend servers stop, the server doesn't properly reconnect. It appears to stop listening on 8854, but not shut down.
- [ ] Ulid instead of Uuid for user keys
- <https://discord.com/channels/873880840487206962/900758376164757555/1012942974608474142>
- since users are actively using our service, we will need to support both
- [ ] Ulid instead of Uuid for database ids
- might have to use Uuid in sea-orm and then convert to Ulid on display
- [ ] option to rotate api key
- [ ] read the cookie key from a file. easy to re-use and no giant blob of hex in our app config
- [ ] if no bearer token found in redis (likely because it expired), send 401 unauthorized
## V1
@ -275,7 +276,7 @@ in another repo: event subscriber
## "Maybe some day" and other Miscellaneous Things
- [ ] tool to revoke bearer tokens that also clears redis
- [ ] tool to revoke bearer tokens that clears redis
- [ ] eth_getBlockByNumber and similar calls served from the block map
- will need all Block<TxHash> **and** Block<TransactionReceipt> in caches or fetched efficiently
- so maybe we don't want this. we can just use the general request cache for these. they will only require 1 request and it means requests won't get in the way as much on writes as new blocks arrive.
@ -371,7 +372,6 @@ in another repo: event subscriber
- [ ] https://gitlab.com/moka-labs/tiered-cache-example
- [ ] web3connection3.block(...) might wait forever. be sure to do it safely
- [ ] search for all "todo!"
- [ ] replace all `.context("no servers in sync")` with proper error type
- [ ] when using a bunch of slow public servers, i see "no servers in sync" even when things should be right
- [ ] i think checking the parents of the heaviest chain works most of the time, but not always
- maybe iterate connection heads by total weight? i still think we need to include parent hashes
@ -379,11 +379,14 @@ in another repo: event subscriber
- [ ] whats going on here? why is it rolling back? maybe total_difficulty was a LOT higher?
- 2022-09-05T19:21:39.763630Z WARN web3_proxy::rpcs::blockchain: chain rolled back 1/6/7 head=15479604 (0xf809…6a2c) rpc=infura_free
- i wish i had more logs. its possible that 15479605 came immediatly after
- [ ] ip blocking logs a warn. we don't need that. a stat at most
- [ ] keep it working without redis and a database
- [ ] web3 on rpc1 exited without errors. maybe promote some shutdown messages from debug to info?
- [ ] better handling for offline http servers
- if we get a connection refused, we should remove the server's block info so it is taken out of rotation
- [ ] web3_proxy_cli command should read database settings from config
- [ ] how should we handle reverting transactions? they won't confirm for a while after we send them
- [ ] allow configuration of the expiration time of bearer tokens
- [ ] allow configuration of the expiration time of bearer tokens. currently defaults to 4 weeks
- [ ] instead of putting everything under /rpc, have a site_prefix config?
- [ ] Ulid instead of Uuid for database ids
- might have to use Uuid in sea-orm and then convert to Ulid on display
- [ ] emit stat when an IP/key goes over rate limits

@ -46,7 +46,7 @@ use tokio::task::JoinHandle;
use tokio::time::timeout;
use tokio_stream::wrappers::{BroadcastStream, WatchStream};
use tracing::{error, info, trace, warn};
use uuid::Uuid;
use ulid::Ulid;
// TODO: make this customizable?
static APP_USER_AGENT: &str = concat!(
@ -103,10 +103,10 @@ pub struct Web3ProxyApp {
/// store pending transactions that we've seen so that we don't send duplicates to subscribers
pub pending_transactions: Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
pub frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>,
pub frontend_key_rate_limiter: Option<DeferredRateLimiter<Uuid>>,
pub frontend_key_rate_limiter: Option<DeferredRateLimiter<Ulid>>,
pub login_rate_limiter: Option<RedisRateLimiter>,
pub redis_pool: Option<RedisPool>,
pub user_cache: Cache<Uuid, UserKeyData, hashbrown::hash_map::DefaultHashBuilder>,
pub user_cache: Cache<Ulid, UserKeyData, hashbrown::hash_map::DefaultHashBuilder>,
}
/// flatten a JoinError into an anyhow error
@ -356,7 +356,7 @@ impl Web3ProxyApp {
rpc_rrl.clone(),
None,
));
frontend_key_rate_limiter = Some(DeferredRateLimiter::<Uuid>::new(
frontend_key_rate_limiter = Some(DeferredRateLimiter::<Ulid>::new(
10_000, "key", rpc_rrl, None,
));
@ -435,7 +435,7 @@ impl Web3ProxyApp {
#[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
pub async fn eth_subscribe<'a>(
self: &'a Arc<Self>,
authorization: Arc<AuthorizedRequest>,
authorized_request: Arc<AuthorizedRequest>,
payload: JsonRpcRequest,
subscription_count: &'a AtomicUsize,
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now
@ -629,7 +629,7 @@ impl Web3ProxyApp {
/// send the request or batch of requests to the approriate RPCs
pub async fn proxy_web3_rpc(
self: &Arc<Self>,
authorization: &Arc<AuthorizedRequest>,
authorized_request: &Arc<AuthorizedRequest>,
request: JsonRpcRequestEnum,
) -> anyhow::Result<JsonRpcForwardedResponseEnum> {
// TODO: this should probably be trace level
@ -644,14 +644,14 @@ impl Web3ProxyApp {
JsonRpcRequestEnum::Single(request) => JsonRpcForwardedResponseEnum::Single(
timeout(
max_time,
self.proxy_web3_rpc_request(authorization, request),
self.proxy_web3_rpc_request(authorized_request, request),
)
.await??,
),
JsonRpcRequestEnum::Batch(requests) => JsonRpcForwardedResponseEnum::Batch(
timeout(
max_time,
self.proxy_web3_rpc_requests(authorization, requests),
self.proxy_web3_rpc_requests(authorized_request, requests),
)
.await??,
),
@ -667,7 +667,7 @@ impl Web3ProxyApp {
/// TODO: make sure this isn't a problem
async fn proxy_web3_rpc_requests(
self: &Arc<Self>,
authorization: &Arc<AuthorizedRequest>,
authorized_request: &Arc<AuthorizedRequest>,
requests: Vec<JsonRpcRequest>,
) -> anyhow::Result<Vec<JsonRpcForwardedResponse>> {
// TODO: we should probably change ethers-rs to support this directly
@ -675,7 +675,7 @@ impl Web3ProxyApp {
let responses = join_all(
requests
.into_iter()
.map(|request| self.proxy_web3_rpc_request(authorization, request))
.map(|request| self.proxy_web3_rpc_request(authorized_request, request))
.collect::<Vec<_>>(),
)
.await;
@ -707,7 +707,7 @@ impl Web3ProxyApp {
#[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
async fn proxy_web3_rpc_request(
self: &Arc<Self>,
authorization: &Arc<AuthorizedRequest>,
authorized_request: &Arc<AuthorizedRequest>,
mut request: JsonRpcRequest,
) -> anyhow::Result<JsonRpcForwardedResponse> {
trace!("Received request: {:?}", request);
@ -841,7 +841,7 @@ impl Web3ProxyApp {
let rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs);
return rpcs
.try_send_all_upstream_servers(Some(authorization), request, None)
.try_send_all_upstream_servers(Some(authorized_request), request, None)
.await;
}
"eth_syncing" => {
@ -942,7 +942,7 @@ impl Web3ProxyApp {
let mut response = self
.balanced_rpcs
.try_send_best_upstream_server(
Some(authorization),
Some(authorized_request),
request,
Some(&request_block_id.num),
)

@ -4,29 +4,30 @@ use entities::{user, user_keys};
use ethers::prelude::Address;
use sea_orm::{ActiveModelTrait, TransactionTrait};
use tracing::info;
use ulid::Ulid;
use uuid::Uuid;
use web3_proxy::users::new_api_key;
use web3_proxy::frontend::authorization::UserKey;
#[derive(FromArgs, PartialEq, Debug, Eq)]
/// Create a new user and api key
#[argh(subcommand, name = "create_user")]
pub struct CreateUserSubCommand {
#[argh(option)]
/// the user's ethereum address
/// the user's ethereum address.
address: Address,
#[argh(option)]
/// the user's optional email
/// the user's optional email.
email: Option<String>,
#[argh(option, default = "new_api_key()")]
/// the user's first api key.
/// If none given, one will be generated randomly.
api_key: Uuid,
#[argh(option, default = "UserKey::new()")]
/// the user's first api ULID or UUID key.
/// If none given, one will be created.
api_key: UserKey,
#[argh(option)]
/// maximum requests per minute
/// default to "None" which the code sees as "unlimited" requests
/// maximum requests per minute.
/// default to "None" which the code sees as "unlimited" requests.
rpm: Option<u64>,
}
@ -39,6 +40,7 @@ impl CreateUserSubCommand {
// TODO: take a simple String. If it starts with 0x, parse as address. otherwise convert ascii to hex
let address = self.address.to_fixed_bytes().into();
// TODO: get existing or create a new one
let u = user::ActiveModel {
address: sea_orm::Set(address),
email: sea_orm::Set(self.email),
@ -57,7 +59,7 @@ impl CreateUserSubCommand {
// TODO: requests_per_minute should be configurable
let uk = user_keys::ActiveModel {
user_id: u.id,
api_key: sea_orm::Set(self.api_key),
api_key: sea_orm::Set(self.api_key.into()),
requests_per_minute: sea_orm::Set(self.rpm),
..Default::default()
};
@ -67,7 +69,8 @@ impl CreateUserSubCommand {
txn.commit().await?;
info!("user key: {}", uk.api_key.as_ref());
info!("user key as ULID: {}", Ulid::from(self.api_key));
info!("user key as UUID: {}", Uuid::from(self.api_key));
Ok(())
}

@ -9,11 +9,85 @@ use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::RedisRateLimitResult;
use sea_orm::{prelude::Decimal, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use serde::Serialize;
use std::{net::IpAddr, sync::Arc};
use std::fmt::Display;
use std::{net::IpAddr, str::FromStr, sync::Arc};
use tokio::time::Instant;
use tracing::{error, trace};
use ulid::Ulid;
use uuid::Uuid;
/// This lets us use UUID and ULID while we transition to only ULIDs
#[derive(Copy, Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub enum UserKey {
Ulid(Ulid),
Uuid(Uuid),
}
impl UserKey {
pub fn new() -> Self {
Ulid::new().into()
}
}
impl Display for UserKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// TODO: do this without dereferencing
let ulid: Ulid = (*self).into();
ulid.fmt(f)
}
}
impl Default for UserKey {
fn default() -> Self {
Self::new()
}
}
impl FromStr for UserKey {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if let Ok(ulid) = s.parse::<Ulid>() {
Ok(ulid.into())
} else if let Ok(uuid) = s.parse::<Uuid>() {
Ok(uuid.into())
} else {
Err(anyhow::anyhow!("UserKey was not a ULID or UUID"))
}
}
}
impl From<Ulid> for UserKey {
fn from(x: Ulid) -> Self {
UserKey::Ulid(x)
}
}
impl From<Uuid> for UserKey {
fn from(x: Uuid) -> Self {
UserKey::Uuid(x)
}
}
impl From<UserKey> for Ulid {
fn from(x: UserKey) -> Self {
match x {
UserKey::Ulid(x) => x,
UserKey::Uuid(x) => Ulid::from(x.as_u128()),
}
}
}
impl From<UserKey> for Uuid {
fn from(x: UserKey) -> Self {
match x {
UserKey::Ulid(x) => Uuid::from_u128(x.0),
UserKey::Uuid(x) => x,
}
}
}
#[derive(Debug)]
pub enum RateLimitResult {
/// contains the IP of the anonymous user
@ -172,7 +246,15 @@ pub async fn bearer_is_authorized(
.context("fetching user key by id")?
.context("unknown user id")?;
key_is_authorized(app, user_key_data.api_key, ip, origin, referer, user_agent).await
key_is_authorized(
app,
user_key_data.api_key.into(),
ip,
origin,
referer,
user_agent,
)
.await
}
pub async fn ip_is_authorized(
@ -197,7 +279,7 @@ pub async fn ip_is_authorized(
pub async fn key_is_authorized(
app: &Web3ProxyApp,
user_key: Uuid,
user_key: UserKey,
ip: IpAddr,
origin: Option<Origin>,
referer: Option<Referer>,
@ -287,17 +369,19 @@ impl Web3ProxyApp {
}
// check the local cache for user data, or query the database
pub(crate) async fn user_data(&self, user_key: Uuid) -> anyhow::Result<UserKeyData> {
pub(crate) async fn user_data(&self, user_key: UserKey) -> anyhow::Result<UserKeyData> {
let user_data: Result<_, Arc<anyhow::Error>> = self
.user_cache
.try_get_with(user_key, async move {
.try_get_with(user_key.into(), async move {
trace!(?user_key, "user_cache miss");
let db = self.db_conn().context("Getting database connection")?;
let user_uuid: Uuid = user_key.into();
// TODO: join the user table to this to return the User? we don't always need it
match user_keys::Entity::find()
.filter(user_keys::Column::ApiKey.eq(user_key))
.filter(user_keys::Column::ApiKey.eq(user_uuid))
.filter(user_keys::Column::Active.eq(true))
.one(db)
.await?
@ -368,7 +452,7 @@ impl Web3ProxyApp {
user_data.map_err(|err| Arc::try_unwrap(err).expect("this should be the only reference"))
}
pub async fn rate_limit_by_key(&self, user_key: Uuid) -> anyhow::Result<RateLimitResult> {
pub async fn rate_limit_by_key(&self, user_key: UserKey) -> anyhow::Result<RateLimitResult> {
let user_data = self.user_data(user_key).await?;
if user_data.user_key_id == 0 {
@ -383,7 +467,7 @@ impl Web3ProxyApp {
// user key is valid. now check rate limits
if let Some(rate_limiter) = &self.frontend_key_rate_limiter {
match rate_limiter
.throttle(user_key, Some(user_max_requests_per_period), 1)
.throttle(user_key.into(), Some(user_max_requests_per_period), 1)
.await
{
Ok(DeferredRateLimitResult::Allowed) => Ok(RateLimitResult::AllowedUser(user_data)),

@ -47,12 +47,15 @@ pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()
let app = Router::new()
// routes should be order most to least common
.route("/rpc", post(rpc_proxy_http::proxy_web3_rpc))
.route("/rpc", get(rpc_proxy_ws::public_websocket_handler))
.route("/rpc", get(rpc_proxy_ws::websocket_handler))
.route(
"/rpc/:user_key",
post(rpc_proxy_http::proxy_web3_rpc_with_key),
)
.route("/rpc/:user_key", get(rpc_proxy_ws::user_websocket_handler))
.route(
"/rpc/:user_key",
get(rpc_proxy_ws::websocket_handler_with_key),
)
.route("/rpc/health", get(http::health))
.route("/rpc/status", get(http::status))
// TODO: make this optional or remove it since it is available on another port

@ -9,7 +9,6 @@ use axum::{response::IntoResponse, Extension, Json};
use axum_client_ip::ClientIp;
use std::sync::Arc;
use tracing::{error_span, Instrument};
use uuid::Uuid;
pub async fn proxy_web3_rpc(
Extension(app): Extension<Arc<Web3ProxyApp>>,
@ -22,7 +21,7 @@ pub async fn proxy_web3_rpc(
) -> FrontendResult {
let request_span = error_span!("request", %ip, ?referer, ?user_agent);
let authorization = if let Some(TypedHeader(Authorization(bearer))) = bearer {
let authorized_request = if let Some(TypedHeader(Authorization(bearer))) = bearer {
let origin = origin.map(|x| x.0);
let referer = referer.map(|x| x.0);
let user_agent = user_agent.map(|x| x.0);
@ -36,12 +35,12 @@ pub async fn proxy_web3_rpc(
.await?
};
let request_span = error_span!("request", ?authorization);
let request_span = error_span!("request", ?authorized_request);
let authorization = Arc::new(authorization);
let authorized_request = Arc::new(authorized_request);
let f = tokio::spawn(async move {
app.proxy_web3_rpc(&authorization, payload)
app.proxy_web3_rpc(&authorized_request, payload)
.instrument(request_span)
.await
});
@ -58,12 +57,14 @@ pub async fn proxy_web3_rpc_with_key(
origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
Path(user_key): Path<Uuid>,
Path(user_key): Path<String>,
) -> FrontendResult {
let user_key = user_key.parse()?;
let request_span = error_span!("request", %ip, ?referer, ?user_agent);
// TODO: this should probably return the user_key_id instead? or maybe both?
let authorization = key_is_authorized(
let authorized_request = key_is_authorized(
&app,
user_key,
ip,
@ -74,12 +75,12 @@ pub async fn proxy_web3_rpc_with_key(
.instrument(request_span.clone())
.await?;
let request_span = error_span!("request", ?authorization);
let request_span = error_span!("request", ?authorized_request);
let authorization = Arc::new(authorization);
let authorized_request = Arc::new(authorized_request);
let f = tokio::spawn(async move {
app.proxy_web3_rpc(&authorization, payload)
app.proxy_web3_rpc(&authorized_request, payload)
.instrument(request_span)
.await
});

@ -1,6 +1,8 @@
use super::authorization::{ip_is_authorized, key_is_authorized, AuthorizedRequest};
use super::authorization::{
bearer_is_authorized, ip_is_authorized, key_is_authorized, AuthorizedRequest,
};
use super::errors::FrontendResult;
use axum::headers::{Origin, Referer, UserAgent};
use axum::headers::{authorization::Bearer, Authorization, Origin, Referer, UserAgent};
use axum::{
extract::ws::{Message, WebSocket, WebSocketUpgrade},
extract::Path,
@ -20,7 +22,6 @@ use serde_json::{json, value::RawValue};
use std::sync::Arc;
use std::{str::from_utf8_mut, sync::atomic::AtomicUsize};
use tracing::{error, error_span, info, trace, Instrument};
use uuid::Uuid;
use crate::{
app::Web3ProxyApp,
@ -28,21 +29,39 @@ use crate::{
};
#[debug_handler]
pub async fn public_websocket_handler(
pub async fn websocket_handler(
bearer: Option<TypedHeader<Authorization<Bearer>>>,
Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp,
origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult {
let authorization = ip_is_authorized(&app, ip).await?;
let request_span = error_span!("request", %ip, ?referer, ?user_agent);
let request_span = error_span!("request", ?authorization);
let authorized_request = if let Some(TypedHeader(Authorization(bearer))) = bearer {
let origin = origin.map(|x| x.0);
let referer = referer.map(|x| x.0);
let user_agent = user_agent.map(|x| x.0);
let authorization = Arc::new(authorization);
bearer_is_authorized(&app, bearer, ip, origin, referer, user_agent)
.instrument(request_span.clone())
.await?
} else {
ip_is_authorized(&app, ip)
.instrument(request_span.clone())
.await?
};
let request_span = error_span!("request", ?authorized_request);
let authorized_request = Arc::new(authorized_request);
match ws_upgrade {
Some(ws) => Ok(ws
.on_upgrade(|socket| {
proxy_web3_socket(app, authorization, socket).instrument(request_span)
proxy_web3_socket(app, authorized_request, socket).instrument(request_span)
})
.into_response()),
None => {
@ -53,16 +72,20 @@ pub async fn public_websocket_handler(
}
#[debug_handler]
pub async fn user_websocket_handler(
pub async fn websocket_handler_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp,
Path(user_key): Path<Uuid>,
Path(user_key): Path<String>,
origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult {
let authorization = key_is_authorized(
let user_key = user_key.parse()?;
let request_span = error_span!("request", %ip, ?referer, ?user_agent);
let authorized_request = key_is_authorized(
&app,
user_key,
ip,
@ -70,16 +93,17 @@ pub async fn user_websocket_handler(
referer.map(|x| x.0),
user_agent.map(|x| x.0),
)
.instrument(request_span.clone())
.await?;
// TODO: type that wraps Address and have it censor? would protect us from accidently logging addresses or other user info
let request_span = error_span!("request", ?authorization);
let request_span = error_span!("request", ?authorized_request);
let authorization = Arc::new(authorization);
let authorized_request = Arc::new(authorized_request);
match ws_upgrade {
Some(ws_upgrade) => Ok(ws_upgrade.on_upgrade(move |socket| {
proxy_web3_socket(app, authorization, socket).instrument(request_span)
proxy_web3_socket(app, authorized_request, socket).instrument(request_span)
})),
None => {
// TODO: store this on the app and use register_template?
@ -90,7 +114,7 @@ pub async fn user_websocket_handler(
let user_url = reg
.render_template(
&app.config.redirect_user_url,
&json!({ "authorization": authorization }),
&json!({ "authorized_request": authorized_request }),
)
.unwrap();
@ -102,7 +126,7 @@ pub async fn user_websocket_handler(
async fn proxy_web3_socket(
app: Arc<Web3ProxyApp>,
authorization: Arc<AuthorizedRequest>,
authorized_request: Arc<AuthorizedRequest>,
socket: WebSocket,
) {
// split the websocket so we can read and write concurrently
@ -112,13 +136,18 @@ async fn proxy_web3_socket(
let (response_sender, response_receiver) = flume::unbounded::<Message>();
tokio::spawn(write_web3_socket(response_receiver, ws_tx));
tokio::spawn(read_web3_socket(app, authorization, ws_rx, response_sender));
tokio::spawn(read_web3_socket(
app,
authorized_request,
ws_rx,
response_sender,
));
}
/// websockets support a few more methods than http clients
async fn handle_socket_payload(
app: Arc<Web3ProxyApp>,
authorization: Arc<AuthorizedRequest>,
authorized_request: Arc<AuthorizedRequest>,
payload: &str,
response_sender: &flume::Sender<Message>,
subscription_count: &AtomicUsize,
@ -137,7 +166,7 @@ async fn handle_socket_payload(
let response = app
.eth_subscribe(
authorization.clone(),
authorized_request.clone(),
payload,
subscription_count,
response_sender.clone(),
@ -174,7 +203,10 @@ async fn handle_socket_payload(
Ok(response.into())
}
_ => app.proxy_web3_rpc(&authorization, payload.into()).await,
_ => {
app.proxy_web3_rpc(&authorized_request, payload.into())
.await
}
};
(id, response)
@ -200,7 +232,7 @@ async fn handle_socket_payload(
async fn read_web3_socket(
app: Arc<Web3ProxyApp>,
authorization: Arc<AuthorizedRequest>,
authorized_request: Arc<AuthorizedRequest>,
mut ws_rx: SplitStream<WebSocket>,
response_sender: flume::Sender<Message>,
) {
@ -213,7 +245,7 @@ async fn read_web3_socket(
Message::Text(payload) => {
handle_socket_payload(
app.clone(),
authorization.clone(),
authorized_request.clone(),
&payload,
&response_sender,
&subscription_count,
@ -236,7 +268,7 @@ async fn read_web3_socket(
handle_socket_payload(
app.clone(),
authorization.clone(),
authorized_request.clone(),
payload,
&response_sender,
&subscription_count,

@ -7,9 +7,9 @@
// I wonder how we handle payment
// probably have to do manual withdrawals
use super::authorization::login_is_authorized;
use super::authorization::{login_is_authorized, UserKey};
use super::errors::FrontendResult;
use crate::{app::Web3ProxyApp, users::new_api_key};
use crate::app::Web3ProxyApp;
use anyhow::Context;
use axum::{
extract::{Path, Query},
@ -31,7 +31,6 @@ use std::ops::Add;
use std::sync::Arc;
use time::{Duration, OffsetDateTime};
use ulid::Ulid;
use uuid::Uuid;
// TODO: how do we customize axum's error response? I think we probably want an enum that implements IntoResponse instead
#[debug_handler]
@ -134,7 +133,7 @@ pub struct PostLogin {
#[derive(Serialize)]
pub struct PostLoginResponse {
bearer_token: Ulid,
api_keys: Vec<Uuid>,
api_keys: Vec<UserKey>,
}
/// Post to the user endpoint to register or login.
@ -197,9 +196,11 @@ pub async fn post_login(
let u = u.insert(&txn).await?;
let user_key = UserKey::new();
let uk = user_keys::ActiveModel {
user_id: sea_orm::Set(u.id),
api_key: sea_orm::Set(new_api_key()),
api_key: sea_orm::Set(user_key.into()),
requests_per_minute: sea_orm::Set(app.config.default_requests_per_minute),
..Default::default()
};
@ -216,7 +217,7 @@ pub async fn post_login(
let response_json = PostLoginResponse {
bearer_token,
api_keys: uks.iter().map(|uk| uk.api_key).collect(),
api_keys: uks.iter().map(|uk| uk.api_key.into()).collect(),
};
let response = (StatusCode::CREATED, Json(response_json)).into_response();
@ -233,7 +234,7 @@ pub async fn post_login(
let response_json = PostLoginResponse {
bearer_token,
api_keys: uks.iter().map(|uk| uk.api_key).collect(),
api_keys: uks.iter().map(|uk| uk.api_key.into()).collect(),
};
let response = (StatusCode::OK, Json(response_json)).into_response();

@ -6,4 +6,3 @@ pub mod jsonrpc;
pub mod metered;
pub mod metrics_frontend;
pub mod rpcs;
pub mod users;

@ -91,7 +91,7 @@ impl Web3Connections {
/// Will query a specific node or the best available.
pub async fn block(
&self,
authorization: Option<&Arc<AuthorizedRequest>>,
authorized_request: Option<&Arc<AuthorizedRequest>>,
hash: &H256,
rpc: Option<&Arc<Web3Connection>>,
) -> anyhow::Result<ArcBlock> {
@ -106,7 +106,7 @@ impl Web3Connections {
// TODO: if error, retry?
let block: Block<TxHash> = match rpc {
Some(rpc) => {
rpc.wait_for_request_handle(authorization, Duration::from_secs(30))
rpc.wait_for_request_handle(authorized_request, Duration::from_secs(30))
.await?
.request(
"eth_getBlockByHash",
@ -122,7 +122,7 @@ impl Web3Connections {
let request: JsonRpcRequest = serde_json::from_value(request)?;
let response = self
.try_send_best_upstream_server(authorization, request, None)
.try_send_best_upstream_server(authorized_request, request, None)
.await?;
let block = response.result.unwrap();
@ -171,7 +171,7 @@ impl Web3Connections {
// deref to not keep the lock open
if let Some(block_hash) = self.block_numbers.get(num) {
// TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set
// TODO: pass authorization through here?
// TODO: pass authorized_request through here?
return self.block(None, &block_hash, None).await;
}

@ -742,13 +742,13 @@ impl Web3Connection {
#[instrument]
pub async fn wait_for_request_handle(
self: &Arc<Self>,
authorization: Option<&Arc<AuthorizedRequest>>,
authorized_request: Option<&Arc<AuthorizedRequest>>,
max_wait: Duration,
) -> anyhow::Result<OpenRequestHandle> {
let max_wait = Instant::now() + max_wait;
loop {
let x = self.try_request_handle(authorization).await;
let x = self.try_request_handle(authorized_request).await;
trace!(?x, "try_request_handle");
@ -778,7 +778,7 @@ impl Web3Connection {
#[instrument]
pub async fn try_request_handle(
self: &Arc<Self>,
authorization: Option<&Arc<AuthorizedRequest>>,
authorized_request: Option<&Arc<AuthorizedRequest>>,
) -> anyhow::Result<OpenRequestResult> {
// check that we are connected
if !self.has_provider().await {
@ -809,7 +809,7 @@ impl Web3Connection {
}
};
let handle = OpenRequestHandle::new(self.clone(), authorization.cloned());
let handle = OpenRequestHandle::new(self.clone(), authorized_request.cloned());
Ok(OpenRequestResult::Handle(handle))
}

@ -368,7 +368,7 @@ impl Web3Connections {
/// get the best available rpc server
pub async fn next_upstream_server(
&self,
authorization: Option<&Arc<AuthorizedRequest>>,
authorized_request: Option<&Arc<AuthorizedRequest>>,
skip: &[Arc<Web3Connection>],
min_block_needed: Option<&U64>,
) -> anyhow::Result<OpenRequestResult> {
@ -427,7 +427,7 @@ impl Web3Connections {
// now that the rpcs are sorted, try to get an active request handle for one of them
for rpc in synced_rpcs.into_iter() {
// increment our connection counter
match rpc.try_request_handle(authorization).await {
match rpc.try_request_handle(authorized_request).await {
Ok(OpenRequestResult::Handle(handle)) => {
trace!("next server on {:?}: {:?}", self, rpc);
return Ok(OpenRequestResult::Handle(handle));
@ -458,7 +458,7 @@ impl Web3Connections {
// TODO: better type on this that can return an anyhow::Result
pub async fn upstream_servers(
&self,
authorization: Option<&Arc<AuthorizedRequest>>,
authorized_request: Option<&Arc<AuthorizedRequest>>,
block_needed: Option<&U64>,
) -> Result<Vec<OpenRequestHandle>, Option<Instant>> {
let mut earliest_retry_at = None;
@ -473,7 +473,7 @@ impl Web3Connections {
}
// check rate limits and increment our connection counter
match connection.try_request_handle(authorization).await {
match connection.try_request_handle(authorized_request).await {
Ok(OpenRequestResult::RetryAt(retry_at)) => {
// this rpc is not available. skip it
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
@ -499,7 +499,7 @@ impl Web3Connections {
/// be sure there is a timeout on this or it might loop forever
pub async fn try_send_best_upstream_server(
&self,
authorization: Option<&Arc<AuthorizedRequest>>,
authorized_request: Option<&Arc<AuthorizedRequest>>,
request: JsonRpcRequest,
min_block_needed: Option<&U64>,
) -> anyhow::Result<JsonRpcForwardedResponse> {
@ -511,7 +511,7 @@ impl Web3Connections {
break;
}
match self
.next_upstream_server(authorization, &skip_rpcs, min_block_needed)
.next_upstream_server(authorized_request, &skip_rpcs, min_block_needed)
.await?
{
OpenRequestResult::Handle(active_request_handle) => {
@ -601,12 +601,15 @@ impl Web3Connections {
#[instrument]
pub async fn try_send_all_upstream_servers(
&self,
authorization: Option<&Arc<AuthorizedRequest>>,
authorized_request: Option<&Arc<AuthorizedRequest>>,
request: JsonRpcRequest,
block_needed: Option<&U64>,
) -> anyhow::Result<JsonRpcForwardedResponse> {
loop {
match self.upstream_servers(authorization, block_needed).await {
match self
.upstream_servers(authorized_request, block_needed)
.await
{
Ok(active_request_handles) => {
// TODO: benchmark this compared to waiting on unbounded futures
// TODO: do something with this handle?

@ -33,7 +33,7 @@ pub enum OpenRequestResult {
/// Make RPC requests through this handle and drop it when you are done.
#[derive(Debug)]
pub struct OpenRequestHandle {
authorization: Arc<AuthorizedRequest>,
authorized_request: Arc<AuthorizedRequest>,
conn: Arc<Web3Connection>,
// TODO: this is the same metrics on the conn. use a reference?
metrics: Arc<OpenRequestHandleMetrics>,
@ -104,7 +104,10 @@ impl AuthorizedRequest {
#[metered(registry = OpenRequestHandleMetrics, visibility = pub)]
impl OpenRequestHandle {
pub fn new(conn: Arc<Web3Connection>, authorization: Option<Arc<AuthorizedRequest>>) -> Self {
pub fn new(
conn: Arc<Web3Connection>,
authorized_request: Option<Arc<AuthorizedRequest>>,
) -> Self {
// TODO: take request_id as an argument?
// TODO: attach a unique id to this? customer requests have one, but not internal queries
// TODO: what ordering?!
@ -119,13 +122,13 @@ impl OpenRequestHandle {
let metrics = conn.open_request_handle_metrics.clone();
let used = false.into();
let authorization = authorization.unwrap_or_else(|| {
let authorized_request = authorized_request.unwrap_or_else(|| {
let db_conn = conn.db_conn.clone();
Arc::new(AuthorizedRequest::Internal(db_conn))
});
Self {
authorization,
authorized_request,
conn,
metrics,
used,
@ -161,7 +164,7 @@ impl OpenRequestHandle {
// TODO: use tracing spans
// TODO: requests from customers have request ids, but we should add
// TODO: including params in this is way too verbose
// the authorization field is already on a parent span
// the authorized_request field is already on a parent span
trace!(rpc=%self.conn, %method, "request");
let mut provider = None;
@ -193,7 +196,7 @@ impl OpenRequestHandle {
let error_handler = if let RequestErrorHandler::SaveReverts(save_chance) = error_handler
{
if ["eth_call", "eth_estimateGas"].contains(&method)
&& self.authorization.db_conn().is_some()
&& self.authorized_request.db_conn().is_some()
&& save_chance != 0.0
&& (save_chance == 1.0
|| rand::thread_rng().gen_range(0.0..=1.0) <= save_chance)
@ -249,7 +252,7 @@ impl OpenRequestHandle {
.expect("parsing eth_call");
// spawn saving to the database so we don't slow down the request (or error if no db)
let f = self.authorization.clone().save_revert(params);
let f = self.authorized_request.clone().save_revert(params);
tokio::spawn(async move { f.await });
} else {

@ -1,11 +0,0 @@
use rand::prelude::*;
use uuid::{Builder, Uuid};
pub fn new_api_key() -> Uuid {
// TODO: chacha20?
let mut rng = thread_rng();
let random_bytes = rng.gen();
Builder::from_random_bytes(random_bytes).into_uuid()
}