improvements for web3-this-then-that

This commit is contained in:
Bryan Stitt 2023-06-09 16:35:14 -07:00
parent 69dd8f0046
commit 41950c886c
5 changed files with 104 additions and 95 deletions

@ -111,11 +111,12 @@ GET /user/balance
If valid, displays data about the user's balance and payments as JSON.
POST /user/balance/:txid
Not yet implemented. Rate limited by IP.
Rate limited by IP.
Checks the ":txid" for a transaction that updates a user's balance.
The backend will be watching for these transactions, so this should not be needed in the common case.
However, log susbcriptions are not perfect and so it might sometimes be needed.
Any authorized user can call this endpoint for any other user's transaction.
GET /user/keys
Checks the "AUTHORIZATION" header for a valid bearer token.
@ -141,10 +142,6 @@ GET /subuser/rpc_keys
GET /user/deposits
Retrieves the user's deposit history.
GET /user/balance/:tx_hash
Accepts a tx_hash and updates the user's balance according to this transaction.
Any authorized user can call this endpoint for any other user's transaction.
GET /user/referral
Fetches a user's referral link.

@ -172,7 +172,7 @@ pub async fn serve(
.route("/user/deposits", get(users::payment::user_deposits_get))
.route(
"/user/balance/:tx_hash",
get(users::payment::user_balance_post),
post(users::payment::user_balance_post),
)
.route("/user/keys", get(users::rpc_keys::rpc_keys_get))
.route("/user/keys", post(users::rpc_keys::rpc_keys_management))

@ -347,84 +347,97 @@ async fn handle_socket_payload(
let response_id = json_request.id.clone();
// TODO: move this to a seperate function so we can use the try operator
let response: Web3ProxyResult<JsonRpcForwardedResponseEnum> =
match &json_request.method[..] {
"eth_subscribe" => {
// TODO: how can we subscribe with proxy_mode?
match app
.eth_subscribe(
authorization.clone(),
json_request,
subscription_count,
response_sender.clone(),
)
.await
{
Ok((handle, response)) => {
{
let mut x = subscriptions.write().await;
let result: &serde_json::value::RawValue = response
.result
.as_ref()
.context("there should be a result here")?;
// TODO: there must be a better way to turn a RawValue
let k: U64 = serde_json::from_str(result.get())
.context("subscription ids must be U64s")?;
x.insert(k, handle);
};
Ok(response.into())
}
Err(err) => Err(err),
}
}
"eth_unsubscribe" => {
let request_metadata =
RequestMetadata::new(&app, authorization.clone(), &json_request, None)
.await;
#[derive(serde::Deserialize)]
struct EthUnsubscribeParams([U64; 1]);
match serde_json::from_value(json_request.params) {
Ok::<EthUnsubscribeParams, _>(params) => {
let subscription_id = &params.0[0];
// TODO: is this the right response?
let partial_response = {
let mut x = subscriptions.write().await;
match x.remove(subscription_id) {
None => false,
Some(handle) => {
handle.abort();
true
}
}
};
// TODO: don't create the response here. use a JsonRpcResponseData instead
let response = JsonRpcForwardedResponse::from_value(
json!(partial_response),
response_id.clone(),
);
request_metadata.add_response(&response);
Ok(response.into())
}
Err(err) => Err(Web3ProxyError::BadRequest(
f!("incorrect params given for eth_unsubscribe. {err:?}").into(),
)),
}
}
_ => app
.proxy_web3_rpc(authorization.clone(), json_request.into())
let response: Web3ProxyResult<JsonRpcForwardedResponseEnum> = match &json_request.method
[..]
{
"eth_subscribe" => {
// TODO: how can we subscribe with proxy_mode?
match app
.eth_subscribe(
authorization.clone(),
json_request,
subscription_count,
response_sender.clone(),
)
.await
.map(|(_, response, _)| response),
};
{
Ok((handle, response)) => {
{
let mut x = subscriptions.write().await;
let result: &serde_json::value::RawValue = response
.result
.as_ref()
.context("there should be a result here")?;
// TODO: there must be a better way to turn a RawValue
let k: U64 = serde_json::from_str(result.get())
.context("subscription ids must be U64s")?;
x.insert(k, handle);
};
Ok(response.into())
}
Err(err) => Err(err),
}
}
"eth_unsubscribe" => {
let request_metadata =
RequestMetadata::new(&app, authorization.clone(), &json_request, None)
.await;
let subscription_id: U64 = if json_request.params.is_array() {
if let Some(params) = json_request.params.get(0) {
serde_json::from_value(params.clone()).map_err(|err| {
Web3ProxyError::BadRequest(
format!("invalid params for eth_unsubscribe: {}", err).into(),
)
})?
} else {
return Err(Web3ProxyError::BadRequest(
f!("no params for eth_unsubscribe").into(),
));
}
} else if json_request.params.is_string() {
serde_json::from_value(json_request.params).map_err(|err| {
Web3ProxyError::BadRequest(
format!("invalid params for eth_unsubscribe: {}", err).into(),
)
})?
} else {
return Err(Web3ProxyError::BadRequest(
"unexpected params given for eth_unsubscribe".into(),
));
};
// TODO: is this the right response?
let partial_response = {
let mut x = subscriptions.write().await;
match x.remove(&subscription_id) {
None => false,
Some(handle) => {
handle.abort();
true
}
}
};
// TODO: don't create the response here. use a JsonRpcResponseData instead
let response = JsonRpcForwardedResponse::from_value(
json!(partial_response),
response_id.clone(),
);
request_metadata.add_response(&response);
Ok(response.into())
}
_ => app
.proxy_web3_rpc(authorization.clone(), json_request.into())
.await
.map(|(_, response, _)| response),
};
(response_id, response)
}

@ -1,5 +1,6 @@
use crate::app::Web3ProxyApp;
use crate::errors::{Web3ProxyError, Web3ProxyResponse};
use crate::frontend::authorization::login_is_authorized;
use anyhow::Context;
use axum::{
extract::Path,
@ -7,6 +8,7 @@ use axum::{
response::IntoResponse,
Extension, Json, TypedHeader,
};
use axum_client_ip::InsecureClientIp;
use axum_macros::debug_handler;
use entities::{balance, increase_on_chain_balance_receipt, rpc_key, user};
use ethbloom::Input as BloomInput;
@ -105,13 +107,14 @@ pub async fn user_deposits_get(
#[debug_handler]
pub async fn user_balance_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
InsecureClientIp(ip): InsecureClientIp,
Path(mut params): Path<HashMap<String, String>>,
) -> Web3ProxyResponse {
// I suppose this is ok / good, so people don't spam this endpoint as it is not "cheap"
// Check that the user is logged-in and authorized
// The semaphore keeps a user from submitting tons of transactions in parallel which would DOS our backends
let (_, _semaphore) = app.bearer_is_authorized(bearer).await?;
// we rate limit by ip instead of bearer token so transactions are easy to submit from scripts
// TODO: if ip is a 10. or a 172., allow unlimited
// TODO: why is login_is_authorized giving me a 403?!
// login_is_authorized(&app, ip).await?;
// Get the transaction hash, and the amount that the user wants to top up by.
// Let's say that for now, 1 credit is equivalent to 1 dollar (assuming any stablecoin has a 1:1 peg)
@ -173,6 +176,7 @@ pub async fn user_balance_post(
// check bloom filter to be sure this transaction contains any relevant logs
// TODO: This does not work properly right now, get back this eventually
// TODO: compare to code in llamanodes/web3-this-then-that
// if let Some(ValueOrArray::Value(Some(x))) = payment_factory_contract
// .payment_received_filter()
// .filter
@ -239,11 +243,8 @@ pub async fn user_balance_post(
payment_token_amount.set_scale(payment_token_decimals)?;
info!(
"Found deposit transaction for: {:?} {:?} {:?} {:?}",
&recipient_account.to_fixed_bytes(),
recipient_account,
payment_token_address,
payment_token_amount
"Found deposit transaction for: {:?} {:?} {:?}",
recipient_account, payment_token_address, payment_token_amount
);
let recipient = match user::Entity::find()

@ -498,7 +498,7 @@ impl Web3Rpcs {
// this is unlikely but possible
// TODO: better log that includes all the votes
warn!(
"chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}. {:#?} -> {:#?}",
"chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
@ -509,8 +509,6 @@ impl Web3Rpcs {
old_head_block,
rpc,
rpc_head_str,
old_consensus_connections,
new_consensus_rpcs,
);
if backups_needed {