user post endpoint

This commit is contained in:
Bryan Stitt 2022-10-26 21:39:26 +00:00
parent 928fc7e9b0
commit 8f3e5c0146
9 changed files with 335 additions and 116 deletions

5
Cargo.lock generated

@ -2407,9 +2407,9 @@ dependencies = [
[[package]]
name = "itertools"
version = "0.10.3"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9a9d19fa1e79b6215ff29b9d6880b706147f16e9b1dbb1e4e5947b5b02bc5e3"
checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473"
dependencies = [
"either",
]
@ -5489,6 +5489,7 @@ dependencies = [
"hdrhistogram",
"http",
"ipnet",
"itertools",
"metered",
"migration",
"moka",

14
TODO.md

@ -199,20 +199,20 @@ These are roughly in order of completition
- [-] new endpoints for users (not totally sure about the exact paths, but these features are all needed):
- [x] sign in
- [x] sign out
- [-] GET profile endpoint
- [-] POST profile endpoint
- [x] GET profile endpoint
- [x] POST profile endpoint
- [x] GET stats endpoint
- [x] display requests per second per api key (only with authentication!)
- [x] display concurrent requests per api key (only with authentication!)
- [x] display distribution of methods per api key (eth_call, eth_getLogs, etc.) (only with authentication!)
- [x] get aggregate stats endpoint
- [x] display requests per second per api key (only with authentication!)
- [ ] POST key endpoint
- [ ] generate a new key from a web endpoint
- [ ] modifying key settings such as private relay, revert logging, ip/origin/etc checks
- [ ] GET logged reverts on an endpoint that **requires authentication**.
- [x] GET logged reverts on an endpoint that **requires authentication**.
- [ ] add config for concurrent requests from public requests
- [ ] per-user stats should probably be locked behind authentication. the code is written but disabled for easy development
- if we do this, we should also have an admin-only endpoint for seeing these for support requests
- [ ] display concurrent requests per api key (only with authentication!)
- [ ] endpoint for creating/modifying api keys and their advanced security features
- [ ] include if archive query or not in the stats
- this is already partially done, but we need to double check it works. preferrably with tests
@ -249,10 +249,12 @@ These are roughly in order of completition
- [ ] from what i thought, /status should show hashes > numbers!
- but block numbers count is maxed out (10k)
- and block hashes count is tiny (83)
- what is going on?
- what is going on? when the server fist launches they are in sync
- [ ] after adding semaphores (or maybe something else), CPU load seems a lot higher. investigate
- [ ] Ulid instead of Uuid for database ids
- might have to use Uuid in sea-orm and then convert to Ulid on display
- [ ] add pruning or aggregating or something to log revert trace. otherwise our databases are going to grow really big
- [ ] after adding this, allow posting to /user/keys to turn on revert logging
## V1

@ -76,3 +76,4 @@ tracing-subscriber = { version = "0.3.16", features = ["env-filter", "parking_lo
ulid = { version = "1.0.0", features = ["serde"] }
url = "2.3.1"
uuid = "1.2.1"
itertools = "0.10.5"

@ -14,7 +14,7 @@ use crate::rpcs::request::OpenRequestHandleMetrics;
use crate::rpcs::transactions::TxStatus;
use anyhow::Context;
use axum::extract::ws::Message;
use axum::headers::{Referer, UserAgent};
use axum::headers::{Origin, Referer, UserAgent};
use deferred_rate_limiter::DeferredRateLimiter;
use derive_more::From;
use ethers::core::utils::keccak256;
@ -74,7 +74,7 @@ pub struct UserKeyData {
// if None, allow unlimited concurrent requests
pub max_concurrent_requests: Option<u64>,
/// if None, allow any Origin
pub allowed_origins: Option<Vec<String>>,
pub allowed_origins: Option<Vec<Origin>>,
/// if None, allow any Referer
pub allowed_referers: Option<Vec<Referer>>,
/// if None, allow any UserAgent

@ -5,16 +5,16 @@ use crate::app::{UserKeyData, Web3ProxyApp};
use crate::jsonrpc::JsonRpcRequest;
use anyhow::Context;
use axum::headers::authorization::Bearer;
use axum::headers::{Origin, Referer, UserAgent};
use axum::headers::{Header, Origin, Referer, UserAgent};
use axum::TypedHeader;
use chrono::Utc;
use deferred_rate_limiter::DeferredRateLimitResult;
use entities::{user, user_keys};
use http::HeaderValue;
use ipnet::IpNet;
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::RedisRateLimitResult;
use sea_orm::{prelude::Decimal, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use serde::Serialize;
use std::fmt::Display;
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::{net::IpAddr, str::FromStr, sync::Arc};
@ -47,10 +47,10 @@ pub enum RateLimitResult {
UnknownKey,
}
#[derive(Clone, Debug, Serialize)]
#[derive(Clone, Debug)]
pub struct AuthorizedKey {
pub ip: IpAddr,
pub origin: Option<String>,
pub origin: Option<Origin>,
pub user_id: u64,
pub user_key_id: u64,
// TODO: just use an f32? even an f16 is probably fine
@ -191,16 +191,12 @@ impl AuthorizedKey {
}
// check origin
// TODO: do this with the Origin type instead of a String?
let origin = origin.map(|x| x.to_string());
match (&origin, &user_key_data.allowed_origins) {
(None, None) => {}
(Some(_), None) => {}
(None, Some(_)) => return Err(anyhow::anyhow!("Origin required")),
(Some(origin), Some(allowed_origins)) => {
let origin = origin.to_string();
if !allowed_origins.contains(&origin) {
if !allowed_origins.contains(origin) {
return Err(anyhow::anyhow!("IP is not allowed!"));
}
}
@ -444,7 +440,7 @@ impl Web3ProxyApp {
}
} else {
// TODO: if no redis, rate limit with a local cache? "warn!" probably isn't right
todo!("no rate limiter");
Ok(RateLimitResult::AllowedIp(ip, None))
}
}
@ -508,6 +504,7 @@ impl Web3ProxyApp {
let user_uuid: Uuid = user_key.into();
// TODO: join the user table to this to return the User? we don't always need it
// TODO: also attach secondary users
match user_keys::Entity::find()
.filter(user_keys::Column::ApiKey.eq(user_uuid))
.filter(user_keys::Column::Active.eq(true))
@ -515,51 +512,59 @@ impl Web3ProxyApp {
.await?
{
Some(user_key_model) => {
// TODO: move these splits into helper functions
// TODO: can we have sea orm handle this for us?
let allowed_ips: Option<Vec<IpNet>> =
user_key_model.allowed_ips.map(|allowed_ips| {
serde_json::from_str::<Vec<String>>(&allowed_ips)
.expect("allowed_ips should always parse")
if let Some(allowed_ips) = user_key_model.allowed_ips {
let x = allowed_ips
.split(',')
.map(|x| x.parse::<IpNet>())
.collect::<Result<Vec<_>, _>>()?;
Some(x)
} else {
None
};
let allowed_origins: Option<Vec<Origin>> =
if let Some(allowed_origins) = user_key_model.allowed_origins {
// TODO: do this without collecting twice?
let x = allowed_origins
.split(',')
.map(HeaderValue::from_str)
.collect::<Result<Vec<_>, _>>()?
.into_iter()
// TODO: try_for_each
.map(|x| {
x.parse::<IpNet>().expect("ip address should always parse")
})
.collect()
});
.map(|x| Origin::decode(&mut [x].iter()))
.collect::<Result<Vec<_>, _>>()?;
// TODO: should this be an Option<Vec<Origin>>?
let allowed_origins =
user_key_model.allowed_origins.map(|allowed_origins| {
serde_json::from_str::<Vec<String>>(&allowed_origins)
.expect("allowed_origins should always parse")
});
Some(x)
} else {
None
};
let allowed_referers =
user_key_model.allowed_referers.map(|allowed_referers| {
serde_json::from_str::<Vec<String>>(&allowed_referers)
.expect("allowed_referers should always parse")
.into_iter()
// TODO: try_for_each
.map(|x| {
x.parse::<Referer>().expect("referer should always parse")
})
.collect()
});
let allowed_referers: Option<Vec<Referer>> =
if let Some(allowed_referers) = user_key_model.allowed_referers {
let x = allowed_referers
.split(',')
.map(|x| x.parse::<Referer>())
.collect::<Result<Vec<_>, _>>()?;
let allowed_user_agents =
user_key_model
.allowed_user_agents
.map(|allowed_user_agents| {
serde_json::from_str::<Vec<String>>(&allowed_user_agents)
.expect("allowed_user_agents should always parse")
.into_iter()
// TODO: try_for_each
.map(|x| {
x.parse::<UserAgent>()
.expect("user agent should always parse")
})
.collect()
});
Some(x)
} else {
None
};
let allowed_user_agents: Option<Vec<UserAgent>> =
if let Some(allowed_user_agents) = user_key_model.allowed_user_agents {
let x: Result<Vec<_>, _> = allowed_user_agents
.split(',')
.map(|x| x.parse::<UserAgent>())
.collect();
Some(x?)
} else {
None
};
Ok(UserKeyData {
user_id: user_key_model.user_id,

@ -2,12 +2,16 @@
use crate::{app::UserKeyData, jsonrpc::JsonRpcForwardedResponse};
use axum::{
headers,
http::StatusCode,
response::{IntoResponse, Response},
Json,
};
use derive_more::From;
use http::header::InvalidHeaderValue;
use ipnet::AddrParseError;
use redis_rate_limiter::redis::RedisError;
use reqwest::header::ToStrError;
use sea_orm::DbErr;
use std::{error::Error, net::IpAddr};
use tokio::time::Instant;
@ -21,15 +25,19 @@ pub type FrontendResult = Result<Response, FrontendErrorResponse>;
pub enum FrontendErrorResponse {
Anyhow(anyhow::Error),
Box(Box<dyn Error>),
Redis(RedisError),
Response(Response),
Database(DbErr),
HeadersError(headers::Error),
HeaderToString(ToStrError),
InvalidHeaderValue(InvalidHeaderValue),
IpAddrParse(AddrParseError),
NotFound,
RateLimitedUser(UserKeyData, Option<Instant>),
RateLimitedIp(IpAddr, Option<Instant>),
Redis(RedisError),
Response(Response),
/// simple way to return an error message to the user and an anyhow to our logs
StatusCode(StatusCode, String, anyhow::Error),
UnknownKey,
NotFound,
}
impl IntoResponse for FrontendErrorResponse {
@ -60,32 +68,6 @@ impl IntoResponse for FrontendErrorResponse {
),
)
}
Self::Redis(err) => {
warn!(?err, "redis");
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcForwardedResponse::from_str(
"redis error!",
Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
None,
),
)
}
Self::Response(r) => {
debug_assert_ne!(r.status(), StatusCode::OK);
return r;
}
Self::StatusCode(status_code, err_msg, err) => {
warn!(?status_code, ?err_msg, ?err);
(
status_code,
JsonRpcForwardedResponse::from_str(
&err_msg,
Some(status_code.as_u16().into()),
None,
),
)
}
Self::Database(err) => {
warn!(?err, "database");
(
@ -97,6 +79,51 @@ impl IntoResponse for FrontendErrorResponse {
),
)
}
Self::HeadersError(err) => {
warn!(?err, "HeadersError");
(
StatusCode::BAD_REQUEST,
JsonRpcForwardedResponse::from_str(
&format!("{}", err),
Some(StatusCode::BAD_REQUEST.as_u16().into()),
None,
),
)
}
Self::IpAddrParse(err) => {
warn!(?err, "IpAddrParse");
(
StatusCode::BAD_REQUEST,
JsonRpcForwardedResponse::from_str(
&format!("{}", err),
Some(StatusCode::BAD_REQUEST.as_u16().into()),
None,
),
)
}
Self::InvalidHeaderValue(err) => {
warn!(?err, "InvalidHeaderValue");
(
StatusCode::BAD_REQUEST,
JsonRpcForwardedResponse::from_str(
&format!("{}", err),
Some(StatusCode::BAD_REQUEST.as_u16().into()),
None,
),
)
}
Self::NotFound => {
// TODO: emit a stat?
// TODO: instead of an error, show a normal html page for 404
(
StatusCode::NOT_FOUND,
JsonRpcForwardedResponse::from_str(
"not found!",
Some(StatusCode::NOT_FOUND.as_u16().into()),
None,
),
)
}
Self::RateLimitedIp(ip, _retry_at) => {
// TODO: emit a stat
// TODO: include retry_at in the error
@ -124,6 +151,43 @@ impl IntoResponse for FrontendErrorResponse {
),
)
}
Self::Redis(err) => {
warn!(?err, "redis");
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcForwardedResponse::from_str(
"redis error!",
Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
None,
),
)
}
Self::Response(r) => {
debug_assert_ne!(r.status(), StatusCode::OK);
return r;
}
Self::StatusCode(status_code, err_msg, err) => {
warn!(?status_code, ?err_msg, ?err);
(
status_code,
JsonRpcForwardedResponse::from_str(
&err_msg,
Some(status_code.as_u16().into()),
None,
),
)
}
Self::HeaderToString(err) => {
warn!(?err, "HeaderToString");
(
StatusCode::BAD_REQUEST,
JsonRpcForwardedResponse::from_str(
&format!("{}", err),
Some(StatusCode::BAD_REQUEST.as_u16().into()),
None,
),
)
}
Self::UnknownKey => (
StatusCode::UNAUTHORIZED,
JsonRpcForwardedResponse::from_str(
@ -132,18 +196,6 @@ impl IntoResponse for FrontendErrorResponse {
None,
),
),
Self::NotFound => {
// TODO: emit a stat?
// TODO: instead of an error, show a normal html page for 404
(
StatusCode::NOT_FOUND,
JsonRpcForwardedResponse::from_str(
"not found!",
Some(StatusCode::NOT_FOUND.as_u16().into()),
None,
),
)
}
};
(status_code, Json(response)).into_response()

@ -8,6 +8,7 @@ use crate::user_queries::{
};
use crate::user_queries::{get_chain_id_from_params, get_query_start_from_params};
use anyhow::Context;
use axum::headers::{Header, Origin, Referer, UserAgent};
use axum::{
extract::{Path, Query},
headers::{authorization::Bearer, Authorization},
@ -19,7 +20,9 @@ use axum_macros::debug_handler;
use entities::{revert_logs, user, user_keys};
use ethers::{prelude::Address, types::Bytes};
use hashbrown::HashMap;
use http::StatusCode;
use http::{HeaderValue, StatusCode};
use ipnet::IpNet;
use itertools::Itertools;
use redis_rate_limiter::redis::AsyncCommands;
use sea_orm::{
ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder,
@ -34,6 +37,7 @@ use std::sync::Arc;
use time::{Duration, OffsetDateTime};
use tracing::warn;
use ulid::Ulid;
use uuid::Uuid;
/// `GET /user/login/:user_address` or `GET /user/login/:user_address/:message_eip` -- Start the "Sign In with Ethereum" (siwe) login flow.
///
@ -470,6 +474,15 @@ pub struct UserKeysPost {
existing_key_id: Option<u64>,
existing_key: Option<RpcApiKey>,
description: Option<String>,
private_txs: Option<bool>,
active: Option<bool>,
// TODO: enable log_revert_trace: Option<f32>,
allowed_ips: Option<String>,
allowed_origins: Option<String>,
allowed_referers: Option<String>,
allowed_user_agents: Option<String>,
// do not allow! `requests_per_minute: Option<u64>,`
// do not allow! `max_concurrent_requests: Option<u64>,`
}
/// `POST /user/keys` -- Use a bearer token to create a new key or modify an existing key.
@ -484,19 +497,170 @@ pub async fn user_keys_post(
) -> FrontendResult {
let (user, _semaphore) = app.bearer_is_authorized(bearer_token).await?;
if let Some(existing_key_id) = payload.existing_key_id {
let db_conn = app.db_conn().context("getting db for user's keys")?;
let mut uk = if let Some(existing_key_id) = payload.existing_key_id {
// get the key and make sure it belongs to the user
todo!("existing by id");
let uk = user_keys::Entity::find()
.filter(user_keys::Column::UserId.eq(user.id))
.filter(user_keys::Column::Id.eq(existing_key_id))
.one(&db_conn)
.await
.context("failed loading user's key")?
.context("key does not exist or is not controlled by this bearer token")?;
uk.try_into().unwrap()
} else if let Some(existing_key) = payload.existing_key {
// get the key and make sure it belongs to the user
todo!("existing by key");
let uk = user_keys::Entity::find()
.filter(user_keys::Column::UserId.eq(user.id))
.filter(user_keys::Column::ApiKey.eq(Uuid::from(existing_key)))
.one(&db_conn)
.await
.context("failed loading user's key")?
.context("key does not exist or is not controlled by this bearer token")?;
uk.try_into().unwrap()
} else {
// make a new key
// TODO: limit to 10 keys?
let rpc_key = RpcApiKey::new();
todo!("new key");
user_keys::ActiveModel {
user_id: sea_orm::Set(user.id),
api_key: sea_orm::Set(rpc_key.into()),
requests_per_minute: sea_orm::Set(app.config.default_user_requests_per_minute),
..Default::default()
}
};
// TODO: do we need null descriptions? default to empty string should be fine, right?
if let Some(description) = payload.description {
if description.is_empty() {
uk.description = sea_orm::Set(None);
} else {
uk.description = sea_orm::Set(Some(description));
}
}
if let Some(private_txs) = payload.private_txs {
uk.private_txs = sea_orm::Set(private_txs);
}
if let Some(active) = payload.active {
uk.active = sea_orm::Set(active);
}
if let Some(allowed_ips) = payload.allowed_ips {
if allowed_ips.is_empty() {
uk.allowed_ips = sea_orm::Set(None);
} else {
// split allowed ips on ',' and try to parse them all. error on invalid input
let allowed_ips = allowed_ips
.split(',')
.map(|x| x.parse::<IpNet>())
.collect::<Result<Vec<_>, _>>()?
// parse worked. convert back to Strings
.into_iter()
.map(|x| x.to_string());
// and join them back together
let allowed_ips: String =
Itertools::intersperse(allowed_ips, ", ".to_string()).collect();
uk.allowed_ips = sea_orm::Set(Some(allowed_ips));
}
}
// TODO: this should actually be bytes
if let Some(allowed_origins) = payload.allowed_origins {
if allowed_origins.is_empty() {
uk.allowed_origins = sea_orm::Set(None);
} else {
// split allowed_origins on ',' and try to parse them all. error on invalid input
let allowed_origins = allowed_origins
.split(',')
.map(HeaderValue::from_str)
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.map(|x| Origin::decode(&mut [x].iter()))
.collect::<Result<Vec<_>, _>>()?
// parse worked. convert back to String and join them back together
.into_iter()
.map(|x| x.to_string());
let allowed_origins: String =
Itertools::intersperse(allowed_origins, ", ".to_string()).collect();
uk.allowed_origins = sea_orm::Set(Some(allowed_origins));
}
}
// TODO: this should actually be bytes
if let Some(allowed_referers) = payload.allowed_referers {
if allowed_referers.is_empty() {
uk.allowed_referers = sea_orm::Set(None);
} else {
// split allowed ips on ',' and try to parse them all. error on invalid input
let allowed_referers = allowed_referers
.split(',')
.map(HeaderValue::from_str)
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.map(|x| Referer::decode(&mut [x].iter()))
.collect::<Result<Vec<_>, _>>()?;
// parse worked. now we can put it back together.
// but we can't go directly to String.
// so we convert to HeaderValues first
let mut header_map = vec![];
for x in allowed_referers {
x.encode(&mut header_map);
}
// convert HeaderValues to Strings
// since we got these from strings, this should always work (unless we figure out using bytes)
let allowed_referers = header_map
.into_iter()
.map(|x| x.to_str().map(|x| x.to_string()))
.collect::<Result<Vec<_>, _>>()?;
// join strings together with commas
let allowed_referers: String =
Itertools::intersperse(allowed_referers.into_iter(), ", ".to_string()).collect();
uk.allowed_referers = sea_orm::Set(Some(allowed_referers));
}
}
if let Some(allowed_user_agents) = payload.allowed_user_agents {
if allowed_user_agents.is_empty() {
uk.allowed_user_agents = sea_orm::Set(None);
} else {
// split allowed_user_agents on ',' and try to parse them all. error on invalid input
let allowed_user_agents = allowed_user_agents
.split(',')
.filter_map(|x| x.parse::<UserAgent>().ok())
// parse worked. convert back to String
.map(|x| x.to_string());
// join the strings together
let allowed_user_agents: String =
Itertools::intersperse(allowed_user_agents, ", ".to_string()).collect();
uk.allowed_user_agents = sea_orm::Set(Some(allowed_user_agents));
}
}
let uk = if uk.is_changed() {
uk.save(&db_conn).await.context("Failed saving user key")?
} else {
uk
};
let uk: user_keys::Model = uk.try_into()?;
Ok(Json(uk).into_response())
}
/// `GET /user/revert_logs` -- Use a bearer token to get the user's revert logs.

@ -16,7 +16,7 @@ use serde_json::json;
use std::{cmp::Ordering, fmt::Display, sync::Arc};
use tokio::sync::{broadcast, watch};
use tokio::time::Duration;
use tracing::{debug, trace, warn, Level};
use tracing::{debug, info, trace, warn, Level};
// TODO: type for Hydrated Blocks with their full transactions?
pub type ArcBlock = Arc<Block<TxHash>>;
@ -48,10 +48,10 @@ impl Web3Connections {
return Ok(());
}
let block_num = block.number.as_ref().context("no block num")?;
let mut blockchain = self.blockchain_graphmap.write().await;
let block_num = block.number.as_ref().context("no block num")?;
// TODO: think more about heaviest_chain. would be better to do the check inside this function
if heaviest_chain {
// this is the only place that writes to block_numbers
@ -65,7 +65,7 @@ impl Web3Connections {
return Ok(());
}
trace!(%block_hash, %block_num, "saving new block");
info!(%block_hash, %block_num, "saving new block");
self.block_hashes
.insert(*block_hash, block.to_owned())
@ -328,7 +328,6 @@ impl Web3Connections {
}
}
// clone to release the read lock on self.block_hashes
if let Some(mut maybe_head_block) = highest_num_block {
// track rpcs on this heaviest chain so we can build a new SyncedConnections
let mut highest_rpcs = HashSet::<&String>::new();
@ -474,7 +473,6 @@ impl Web3Connections {
// hash changed
debug!(con_head=%consensus_head_block_id, old=%old_block_id, rpc_head=%rpc_head_str, %rpc, "unc {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
// todo!("handle equal by updating the cannonical chain");
self.save_block(&consensus_head_block, true)
.await
.context("save consensus_head_block as heaviest chain")?;

@ -296,10 +296,6 @@ pub async fn get_aggregate_rpc_stats_from_params(
Ok(response)
}
pub async fn get_user_stats(chain_id: u64) -> u64 {
todo!();
}
/// stats grouped by key_id and error_repsponse and method and key
pub async fn get_detailed_stats(
app: &Web3ProxyApp,