From 41950c886c88e2671235910ea2759a229b6879ea Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 9 Jun 2023 16:35:14 -0700 Subject: [PATCH] improvements for web3-this-then-that --- docs/http routes.txt | 7 +- web3_proxy/src/frontend/mod.rs | 2 +- web3_proxy/src/frontend/rpc_proxy_ws.rs | 167 ++++++++++++----------- web3_proxy/src/frontend/users/payment.rs | 19 +-- web3_proxy/src/rpcs/blockchain.rs | 4 +- 5 files changed, 104 insertions(+), 95 deletions(-) diff --git a/docs/http routes.txt b/docs/http routes.txt index e6ab9a25..05d96490 100644 --- a/docs/http routes.txt +++ b/docs/http routes.txt @@ -111,11 +111,12 @@ GET /user/balance If valid, displays data about the user's balance and payments as JSON. POST /user/balance/:txid - Not yet implemented. Rate limited by IP. + Rate limited by IP. Checks the ":txid" for a transaction that updates a user's balance. The backend will be watching for these transactions, so this should not be needed in the common case. However, log susbcriptions are not perfect and so it might sometimes be needed. + Any authorized user can call this endpoint for any other user's transaction. GET /user/keys Checks the "AUTHORIZATION" header for a valid bearer token. @@ -141,10 +142,6 @@ GET /subuser/rpc_keys GET /user/deposits Retrieves the user's deposit history. -GET /user/balance/:tx_hash - Accepts a tx_hash and updates the user's balance according to this transaction. - Any authorized user can call this endpoint for any other user's transaction. - GET /user/referral Fetches a user's referral link. diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 9528a189..d5c3e9b5 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -172,7 +172,7 @@ pub async fn serve( .route("/user/deposits", get(users::payment::user_deposits_get)) .route( "/user/balance/:tx_hash", - get(users::payment::user_balance_post), + post(users::payment::user_balance_post), ) .route("/user/keys", get(users::rpc_keys::rpc_keys_get)) .route("/user/keys", post(users::rpc_keys::rpc_keys_management)) diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index e597a9ab..dee85ee5 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -347,84 +347,97 @@ async fn handle_socket_payload( let response_id = json_request.id.clone(); // TODO: move this to a seperate function so we can use the try operator - let response: Web3ProxyResult = - match &json_request.method[..] { - "eth_subscribe" => { - // TODO: how can we subscribe with proxy_mode? - match app - .eth_subscribe( - authorization.clone(), - json_request, - subscription_count, - response_sender.clone(), - ) - .await - { - Ok((handle, response)) => { - { - let mut x = subscriptions.write().await; - - let result: &serde_json::value::RawValue = response - .result - .as_ref() - .context("there should be a result here")?; - - // TODO: there must be a better way to turn a RawValue - let k: U64 = serde_json::from_str(result.get()) - .context("subscription ids must be U64s")?; - - x.insert(k, handle); - }; - - Ok(response.into()) - } - Err(err) => Err(err), - } - } - "eth_unsubscribe" => { - let request_metadata = - RequestMetadata::new(&app, authorization.clone(), &json_request, None) - .await; - - #[derive(serde::Deserialize)] - struct EthUnsubscribeParams([U64; 1]); - - match serde_json::from_value(json_request.params) { - Ok::(params) => { - let subscription_id = ¶ms.0[0]; - - // TODO: is this the right response? - let partial_response = { - let mut x = subscriptions.write().await; - match x.remove(subscription_id) { - None => false, - Some(handle) => { - handle.abort(); - true - } - } - }; - - // TODO: don't create the response here. use a JsonRpcResponseData instead - let response = JsonRpcForwardedResponse::from_value( - json!(partial_response), - response_id.clone(), - ); - - request_metadata.add_response(&response); - - Ok(response.into()) - } - Err(err) => Err(Web3ProxyError::BadRequest( - f!("incorrect params given for eth_unsubscribe. {err:?}").into(), - )), - } - } - _ => app - .proxy_web3_rpc(authorization.clone(), json_request.into()) + let response: Web3ProxyResult = match &json_request.method + [..] + { + "eth_subscribe" => { + // TODO: how can we subscribe with proxy_mode? + match app + .eth_subscribe( + authorization.clone(), + json_request, + subscription_count, + response_sender.clone(), + ) .await - .map(|(_, response, _)| response), - }; + { + Ok((handle, response)) => { + { + let mut x = subscriptions.write().await; + + let result: &serde_json::value::RawValue = response + .result + .as_ref() + .context("there should be a result here")?; + + // TODO: there must be a better way to turn a RawValue + let k: U64 = serde_json::from_str(result.get()) + .context("subscription ids must be U64s")?; + + x.insert(k, handle); + }; + + Ok(response.into()) + } + Err(err) => Err(err), + } + } + "eth_unsubscribe" => { + let request_metadata = + RequestMetadata::new(&app, authorization.clone(), &json_request, None) + .await; + + let subscription_id: U64 = if json_request.params.is_array() { + if let Some(params) = json_request.params.get(0) { + serde_json::from_value(params.clone()).map_err(|err| { + Web3ProxyError::BadRequest( + format!("invalid params for eth_unsubscribe: {}", err).into(), + ) + })? + } else { + return Err(Web3ProxyError::BadRequest( + f!("no params for eth_unsubscribe").into(), + )); + } + } else if json_request.params.is_string() { + serde_json::from_value(json_request.params).map_err(|err| { + Web3ProxyError::BadRequest( + format!("invalid params for eth_unsubscribe: {}", err).into(), + ) + })? + } else { + return Err(Web3ProxyError::BadRequest( + "unexpected params given for eth_unsubscribe".into(), + )); + }; + + // TODO: is this the right response? + let partial_response = { + let mut x = subscriptions.write().await; + match x.remove(&subscription_id) { + None => false, + Some(handle) => { + handle.abort(); + true + } + } + }; + + // TODO: don't create the response here. use a JsonRpcResponseData instead + let response = JsonRpcForwardedResponse::from_value( + json!(partial_response), + response_id.clone(), + ); + + request_metadata.add_response(&response); + + Ok(response.into()) + } + _ => app + .proxy_web3_rpc(authorization.clone(), json_request.into()) + .await + .map(|(_, response, _)| response), + }; (response_id, response) } diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index 3e7e6d54..52f12879 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -1,5 +1,6 @@ use crate::app::Web3ProxyApp; use crate::errors::{Web3ProxyError, Web3ProxyResponse}; +use crate::frontend::authorization::login_is_authorized; use anyhow::Context; use axum::{ extract::Path, @@ -7,6 +8,7 @@ use axum::{ response::IntoResponse, Extension, Json, TypedHeader, }; +use axum_client_ip::InsecureClientIp; use axum_macros::debug_handler; use entities::{balance, increase_on_chain_balance_receipt, rpc_key, user}; use ethbloom::Input as BloomInput; @@ -105,13 +107,14 @@ pub async fn user_deposits_get( #[debug_handler] pub async fn user_balance_post( Extension(app): Extension>, - TypedHeader(Authorization(bearer)): TypedHeader>, + InsecureClientIp(ip): InsecureClientIp, Path(mut params): Path>, ) -> Web3ProxyResponse { // I suppose this is ok / good, so people don't spam this endpoint as it is not "cheap" - // Check that the user is logged-in and authorized - // The semaphore keeps a user from submitting tons of transactions in parallel which would DOS our backends - let (_, _semaphore) = app.bearer_is_authorized(bearer).await?; + // we rate limit by ip instead of bearer token so transactions are easy to submit from scripts + // TODO: if ip is a 10. or a 172., allow unlimited + // TODO: why is login_is_authorized giving me a 403?! + // login_is_authorized(&app, ip).await?; // Get the transaction hash, and the amount that the user wants to top up by. // Let's say that for now, 1 credit is equivalent to 1 dollar (assuming any stablecoin has a 1:1 peg) @@ -173,6 +176,7 @@ pub async fn user_balance_post( // check bloom filter to be sure this transaction contains any relevant logs // TODO: This does not work properly right now, get back this eventually + // TODO: compare to code in llamanodes/web3-this-then-that // if let Some(ValueOrArray::Value(Some(x))) = payment_factory_contract // .payment_received_filter() // .filter @@ -239,11 +243,8 @@ pub async fn user_balance_post( payment_token_amount.set_scale(payment_token_decimals)?; info!( - "Found deposit transaction for: {:?} {:?} {:?} {:?}", - &recipient_account.to_fixed_bytes(), - recipient_account, - payment_token_address, - payment_token_amount + "Found deposit transaction for: {:?} {:?} {:?}", + recipient_account, payment_token_address, payment_token_amount ); let recipient = match user::Entity::find() diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index f8fda474..d8fd4255 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -498,7 +498,7 @@ impl Web3Rpcs { // this is unlikely but possible // TODO: better log that includes all the votes warn!( - "chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}. {:#?} -> {:#?}", + "chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}", consensus_tier, total_tiers, backups_voted_str, @@ -509,8 +509,6 @@ impl Web3Rpcs { old_head_block, rpc, rpc_head_str, - old_consensus_connections, - new_consensus_rpcs, ); if backups_needed {