use uuid earlier
This commit is contained in:
parent
b90f80f46b
commit
ba981d7948
1
TODO.md
1
TODO.md
@ -200,3 +200,4 @@ in another repo: event subscriber
|
|||||||
- [ ] threshold should check actual available request limits (if any) instead of just the soft limit
|
- [ ] threshold should check actual available request limits (if any) instead of just the soft limit
|
||||||
- [ ] foreign key on_update and on_delete
|
- [ ] foreign key on_update and on_delete
|
||||||
- [ ] database creation timestamps
|
- [ ] database creation timestamps
|
||||||
|
- [ ] better error handling. we warn too often for validation errors and use the same error code for most every request
|
@ -8,7 +8,7 @@ use web3_proxy::app::get_migrated_db;
|
|||||||
#[derive(Debug, FromArgs)]
|
#[derive(Debug, FromArgs)]
|
||||||
/// Command line interface for admins to interact with web3-proxy
|
/// Command line interface for admins to interact with web3-proxy
|
||||||
pub struct TopConfig {
|
pub struct TopConfig {
|
||||||
/// what host the client should connect to
|
/// what database the client should connect to
|
||||||
#[argh(
|
#[argh(
|
||||||
option,
|
option,
|
||||||
default = "\"mysql://root:dev_web3_proxy@127.0.0.1:3306/dev_web3_proxy\".to_string()"
|
default = "\"mysql://root:dev_web3_proxy@127.0.0.1:3306/dev_web3_proxy\".to_string()"
|
||||||
|
@ -819,7 +819,7 @@ impl Web3Connections {
|
|||||||
if new_head_block {
|
if new_head_block {
|
||||||
self.chain.add_block(new_block.clone(), true);
|
self.chain.add_block(new_block.clone(), true);
|
||||||
|
|
||||||
info!(
|
debug!(
|
||||||
"{}/{} rpcs at {} ({}). head at {:?}",
|
"{}/{} rpcs at {} ({}). head at {:?}",
|
||||||
pending_synced_connections.conns.len(),
|
pending_synced_connections.conns.len(),
|
||||||
self.conns.len(),
|
self.conns.len(),
|
||||||
@ -847,7 +847,7 @@ impl Web3Connections {
|
|||||||
// TODO: mark any orphaned transactions as unconfirmed
|
// TODO: mark any orphaned transactions as unconfirmed
|
||||||
}
|
}
|
||||||
} else if num_best_rpcs == self.conns.len() {
|
} else if num_best_rpcs == self.conns.len() {
|
||||||
debug!(
|
trace!(
|
||||||
"all {} rpcs at {} ({})",
|
"all {} rpcs at {} ({})",
|
||||||
num_best_rpcs,
|
num_best_rpcs,
|
||||||
pending_synced_connections.head_block_hash,
|
pending_synced_connections.head_block_hash,
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
|
use axum::extract::Path;
|
||||||
use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
|
use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
|
||||||
use axum_client_ip::ClientIp;
|
use axum_client_ip::ClientIp;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
use super::errors::handle_anyhow_error;
|
use super::errors::handle_anyhow_error;
|
||||||
use super::{rate_limit_by_ip, rate_limit_by_key};
|
use super::{rate_limit_by_ip, rate_limit_by_key};
|
||||||
@ -24,9 +26,9 @@ pub async fn public_proxy_web3_rpc(
|
|||||||
pub async fn user_proxy_web3_rpc(
|
pub async fn user_proxy_web3_rpc(
|
||||||
Json(payload): Json<JsonRpcRequestEnum>,
|
Json(payload): Json<JsonRpcRequestEnum>,
|
||||||
Extension(app): Extension<Arc<Web3ProxyApp>>,
|
Extension(app): Extension<Arc<Web3ProxyApp>>,
|
||||||
key: String,
|
Path(user_key): Path<Uuid>,
|
||||||
) -> impl IntoResponse {
|
) -> impl IntoResponse {
|
||||||
if let Err(x) = rate_limit_by_key(&app, &key).await {
|
if let Err(x) = rate_limit_by_key(&app, user_key).await {
|
||||||
return x.into_response();
|
return x.into_response();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,6 +17,7 @@ use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
|
|||||||
use std::net::{IpAddr, SocketAddr};
|
use std::net::{IpAddr, SocketAddr};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::app::Web3ProxyApp;
|
use crate::app::Web3ProxyApp;
|
||||||
|
|
||||||
@ -25,56 +26,11 @@ use self::errors::handle_anyhow_error;
|
|||||||
pub async fn rate_limit_by_ip(app: &Web3ProxyApp, ip: &IpAddr) -> Result<(), impl IntoResponse> {
|
pub async fn rate_limit_by_ip(app: &Web3ProxyApp, ip: &IpAddr) -> Result<(), impl IntoResponse> {
|
||||||
let rate_limiter_key = format!("ip:{}", ip);
|
let rate_limiter_key = format!("ip:{}", ip);
|
||||||
|
|
||||||
rate_limit_by_key(app, &rate_limiter_key).await
|
// TODO: dry this up with rate_limit_by_key
|
||||||
}
|
|
||||||
|
|
||||||
/// if Ok(()), rate limits are acceptable
|
|
||||||
/// if Err(response), rate limits exceeded
|
|
||||||
pub async fn rate_limit_by_key(
|
|
||||||
app: &Web3ProxyApp,
|
|
||||||
user_key: &str,
|
|
||||||
) -> Result<(), impl IntoResponse> {
|
|
||||||
let db = app.db_conn();
|
|
||||||
|
|
||||||
// query the db to make sure this key is active
|
|
||||||
// TODO: probably want a cache on this
|
|
||||||
match user_keys::Entity::find()
|
|
||||||
.select_only()
|
|
||||||
.column(user_keys::Column::UserId)
|
|
||||||
.filter(user_keys::Column::ApiKey.eq(user_key))
|
|
||||||
.filter(user_keys::Column::Active.eq(true))
|
|
||||||
.one(db)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(Some(_)) => {
|
|
||||||
// user key is valid
|
|
||||||
}
|
|
||||||
Ok(None) => {
|
|
||||||
// invalid user key
|
|
||||||
// TODO: rate limit by ip here, too? maybe tarpit?
|
|
||||||
return Err(handle_anyhow_error(
|
|
||||||
Some(StatusCode::FORBIDDEN),
|
|
||||||
None,
|
|
||||||
anyhow::anyhow!("unknown api key"),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.into_response());
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
return Err(handle_anyhow_error(
|
|
||||||
Some(StatusCode::INTERNAL_SERVER_ERROR),
|
|
||||||
None,
|
|
||||||
e.into(),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.into_response());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(rate_limiter) = app.rate_limiter() {
|
if let Some(rate_limiter) = app.rate_limiter() {
|
||||||
if rate_limiter.throttle_key(user_key).await.is_err() {
|
if rate_limiter.throttle_key(&rate_limiter_key).await.is_err() {
|
||||||
// TODO: set headers so they know when they can retry
|
// TODO: set headers so they know when they can retry
|
||||||
// warn!(?ip, "public rate limit exceeded");
|
// warn!(?ip, "public rate limit exceeded"); // this is too verbose, but a stat might be good
|
||||||
// TODO: use their id if possible
|
// TODO: use their id if possible
|
||||||
return Err(handle_anyhow_error(
|
return Err(handle_anyhow_error(
|
||||||
Some(StatusCode::TOO_MANY_REQUESTS),
|
Some(StatusCode::TOO_MANY_REQUESTS),
|
||||||
@ -91,14 +47,82 @@ pub async fn rate_limit_by_key(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// if Ok(()), rate limits are acceptable
|
||||||
|
/// if Err(response), rate limits exceeded
|
||||||
|
pub async fn rate_limit_by_key(
|
||||||
|
app: &Web3ProxyApp,
|
||||||
|
user_key: Uuid,
|
||||||
|
) -> Result<(), impl IntoResponse> {
|
||||||
|
let db = app.db_conn();
|
||||||
|
|
||||||
|
// query the db to make sure this key is active
|
||||||
|
// TODO: probably want a cache on this
|
||||||
|
match user_keys::Entity::find()
|
||||||
|
// .select_only()
|
||||||
|
// .column(user_keys::Column::UserId)
|
||||||
|
.filter(user_keys::Column::ApiKey.eq(user_key))
|
||||||
|
.filter(user_keys::Column::Active.eq(true))
|
||||||
|
.one(db)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(Some(_)) => {
|
||||||
|
// user key is valid
|
||||||
|
if let Some(rate_limiter) = app.rate_limiter() {
|
||||||
|
if rate_limiter
|
||||||
|
.throttle_key(&user_key.to_string())
|
||||||
|
.await
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
|
// TODO: set headers so they know when they can retry
|
||||||
|
// warn!(?ip, "public rate limit exceeded"); // this is too verbose, but a stat might be good
|
||||||
|
// TODO: use their id if possible
|
||||||
|
return Err(handle_anyhow_error(
|
||||||
|
Some(StatusCode::TOO_MANY_REQUESTS),
|
||||||
|
None,
|
||||||
|
anyhow::anyhow!("too many requests"),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.into_response());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// TODO: if no redis, rate limit with a local cache?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
// invalid user key
|
||||||
|
// TODO: rate limit by ip here, too? maybe tarpit?
|
||||||
|
return Err(handle_anyhow_error(
|
||||||
|
Some(StatusCode::FORBIDDEN),
|
||||||
|
None,
|
||||||
|
anyhow::anyhow!("unknown api key"),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.into_response());
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
let err: anyhow::Error = err.into();
|
||||||
|
|
||||||
|
return Err(handle_anyhow_error(
|
||||||
|
Some(StatusCode::INTERNAL_SERVER_ERROR),
|
||||||
|
None,
|
||||||
|
err.context("failed checking database for user key"),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.into_response());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn run(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()> {
|
pub async fn run(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()> {
|
||||||
// build our application with a route
|
// build our application with a route
|
||||||
// order most to least common
|
// order most to least common
|
||||||
let app = Router::new()
|
let app = Router::new()
|
||||||
.route("/", post(http_proxy::public_proxy_web3_rpc))
|
.route("/", post(http_proxy::public_proxy_web3_rpc))
|
||||||
.route("/", get(ws_proxy::public_websocket_handler))
|
.route("/", get(ws_proxy::public_websocket_handler))
|
||||||
.route("/u/:key", post(http_proxy::user_proxy_web3_rpc))
|
.route("/u/:user_key", post(http_proxy::user_proxy_web3_rpc))
|
||||||
.route("/u/:key", get(ws_proxy::user_websocket_handler))
|
.route("/u/:user_key", get(ws_proxy::user_websocket_handler))
|
||||||
.route("/health", get(http::health))
|
.route("/health", get(http::health))
|
||||||
.route("/status", get(http::status))
|
.route("/status", get(http::status))
|
||||||
.route("/users", post(users::create_user))
|
.route("/users", post(users::create_user))
|
||||||
@ -111,7 +135,7 @@ pub async fn run(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()>
|
|||||||
// `axum::Server` is a re-export of `hyper::Server`
|
// `axum::Server` is a re-export of `hyper::Server`
|
||||||
// TODO: allow only listening on localhost?
|
// TODO: allow only listening on localhost?
|
||||||
let addr = SocketAddr::from(([0, 0, 0, 0], port));
|
let addr = SocketAddr::from(([0, 0, 0, 0], port));
|
||||||
debug!("listening on port {}", port);
|
info!("listening on port {}", port);
|
||||||
// TODO: into_make_service is enough if we always run behind a proxy. make into_make_service_with_connect_info optional?
|
// TODO: into_make_service is enough if we always run behind a proxy. make into_make_service_with_connect_info optional?
|
||||||
axum::Server::bind(&addr)
|
axum::Server::bind(&addr)
|
||||||
.serve(app.into_make_service_with_connect_info::<SocketAddr>())
|
.serve(app.into_make_service_with_connect_info::<SocketAddr>())
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
use axum::{
|
use axum::{
|
||||||
extract::ws::{Message, WebSocket, WebSocketUpgrade},
|
extract::ws::{Message, WebSocket, WebSocketUpgrade},
|
||||||
|
extract::Path,
|
||||||
response::IntoResponse,
|
response::IntoResponse,
|
||||||
Extension,
|
Extension,
|
||||||
};
|
};
|
||||||
@ -14,6 +15,7 @@ use serde_json::{json, value::RawValue};
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::{str::from_utf8_mut, sync::atomic::AtomicUsize};
|
use std::{str::from_utf8_mut, sync::atomic::AtomicUsize};
|
||||||
use tracing::{error, info, trace};
|
use tracing::{error, info, trace};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
app::Web3ProxyApp,
|
app::Web3ProxyApp,
|
||||||
@ -37,9 +39,9 @@ pub async fn public_websocket_handler(
|
|||||||
pub async fn user_websocket_handler(
|
pub async fn user_websocket_handler(
|
||||||
Extension(app): Extension<Arc<Web3ProxyApp>>,
|
Extension(app): Extension<Arc<Web3ProxyApp>>,
|
||||||
ws: WebSocketUpgrade,
|
ws: WebSocketUpgrade,
|
||||||
key: String,
|
Path(user_key): Path<Uuid>,
|
||||||
) -> impl IntoResponse {
|
) -> impl IntoResponse {
|
||||||
if let Err(x) = rate_limit_by_key(&app, &key).await {
|
if let Err(x) = rate_limit_by_key(&app, user_key).await {
|
||||||
return x.into_response();
|
return x.into_response();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -167,6 +167,7 @@ impl JsonRpcForwardedResponse {
|
|||||||
pub fn from_anyhow_error(err: anyhow::Error, id: Box<RawValue>) -> Self {
|
pub fn from_anyhow_error(err: anyhow::Error, id: Box<RawValue>) -> Self {
|
||||||
let err = format!("{:?}", err);
|
let err = format!("{:?}", err);
|
||||||
|
|
||||||
|
// TODO: this is too verbose. plenty of errors are valid, like users giving an invalid address. no need to log that
|
||||||
warn!("forwarding error. {:?}", err);
|
warn!("forwarding error. {:?}", err);
|
||||||
|
|
||||||
JsonRpcForwardedResponse {
|
JsonRpcForwardedResponse {
|
||||||
|
Loading…
Reference in New Issue
Block a user