Fix Issue 65 Pt. 2 (#153)

* change subuser balance logic and add message to access denied errors

* clearer import

* comments

* comments
This commit is contained in:
Bryan Stitt 2023-06-28 23:00:34 -07:00 committed by GitHub
parent ebceb0d7d6
commit 280e2075d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 95 additions and 83 deletions

@ -57,11 +57,11 @@ pub async fn query_admin_modify_usertier<'a>(
trace!(%caller_id, "query_admin_modify_usertier"); trace!(%caller_id, "query_admin_modify_usertier");
// Check if the caller is an admin (i.e. if he is in an admin table) // Check if the caller is an admin (i.e. if he is in an admin table)
let _admin: admin::Model = admin::Entity::find() let _admin = admin::Entity::find()
.filter(admin::Column::UserId.eq(caller_id)) .filter(admin::Column::UserId.eq(caller_id))
.one(db_conn) .one(db_conn)
.await? .await?
.ok_or(Web3ProxyError::AccessDenied)?; .ok_or(Web3ProxyError::AccessDenied("not an admin".into()))?;
// If we are here, that means an admin was found, and we can safely proceed // If we are here, that means an admin was found, and we can safely proceed

@ -110,8 +110,6 @@ pub struct Web3ProxyApp {
pub frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>, pub frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>,
/// rate limit authenticated users /// rate limit authenticated users
pub frontend_registered_user_rate_limiter: Option<DeferredRateLimiter<u64>>, pub frontend_registered_user_rate_limiter: Option<DeferredRateLimiter<u64>>,
/// Optional time series database for making pretty graphs that load quickly
pub influxdb_client: Option<influxdb2::Client>,
/// concurrent/parallel request limits for anonymous users /// concurrent/parallel request limits for anonymous users
pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>>, pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>>,
pub kafka_producer: Option<rdkafka::producer::FutureProducer>, pub kafka_producer: Option<rdkafka::producer::FutureProducer>,
@ -138,6 +136,10 @@ pub struct Web3ProxyApp {
/// channel for sending stats in a background task /// channel for sending stats in a background task
pub stat_sender: Option<flume::Sender<AppStat>>, pub stat_sender: Option<flume::Sender<AppStat>>,
/// Optional time series database for making pretty graphs that load quickly
influxdb_client: Option<influxdb2::Client>,
/// Simple way to connect ethers Contracsts to the proxy
/// TODO: make this more efficient
internal_provider: OnceCell<Arc<EthersHttpProvider>>, internal_provider: OnceCell<Arc<EthersHttpProvider>>,
} }
@ -709,6 +711,10 @@ impl Web3ProxyApp {
self.watch_consensus_head_receiver.clone() self.watch_consensus_head_receiver.clone()
} }
pub fn influxdb_client(&self) -> Web3ProxyResult<&influxdb2::Client> {
self.influxdb_client.as_ref().ok_or(Web3ProxyError::NoDatabase)
}
/// an ethers provider that you can use with ether's abigen. /// an ethers provider that you can use with ether's abigen.
/// this works for now, but I don't like it /// this works for now, but I don't like it
/// TODO: I would much prefer we figure out the traits and `impl JsonRpcClient for Web3ProxyApp` /// TODO: I would much prefer we figure out the traits and `impl JsonRpcClient for Web3ProxyApp`
@ -1609,7 +1615,7 @@ impl Web3ProxyApp {
method => { method => {
if method.starts_with("admin_") { if method.starts_with("admin_") {
// TODO: emit a stat? will probably just be noise // TODO: emit a stat? will probably just be noise
return Err(Web3ProxyError::AccessDenied); return Err(Web3ProxyError::AccessDenied("admin methods are not allowed".into()));
} }
// TODO: if no servers synced, wait for them to be synced? probably better to error and let haproxy retry another server // TODO: if no servers synced, wait for them to be synced? probably better to error and let haproxy retry another server

@ -44,8 +44,9 @@ impl From<Web3ProxyError> for Web3ProxyResult<()> {
#[derive(Debug, Display, Error, From)] #[derive(Debug, Display, Error, From)]
pub enum Web3ProxyError { pub enum Web3ProxyError {
Abi(ethers::abi::Error), Abi(ethers::abi::Error),
AccessDenied, #[error(ignore)]
AccessDeniedNoSubuser, #[from(ignore)]
AccessDenied(Cow<'static, str>),
#[error(ignore)] #[error(ignore)]
Anyhow(anyhow::Error), Anyhow(anyhow::Error),
Arc(Arc<Self>), Arc(Arc<Self>),
@ -117,6 +118,7 @@ pub enum Web3ProxyError {
}, },
NotFound, NotFound,
#[error(ignore)] #[error(ignore)]
#[from(ignore)]
NotImplemented(Cow<'static, str>), NotImplemented(Cow<'static, str>),
NoVolatileRedisDatabase, NoVolatileRedisDatabase,
OriginRequired, OriginRequired,
@ -186,24 +188,13 @@ impl Web3ProxyError {
}, },
) )
} }
Self::AccessDenied => { Self::AccessDenied(msg) => {
// TODO: attach something to this trace. probably don't include much in the message though. don't want to leak creds by accident // TODO: attach something to this trace. probably don't include much in the message though. don't want to leak creds by accident
trace!("access denied"); trace!(%msg, "access denied");
( (
StatusCode::FORBIDDEN, StatusCode::FORBIDDEN,
JsonRpcErrorData { JsonRpcErrorData {
message: "FORBIDDEN".into(), message: format!("FORBIDDEN: {}", msg).into(),
code: StatusCode::FORBIDDEN.as_u16().into(),
data: None,
},
)
}
Self::AccessDeniedNoSubuser => {
trace!("access denied not a subuser");
(
StatusCode::FORBIDDEN,
JsonRpcErrorData {
message: "FORBIDDEN: NOT A SUBUSER".into(),
code: StatusCode::FORBIDDEN.as_u16().into(), code: StatusCode::FORBIDDEN.as_u16().into(),
data: None, data: None,
}, },

@ -67,10 +67,7 @@ pub async fn admin_increase_balance(
.filter(admin::Column::UserId.eq(caller_id)) .filter(admin::Column::UserId.eq(caller_id))
.one(&txn) .one(&txn)
.await? .await?
.ok_or_else(|| { .ok_or_else(|| Web3ProxyError::AccessDenied("not an admin".into()))?;
warn!(%caller_id, "not an admin");
Web3ProxyError::AccessDenied
})?;
let user_entry: user::Model = user::Entity::find() let user_entry: user::Model = user::Entity::find()
.filter(user::Column::Address.eq(payload.user_address.as_bytes())) .filter(user::Column::Address.eq(payload.user_address.as_bytes()))
@ -215,7 +212,7 @@ pub async fn admin_imitate_login_get(
.filter(user::Column::Address.eq(admin_address.as_bytes())) .filter(user::Column::Address.eq(admin_address.as_bytes()))
.one(db_replica.as_ref()) .one(db_replica.as_ref())
.await? .await?
.ok_or(Web3ProxyError::AccessDenied)?; .ok_or(Web3ProxyError::AccessDenied("not an admin".into()))?;
// Get the user that we want to imitate from the read-only database (their id ...) // Get the user that we want to imitate from the read-only database (their id ...)
// TODO: Only get the id, not the whole user object ... // TODO: Only get the id, not the whole user object ...

@ -122,7 +122,7 @@ pub async fn user_balance_post(
(authorization, None) (authorization, None)
} else { } else {
return Err(Web3ProxyError::AccessDenied); return Err(Web3ProxyError::AccessDenied("no bearer token or ip".into()));
}; };
// Get the transaction hash // Get the transaction hash

@ -190,7 +190,9 @@ pub async fn rpc_keys_management(
{ {
Ok(rpc_key.into_active_model()) Ok(rpc_key.into_active_model())
} else { } else {
Err(Web3ProxyError::AccessDenied) Err(Web3ProxyError::AccessDenied(
"secondary user is not an admin or owner".into(),
))
} }
} }
Some((_, None)) => Err(Web3ProxyError::BadResponse( Some((_, None)) => Err(Web3ProxyError::BadResponse(

@ -44,7 +44,7 @@ pub async fn get_user_id_from_params(
.filter(login::Column::BearerToken.eq(user_bearer_token.uuid())) .filter(login::Column::BearerToken.eq(user_bearer_token.uuid()))
.one(db_replica.as_ref()) .one(db_replica.as_ref())
.await? .await?
.ok_or(Web3ProxyError::AccessDenied)?; .ok_or(Web3ProxyError::AccessDenied("unknown bearer token".into()))?;
// if expired, delete ALL expired logins // if expired, delete ALL expired logins
let now = Utc::now(); let now = Utc::now();
@ -59,7 +59,7 @@ pub async fn get_user_id_from_params(
// TODO: emit a stat? if this is high something weird might be happening // TODO: emit a stat? if this is high something weird might be happening
trace!("cleared expired logins: {:?}", delete_result); trace!("cleared expired logins: {:?}", delete_result);
return Err(Web3ProxyError::AccessDenied); return Err(Web3ProxyError::AccessDenied("login expired".into()));
} }
save_to_redis = true; save_to_redis = true;
@ -75,7 +75,9 @@ pub async fn get_user_id_from_params(
let user_id: u64 = user_id.parse().context("Parsing user_id param")?; let user_id: u64 = user_id.parse().context("Parsing user_id param")?;
if bearer_user_id != user_id { if bearer_user_id != user_id {
return Err(Web3ProxyError::AccessDenied); return Err(Web3ProxyError::AccessDenied(
"bearer_user_id and user_id mismatch".into(),
));
} }
if save_to_redis { if save_to_redis {
@ -102,7 +104,9 @@ pub async fn get_user_id_from_params(
// TODO: proper error code from a useful error code // TODO: proper error code from a useful error code
// TODO: maybe instead of this sharp edged warn, we have a config value? // TODO: maybe instead of this sharp edged warn, we have a config value?
// TODO: check config for if we should deny or allow this // TODO: check config for if we should deny or allow this
Err(Web3ProxyError::AccessDenied) Err(Web3ProxyError::AccessDenied(
"bearer token required when requesting a specific id".into(),
))
// // TODO: make this a flag // // TODO: make this a flag
// warn!("allowing without auth during development!"); // warn!("allowing without auth during development!");
// Ok(x.parse()?) // Ok(x.parse()?)

@ -314,7 +314,7 @@ impl Web3Rpc {
let head_block_num = self let head_block_num = self
.internal_request::<_, U256>( .internal_request::<_, U256>(
"eth_blockNumber", "eth_blockNumber",
&None::<()>, &[(); 0],
// error here are expected, so keep the level low // error here are expected, so keep the level low
Some(Level::DEBUG.into()), Some(Level::DEBUG.into()),
Some(2), Some(2),
@ -431,7 +431,7 @@ impl Web3Rpc {
let found_chain_id: U64 = self let found_chain_id: U64 = self
.internal_request( .internal_request(
"eth_chainId", "eth_chainId",
&None::<()>, &[(); 0],
Some(Level::TRACE.into()), Some(Level::TRACE.into()),
Some(2), Some(2),
Some(Duration::from_secs(5)), Some(Duration::from_secs(5)),

@ -15,7 +15,7 @@ use axum::{
Json, TypedHeader, Json, TypedHeader,
}; };
use entities::sea_orm_active_enums::Role; use entities::sea_orm_active_enums::Role;
use entities::{balance, rpc_key, secondary_user}; use entities::{balance, rpc_key, secondary_user, user, user_tier};
use fstrings::{f, format_args_f}; use fstrings::{f, format_args_f};
use hashbrown::HashMap; use hashbrown::HashMap;
use influxdb2::api::query::FluxRecord; use influxdb2::api::query::FluxRecord;
@ -32,59 +32,76 @@ pub async fn query_user_stats<'a>(
params: &'a HashMap<String, String>, params: &'a HashMap<String, String>,
stat_response_type: StatType, stat_response_type: StatType,
) -> Web3ProxyResponse { ) -> Web3ProxyResponse {
let (caller_user_id, _semaphore) = match bearer { let (caller_user, _semaphore) = match bearer {
Some(TypedHeader(Authorization(bearer))) => { Some(TypedHeader(Authorization(bearer))) => {
let (user, semaphore) = app.bearer_is_authorized(bearer).await?; let (user, semaphore) = app.bearer_is_authorized(bearer).await?;
(user.id, Some(semaphore))
(Some(user), Some(semaphore))
} }
None => (0, None), None => (None, None),
}; };
// Return an error if the bearer is **not** set, but the StatType is Detailed // Return an error if the bearer is **not** set, but the StatType is Detailed
if stat_response_type == StatType::Detailed && caller_user_id == 0 { if stat_response_type == StatType::Detailed && caller_user.is_none() {
return Err(Web3ProxyError::BadRequest( return Err(Web3ProxyError::AccessDenied(
"Detailed Stats Response requires you to authorize with a bearer token".into(), "Detailed Stats Response requires you to authorize with a bearer token".into(),
)); ));
} }
let db_replica = app.db_replica()?;
// Read the (optional) user-id from the request, this is the logic for subusers // Read the (optional) user-id from the request, this is the logic for subusers
// If there is no bearer token, this is not allowed
let user_id: u64 = params let user_id: u64 = params
.get("user_id") .get("user_id")
.and_then(|x| x.parse::<u64>().ok()) .and_then(|x| x.parse::<u64>().ok())
.unwrap_or(caller_user_id); .unwrap_or_else(|| caller_user.as_ref().map(|x| x.id).unwrap_or_default());
let db_replica = app.db_replica()?; // Only allow stats if the user has an active premium role
// TODO: move this to a helper. it should be simple to check that a user has an active premium account
// In any case, we don't allow stats if the target user does not have a balance if let Some(caller_user) = &caller_user {
// No subuser, we can check the balance directly // get the balance of the user whose stats we are going to fetch (might be self, but might be another user)
if user_id != 0 { let (total_deposits, total_spent) = match balance::Entity::find()
match balance::Entity::find()
.filter(balance::Column::UserId.eq(user_id)) .filter(balance::Column::UserId.eq(user_id))
.one(db_replica.as_ref()) .one(db_replica.as_ref())
.await? .await?
{ {
// TODO: We should add the threshold that determines if a user is premium into app.config or so Some(user_balance) => (
Some(user_balance) => { user_balance.total_deposits,
if user_balance.total_deposits - user_balance.total_spent_outside_free_tier user_balance.total_spent_outside_free_tier,
<= Decimal::from(0) ),
{ None => (0.into(), 0.into()),
trace!("User has 0 balance"); };
return Err(Web3ProxyError::PaymentRequired);
} let balance_remaining = total_deposits - total_spent;
// Otherwise make the user pass
} // TODO: We should add the threshold that determines if a user is premium into app.config. hard coding to $10 works for now
None => { if total_deposits < Decimal::from(10) || balance_remaining <= Decimal::from(0) {
trace!("User does not have a balance record, implying that he has no balance. Users must have a balance to access their stats dashboards"); // get the user tier so we can see if it is a tier that has downgrades
let relevant_balance_user_tier_id = if user_id == caller_user.id {
caller_user.user_tier_id
} else {
let user = user::Entity::find_by_id(user_id)
.one(db_replica.as_ref())
.await?
.web3_context("user_id not found")?;
user.user_tier_id
};
let user_tier = user_tier::Entity::find_by_id(relevant_balance_user_tier_id)
.one(db_replica.as_ref())
.await?
.web3_context("user_tier not found")?;
if user_tier.downgrade_tier_id.is_some() {
trace!(%user_id, "User does not have enough balance to qualify for premium");
return Err(Web3ProxyError::PaymentRequired); return Err(Web3ProxyError::PaymentRequired);
} }
} }
// (Possible) subuser relation if user_id != caller_user.id {
// Check if the caller is a proper subuser (there is a subuser record) // check that there is at least on rpc-keys owned by the requested user and related to the caller user
// Check if the subuser has more than collaborator status let user_rpc_key_ids: Vec<u64> = rpc_key::Entity::find()
if user_id != caller_user_id {
// Find all rpc-keys related to the caller user
let user_rpc_keys: Vec<u64> = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(user_id)) .filter(rpc_key::Column::UserId.eq(user_id))
.all(db_replica.as_ref()) .all(db_replica.as_ref())
.await? .await?
@ -92,31 +109,26 @@ pub async fn query_user_stats<'a>(
.map(|x| x.id) .map(|x| x.id)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
match secondary_user::Entity::find() if secondary_user::Entity::find()
.filter(secondary_user::Column::UserId.eq(caller_user_id)) .filter(secondary_user::Column::UserId.eq(caller_user.id))
.filter(secondary_user::Column::RpcSecretKeyId.is_in(user_rpc_keys)) .filter(secondary_user::Column::RpcSecretKeyId.is_in(user_rpc_key_ids))
.filter(secondary_user::Column::Role.ne(Role::Collaborator))
.one(db_replica.as_ref()) .one(db_replica.as_ref())
.await? .await?
.is_none()
{ {
Some(secondary_user_record) => { return Err(Web3ProxyError::AccessDenied(
if secondary_user_record.role == Role::Collaborator { "Not a subuser of the given user_id".into(),
trace!("Subuser is only a collaborator, collaborators cannot see stats"); ));
return Err(Web3ProxyError::AccessDenied);
}
}
None => {
// Then we must do an access denied
return Err(Web3ProxyError::AccessDeniedNoSubuser);
}
} }
} }
} else if user_id != 0 {
return Err(Web3ProxyError::AccessDenied(
"User Stats Response requires you to authorize with a bearer token".into(),
));
} }
// TODO: have a getter for this. do we need a connection pool on it? let influxdb_client = app.influxdb_client()?;
let influxdb_client = app
.influxdb_client
.as_ref()
.context("query_user_stats needs an influxdb client")?;
let query_window_seconds = get_query_window_seconds_from_params(params)?; let query_window_seconds = get_query_window_seconds_from_params(params)?;
let query_start = get_query_start_from_params(params)?.timestamp(); let query_start = get_query_start_from_params(params)?.timestamp();