diff --git a/Cargo.lock b/Cargo.lock index 3100e6ea..ce903ed2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5446,6 +5446,7 @@ dependencies = [ "fdlimit", "flume", "futures", + "glob", "handlebars", "hashbrown 0.13.1", "hdrhistogram", diff --git a/TODO.md b/TODO.md index 2313030f..0a09e0fe 100644 --- a/TODO.md +++ b/TODO.md @@ -242,11 +242,16 @@ These are roughly in order of completition - [x] cli command to change user_tier by key - [x] cache the status page for a second - [x] request accounting for websockets -- [ ] bug! websocket (and last ethspam) have no rpc_key_id attached to their rpc_accounting rows +- [ ] database merge scripts - [ ] add block timestamp to the /status page - [ ] be sure to save the timestamp in a way that our request routing logic can make use of it - [ ] change invite codes to set the user_tier +- [ ] period_datetime should always be :00. right now it depends on start time +- [ ] two servers running will confuse rpc_accounting! + - it won't happen with users often because they should be sticky to one proxy, but unauthenticated users will definitely hit this + - one option: we need the insert to be an upsert, but how do we merge historgrams? + - [ ] actually block unauthenticated requests instead of emitting warning of "allowing without auth during development!" ## V1 @@ -535,3 +540,4 @@ in another repo: event subscriber - [ ] use Stretto instead of Moka - [ ] support alchemy_minedTransactions - [ ] debug print of user::Model's address is a big vec of numbers. make that hex somehow +- [ ] should we combine the proxy and cli into one bin? \ No newline at end of file diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 91e86313..276bb369 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -71,3 +71,4 @@ ulid = { version = "1.0.0", features = ["serde"] } url = "2.3.1" uuid = "1.2.2" itertools = "0.10.5" +glob = "0.3.0" diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app/mod.rs similarity index 69% rename from web3_proxy/src/app.rs rename to web3_proxy/src/app/mod.rs index 2ccb2808..f92c5fd9 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app/mod.rs @@ -1,4 +1,5 @@ // TODO: this file is way too big now. move things into other modules +mod ws; use crate::app_stats::{ProxyResponseStat, StatEmitter, Web3ProxyStat}; use crate::block_number::block_needed; @@ -13,19 +14,17 @@ use crate::rpcs::connections::Web3Connections; use crate::rpcs::request::OpenRequestHandleMetrics; use crate::rpcs::transactions::TxStatus; use anyhow::Context; -use axum::extract::ws::Message; use axum::headers::{Origin, Referer, UserAgent}; use deferred_rate_limiter::DeferredRateLimiter; use derive_more::From; use ethers::core::utils::keccak256; use ethers::prelude::{Address, Block, Bytes, TxHash, H256, U64}; -use futures::future::Abortable; -use futures::future::{join_all, AbortHandle}; +use futures::future::join_all; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use hashbrown::HashMap; use ipnet::IpNet; -use log::{debug, error, info, trace, warn}; +use log::{debug, error, info, warn}; use metered::{metered, ErrorCount, HitCount, ResponseTime, Throughput}; use migration::sea_orm::{self, ConnectionTrait, Database, DatabaseConnection}; use migration::sea_query::table::ColumnDef; @@ -38,13 +37,12 @@ use std::fmt; use std::net::IpAddr; use std::num::NonZeroU64; use std::str::FromStr; -use std::sync::atomic::{self, AtomicUsize}; +use std::sync::atomic; use std::sync::Arc; use std::time::Duration; use tokio::sync::{broadcast, watch, Semaphore}; use tokio::task::JoinHandle; use tokio::time::{sleep, timeout}; -use tokio_stream::wrappers::{BroadcastStream, WatchStream}; use ulid::Ulid; // TODO: make this customizable? @@ -112,6 +110,7 @@ pub struct Web3ProxyApp { /// store pending transactions that we've seen so that we don't send duplicates to subscribers pub pending_transactions: Cache, pub frontend_ip_rate_limiter: Option>, + // TODO: this key should be our RpcSecretKey class, not Ulid pub frontend_key_rate_limiter: Option>, pub login_rate_limiter: Option, pub vredis_pool: Option, @@ -578,352 +577,6 @@ impl Web3ProxyApp { .expect("prometheus metrics should always serialize") } - #[measure([ErrorCount, HitCount, ResponseTime, Throughput])] - pub async fn eth_subscribe<'a>( - self: &'a Arc, - authorization: Arc, - request_json: JsonRpcRequest, - subscription_count: &'a AtomicUsize, - // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now - response_sender: flume::Sender, - ) -> anyhow::Result<(AbortHandle, JsonRpcForwardedResponse)> { - // TODO: this is not efficient - let request_bytes = serde_json::to_string(&request_json) - .context("finding request size")? - .len(); - - let request_metadata = - Arc::new(RequestMetadata::new(REQUEST_PERIOD, request_bytes).unwrap()); - - let (subscription_abort_handle, subscription_registration) = AbortHandle::new_pair(); - - // TODO: this only needs to be unique per connection. we don't need it globably unique - let subscription_id = subscription_count.fetch_add(1, atomic::Ordering::SeqCst); - let subscription_id = U64::from(subscription_id); - - // save the id so we can use it in the response - let id = request_json.id.clone(); - - // TODO: calling json! on every request is probably not fast. but we can only match against - // TODO: i think we need a stricter EthSubscribeRequest type that JsonRpcRequest can turn into - match request_json.params.as_ref() { - Some(x) if x == &json!(["newHeads"]) => { - let authorization = authorization.clone(); - let head_block_receiver = self.head_block_receiver.clone(); - let stat_sender = self.stat_sender.clone(); - - trace!("newHeads subscription {:?}", subscription_id); - tokio::spawn(async move { - let mut head_block_receiver = Abortable::new( - WatchStream::new(head_block_receiver), - subscription_registration, - ); - - while let Some(new_head) = head_block_receiver.next().await { - // TODO: what should the payload for RequestMetadata be? - let request_metadata = - Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap()); - - // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id - let response_json = json!({ - "jsonrpc": "2.0", - "method":"eth_subscription", - "params": { - "subscription": subscription_id, - // TODO: option to include full transaction objects instead of just the hashes? - "result": new_head.as_ref(), - }, - }); - - let response_str = serde_json::to_string(&response_json) - .expect("this should always be valid json"); - - // we could use response.num_bytes() here, but since we already have the string, this is easier - let response_bytes = response_str.len(); - - // TODO: do clients support binary messages? - let response_msg = Message::Text(response_str); - - if response_sender.send_async(response_msg).await.is_err() { - // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? - break; - }; - - if let Some(stat_sender) = stat_sender.as_ref() { - let response_stat = ProxyResponseStat::new( - "eth_subscription(newHeads)".to_string(), - authorization.clone(), - request_metadata.clone(), - response_bytes, - ); - - if let Err(err) = stat_sender.send_async(response_stat.into()).await { - // TODO: what should we do? - warn!( - "stat_sender failed inside newPendingTransactions: {:?}", - err - ); - } - } - } - - trace!("closed newHeads subscription. id={:?}", subscription_id); - }); - } - Some(x) if x == &json!(["newPendingTransactions"]) => { - let pending_tx_receiver = self.pending_tx_sender.subscribe(); - let stat_sender = self.stat_sender.clone(); - let authorization = authorization.clone(); - - let mut pending_tx_receiver = Abortable::new( - BroadcastStream::new(pending_tx_receiver), - subscription_registration, - ); - - trace!( - "pending newPendingTransactions subscription id: {:?}", - subscription_id - ); - - // TODO: do something with this handle? - tokio::spawn(async move { - while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { - let request_metadata = - Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap()); - - let new_tx = match new_tx_state { - TxStatus::Pending(tx) => tx, - TxStatus::Confirmed(..) => continue, - TxStatus::Orphaned(tx) => tx, - }; - - // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id - let response_json = json!({ - "jsonrpc": "2.0", - "method": "eth_subscription", - "params": { - "subscription": subscription_id, - "result": new_tx.hash, - }, - }); - - let response_str = serde_json::to_string(&response_json) - .expect("this should always be valid json"); - - // we could use response.num_bytes() here, but since we already have the string, this is easier - let response_bytes = response_str.len(); - - // TODO: do clients support binary messages? - let response_msg = Message::Text(response_str); - - if response_sender.send_async(response_msg).await.is_err() { - // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? - break; - }; - - if let Some(stat_sender) = stat_sender.as_ref() { - let response_stat = ProxyResponseStat::new( - "eth_subscription(newPendingTransactions)".to_string(), - authorization.clone(), - request_metadata.clone(), - response_bytes, - ); - - if let Err(err) = stat_sender.send_async(response_stat.into()).await { - // TODO: what should we do? - warn!( - "stat_sender failed inside newPendingTransactions: {:?}", - err - ); - } - } - } - - trace!( - "closed newPendingTransactions subscription: {:?}", - subscription_id - ); - }); - } - Some(x) if x == &json!(["newPendingFullTransactions"]) => { - // TODO: too much copy/pasta with newPendingTransactions - let authorization = authorization.clone(); - let pending_tx_receiver = self.pending_tx_sender.subscribe(); - let stat_sender = self.stat_sender.clone(); - - let mut pending_tx_receiver = Abortable::new( - BroadcastStream::new(pending_tx_receiver), - subscription_registration, - ); - - trace!( - "pending newPendingFullTransactions subscription: {:?}", - subscription_id - ); - - // TODO: do something with this handle? - tokio::spawn(async move { - while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { - let request_metadata = - Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap()); - - let new_tx = match new_tx_state { - TxStatus::Pending(tx) => tx, - TxStatus::Confirmed(..) => continue, - TxStatus::Orphaned(tx) => tx, - }; - - // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id - let response_json = json!({ - "jsonrpc": "2.0", - "method": "eth_subscription", - "params": { - "subscription": subscription_id, - // upstream just sends the txid, but we want to send the whole transaction - "result": new_tx, - }, - }); - - let response_str = serde_json::to_string(&response_json) - .expect("this should always be valid json"); - - // we could use response.num_bytes() here, but since we already have the string, this is easier - let response_bytes = response_str.len(); - - // TODO: do clients support binary messages? - let response_msg = Message::Text(response_str); - - if response_sender.send_async(response_msg).await.is_err() { - // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? - break; - }; - - if let Some(stat_sender) = stat_sender.as_ref() { - let response_stat = ProxyResponseStat::new( - "eth_subscription(newPendingFullTransactions)".to_string(), - authorization.clone(), - request_metadata.clone(), - response_bytes, - ); - - if let Err(err) = stat_sender.send_async(response_stat.into()).await { - // TODO: what should we do? - warn!( - "stat_sender failed inside newPendingFullTransactions: {:?}", - err - ); - } - } - } - - trace!( - "closed newPendingFullTransactions subscription: {:?}", - subscription_id - ); - }); - } - Some(x) if x == &json!(["newPendingRawTransactions"]) => { - // TODO: too much copy/pasta with newPendingTransactions - let authorization = authorization.clone(); - let pending_tx_receiver = self.pending_tx_sender.subscribe(); - let stat_sender = self.stat_sender.clone(); - - let mut pending_tx_receiver = Abortable::new( - BroadcastStream::new(pending_tx_receiver), - subscription_registration, - ); - - trace!( - "pending transactions subscription id: {:?}", - subscription_id - ); - - // TODO: do something with this handle? - tokio::spawn(async move { - while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { - let request_metadata = - Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap()); - - let new_tx = match new_tx_state { - TxStatus::Pending(tx) => tx, - TxStatus::Confirmed(..) => continue, - TxStatus::Orphaned(tx) => tx, - }; - - // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id - let response_json = json!({ - "jsonrpc": "2.0", - "method": "eth_subscription", - "params": { - "subscription": subscription_id, - // upstream just sends the txid, but we want to send the raw transaction - "result": new_tx.rlp(), - }, - }); - - let response_str = serde_json::to_string(&response_json) - .expect("this should always be valid json"); - - // we could use response.num_bytes() here, but since we already have the string, this is easier - let response_bytes = response_str.len(); - - // TODO: do clients support binary messages? - let response_msg = Message::Text(response_str); - - if response_sender.send_async(response_msg).await.is_err() { - // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? - break; - }; - - if let Some(stat_sender) = stat_sender.as_ref() { - let response_stat = ProxyResponseStat::new( - "eth_subscription(newPendingRawTransactions)".to_string(), - authorization.clone(), - request_metadata.clone(), - response_bytes, - ); - - if let Err(err) = stat_sender.send_async(response_stat.into()).await { - // TODO: what should we do? - warn!( - "stat_sender failed inside newPendingRawTransactions: {:?}", - err - ); - } - } - } - - trace!( - "closed newPendingRawTransactions subscription: {:?}", - subscription_id - ); - }); - } - _ => return Err(anyhow::anyhow!("unimplemented")), - } - - // TODO: do something with subscription_join_handle? - - let response = JsonRpcForwardedResponse::from_value(json!(subscription_id), id); - - if let Some(stat_sender) = self.stat_sender.as_ref() { - let response_stat = ProxyResponseStat::new( - request_json.method.clone(), - authorization.clone(), - request_metadata, - response.num_bytes(), - ); - - if let Err(err) = stat_sender.send_async(response_stat.into()).await { - // TODO: what should we do? - warn!("stat_sender failed inside websocket: {:?}", err); - } - } - - // TODO: make a `SubscriptonHandle(AbortHandle, JoinHandle)` struct? - Ok((subscription_abort_handle, response)) - } - /// send the request or batch of requests to the approriate RPCs pub async fn proxy_web3_rpc( self: &Arc, diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs new file mode 100644 index 00000000..e6ac30c0 --- /dev/null +++ b/web3_proxy/src/app/ws.rs @@ -0,0 +1,367 @@ +//! Websocket-specific functions for the Web3ProxyApp + +use super::{Web3ProxyApp, REQUEST_PERIOD}; +use crate::app_stats::ProxyResponseStat; +use crate::frontend::authorization::{Authorization, RequestMetadata}; +use crate::jsonrpc::JsonRpcForwardedResponse; +use crate::jsonrpc::JsonRpcRequest; +use crate::rpcs::transactions::TxStatus; +use anyhow::Context; +use axum::extract::ws::Message; +use ethers::prelude::U64; +use futures::future::AbortHandle; +use futures::future::Abortable; +use futures::stream::StreamExt; +use log::{trace, warn}; +use serde_json::json; +use std::sync::atomic::{self, AtomicUsize}; +use std::sync::Arc; +use tokio_stream::wrappers::{BroadcastStream, WatchStream}; + +impl Web3ProxyApp { + // TODO: #[measure([ErrorCount, HitCount, ResponseTime, Throughput])] + pub async fn eth_subscribe<'a>( + self: &'a Arc, + authorization: Arc, + request_json: JsonRpcRequest, + subscription_count: &'a AtomicUsize, + // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now + response_sender: flume::Sender, + ) -> anyhow::Result<(AbortHandle, JsonRpcForwardedResponse)> { + // TODO: this is not efficient + let request_bytes = serde_json::to_string(&request_json) + .context("finding request size")? + .len(); + + let request_metadata = + Arc::new(RequestMetadata::new(REQUEST_PERIOD, request_bytes).unwrap()); + + let (subscription_abort_handle, subscription_registration) = AbortHandle::new_pair(); + + // TODO: this only needs to be unique per connection. we don't need it globably unique + let subscription_id = subscription_count.fetch_add(1, atomic::Ordering::SeqCst); + let subscription_id = U64::from(subscription_id); + + // save the id so we can use it in the response + let id = request_json.id.clone(); + + // TODO: calling json! on every request is probably not fast. but we can only match against + // TODO: i think we need a stricter EthSubscribeRequest type that JsonRpcRequest can turn into + match request_json.params.as_ref() { + Some(x) if x == &json!(["newHeads"]) => { + let authorization = authorization.clone(); + let head_block_receiver = self.head_block_receiver.clone(); + let stat_sender = self.stat_sender.clone(); + + trace!("newHeads subscription {:?}", subscription_id); + tokio::spawn(async move { + let mut head_block_receiver = Abortable::new( + WatchStream::new(head_block_receiver), + subscription_registration, + ); + + while let Some(new_head) = head_block_receiver.next().await { + // TODO: what should the payload for RequestMetadata be? + let request_metadata = + Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap()); + + // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id + let response_json = json!({ + "jsonrpc": "2.0", + "method":"eth_subscription", + "params": { + "subscription": subscription_id, + // TODO: option to include full transaction objects instead of just the hashes? + "result": new_head.as_ref(), + }, + }); + + let response_str = serde_json::to_string(&response_json) + .expect("this should always be valid json"); + + // we could use response.num_bytes() here, but since we already have the string, this is easier + let response_bytes = response_str.len(); + + // TODO: do clients support binary messages? + let response_msg = Message::Text(response_str); + + if response_sender.send_async(response_msg).await.is_err() { + // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? + break; + }; + + if let Some(stat_sender) = stat_sender.as_ref() { + let response_stat = ProxyResponseStat::new( + "eth_subscription(newHeads)".to_string(), + authorization.clone(), + request_metadata.clone(), + response_bytes, + ); + + if let Err(err) = stat_sender.send_async(response_stat.into()).await { + // TODO: what should we do? + warn!( + "stat_sender failed inside newPendingTransactions: {:?}", + err + ); + } + } + } + + trace!("closed newHeads subscription {:?}", subscription_id); + }); + } + Some(x) if x == &json!(["newPendingTransactions"]) => { + let pending_tx_receiver = self.pending_tx_sender.subscribe(); + let stat_sender = self.stat_sender.clone(); + let authorization = authorization.clone(); + + let mut pending_tx_receiver = Abortable::new( + BroadcastStream::new(pending_tx_receiver), + subscription_registration, + ); + + trace!( + "pending newPendingTransactions subscription id: {:?}", + subscription_id + ); + + // TODO: do something with this handle? + tokio::spawn(async move { + while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { + let request_metadata = + Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap()); + + let new_tx = match new_tx_state { + TxStatus::Pending(tx) => tx, + TxStatus::Confirmed(..) => continue, + TxStatus::Orphaned(tx) => tx, + }; + + // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id + let response_json = json!({ + "jsonrpc": "2.0", + "method": "eth_subscription", + "params": { + "subscription": subscription_id, + "result": new_tx.hash, + }, + }); + + let response_str = serde_json::to_string(&response_json) + .expect("this should always be valid json"); + + // we could use response.num_bytes() here, but since we already have the string, this is easier + let response_bytes = response_str.len(); + + // TODO: do clients support binary messages? + let response_msg = Message::Text(response_str); + + if response_sender.send_async(response_msg).await.is_err() { + // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? + break; + }; + + if let Some(stat_sender) = stat_sender.as_ref() { + let response_stat = ProxyResponseStat::new( + "eth_subscription(newPendingTransactions)".to_string(), + authorization.clone(), + request_metadata.clone(), + response_bytes, + ); + + if let Err(err) = stat_sender.send_async(response_stat.into()).await { + // TODO: what should we do? + warn!( + "stat_sender failed inside newPendingTransactions: {:?}", + err + ); + } + } + } + + trace!( + "closed newPendingTransactions subscription: {:?}", + subscription_id + ); + }); + } + Some(x) if x == &json!(["newPendingFullTransactions"]) => { + // TODO: too much copy/pasta with newPendingTransactions + let authorization = authorization.clone(); + let pending_tx_receiver = self.pending_tx_sender.subscribe(); + let stat_sender = self.stat_sender.clone(); + + let mut pending_tx_receiver = Abortable::new( + BroadcastStream::new(pending_tx_receiver), + subscription_registration, + ); + + trace!( + "pending newPendingFullTransactions subscription: {:?}", + subscription_id + ); + + // TODO: do something with this handle? + tokio::spawn(async move { + while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { + let request_metadata = + Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap()); + + let new_tx = match new_tx_state { + TxStatus::Pending(tx) => tx, + TxStatus::Confirmed(..) => continue, + TxStatus::Orphaned(tx) => tx, + }; + + // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id + let response_json = json!({ + "jsonrpc": "2.0", + "method": "eth_subscription", + "params": { + "subscription": subscription_id, + // upstream just sends the txid, but we want to send the whole transaction + "result": new_tx, + }, + }); + + let response_str = serde_json::to_string(&response_json) + .expect("this should always be valid json"); + + // we could use response.num_bytes() here, but since we already have the string, this is easier + let response_bytes = response_str.len(); + + // TODO: do clients support binary messages? + let response_msg = Message::Text(response_str); + + if response_sender.send_async(response_msg).await.is_err() { + // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? + break; + }; + + if let Some(stat_sender) = stat_sender.as_ref() { + let response_stat = ProxyResponseStat::new( + "eth_subscription(newPendingFullTransactions)".to_string(), + authorization.clone(), + request_metadata.clone(), + response_bytes, + ); + + if let Err(err) = stat_sender.send_async(response_stat.into()).await { + // TODO: what should we do? + warn!( + "stat_sender failed inside newPendingFullTransactions: {:?}", + err + ); + } + } + } + + trace!( + "closed newPendingFullTransactions subscription: {:?}", + subscription_id + ); + }); + } + Some(x) if x == &json!(["newPendingRawTransactions"]) => { + // TODO: too much copy/pasta with newPendingTransactions + let authorization = authorization.clone(); + let pending_tx_receiver = self.pending_tx_sender.subscribe(); + let stat_sender = self.stat_sender.clone(); + + let mut pending_tx_receiver = Abortable::new( + BroadcastStream::new(pending_tx_receiver), + subscription_registration, + ); + + trace!( + "pending transactions subscription id: {:?}", + subscription_id + ); + + // TODO: do something with this handle? + tokio::spawn(async move { + while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { + let request_metadata = + Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap()); + + let new_tx = match new_tx_state { + TxStatus::Pending(tx) => tx, + TxStatus::Confirmed(..) => continue, + TxStatus::Orphaned(tx) => tx, + }; + + // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id + let response_json = json!({ + "jsonrpc": "2.0", + "method": "eth_subscription", + "params": { + "subscription": subscription_id, + // upstream just sends the txid, but we want to send the raw transaction + "result": new_tx.rlp(), + }, + }); + + let response_str = serde_json::to_string(&response_json) + .expect("this should always be valid json"); + + // we could use response.num_bytes() here, but since we already have the string, this is easier + let response_bytes = response_str.len(); + + // TODO: do clients support binary messages? + let response_msg = Message::Text(response_str); + + if response_sender.send_async(response_msg).await.is_err() { + // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? + break; + }; + + if let Some(stat_sender) = stat_sender.as_ref() { + let response_stat = ProxyResponseStat::new( + "eth_subscription(newPendingRawTransactions)".to_string(), + authorization.clone(), + request_metadata.clone(), + response_bytes, + ); + + if let Err(err) = stat_sender.send_async(response_stat.into()).await { + // TODO: what should we do? + warn!( + "stat_sender failed inside newPendingRawTransactions: {:?}", + err + ); + } + } + } + + trace!( + "closed newPendingRawTransactions subscription: {:?}", + subscription_id + ); + }); + } + _ => return Err(anyhow::anyhow!("unimplemented")), + } + + // TODO: do something with subscription_join_handle? + + let response = JsonRpcForwardedResponse::from_value(json!(subscription_id), id); + + if let Some(stat_sender) = self.stat_sender.as_ref() { + let response_stat = ProxyResponseStat::new( + request_json.method.clone(), + authorization.clone(), + request_metadata, + response.num_bytes(), + ); + + if let Err(err) = stat_sender.send_async(response_stat.into()).await { + // TODO: what should we do? + warn!("stat_sender failed inside websocket: {:?}", err); + } + } + + // TODO: make a `SubscriptonHandle(AbortHandle, JoinHandle)` struct? + Ok((subscription_abort_handle, response)) + } +} diff --git a/web3_proxy/src/bin/web3_proxy_cli/check_config.rs b/web3_proxy/src/bin/web3_proxy_cli/check_config.rs index fa31ba5b..f0972274 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/check_config.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/check_config.rs @@ -4,7 +4,7 @@ use std::fs; use web3_proxy::config::TopConfig; #[derive(FromArgs, PartialEq, Eq, Debug)] -/// Second subcommand. +/// Check the config for any problems. #[argh(subcommand, name = "check_config")] pub struct CheckConfigSubCommand { #[argh(positional)] diff --git a/web3_proxy/src/bin/web3_proxy_cli/list_user_tier.rs b/web3_proxy/src/bin/web3_proxy_cli/list_user_tier.rs index e69de29b..0228f6b1 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/list_user_tier.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/list_user_tier.rs @@ -0,0 +1 @@ +//! TODO: write this diff --git a/web3_proxy/src/bin/web3_proxy_cli/main.rs b/web3_proxy/src/bin/web3_proxy_cli/main.rs index f656b4f4..380eadc8 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/main.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/main.rs @@ -2,6 +2,9 @@ mod change_user_tier_by_key; mod check_config; mod create_user; mod drop_migration_lock; +mod list_user_tier; +mod user_export; +mod user_import; use argh::FromArgs; use std::fs; @@ -36,9 +39,10 @@ enum SubCommand { CheckConfig(check_config::CheckConfigSubCommand), CreateUser(create_user::CreateUserSubCommand), DropMigrationLock(drop_migration_lock::DropMigrationLockSubCommand), - // TODO: sub command to downgrade migrations? - // TODO: sub command to add new api keys to an existing user? - // TODO: sub command to change a user's tier + UserExport(user_export::UserExportSubCommand), + UserImport(user_import::UserImportSubCommand), // TODO: sub command to downgrade migrations? + // TODO: sub command to add new api keys to an existing user? + // TODO: sub command to change a user's tier } #[tokio::main] @@ -83,8 +87,19 @@ async fn main() -> anyhow::Result<()> { x.main(&db_conn).await } SubCommand::DropMigrationLock(x) => { + // very intentionally, do NOT run migrations here let db_conn = get_db(cli_config.db_url, 1, 1).await?; + x.main(&db_conn).await + } + SubCommand::UserExport(x) => { + let db_conn = get_migrated_db(cli_config.db_url, 1, 1).await?; + + x.main(&db_conn).await + } + SubCommand::UserImport(x) => { + let db_conn = get_migrated_db(cli_config.db_url, 1, 1).await?; + x.main(&db_conn).await } } diff --git a/web3_proxy/src/bin/web3_proxy_cli/user_export.rs b/web3_proxy/src/bin/web3_proxy_cli/user_export.rs new file mode 100644 index 00000000..0b491f28 --- /dev/null +++ b/web3_proxy/src/bin/web3_proxy_cli/user_export.rs @@ -0,0 +1,76 @@ +use argh::FromArgs; +use entities::{rpc_key, user}; +use log::info; +use migration::sea_orm::{DatabaseConnection, EntityTrait, PaginatorTrait}; +use std::fs::{self, create_dir_all}; +use std::path::Path; +use std::time::{SystemTime, UNIX_EPOCH}; + +#[derive(FromArgs, PartialEq, Eq, Debug)] +/// Export users from the database. +#[argh(subcommand, name = "user_export")] +pub struct UserExportSubCommand { + /// where to write the file + /// TODO: validate this is a valid path here? + #[argh(positional, default = "\"./data/users\".to_string()")] + output_dir: String, +} + +impl UserExportSubCommand { + pub async fn main(self, db_conn: &DatabaseConnection) -> anyhow::Result<()> { + // create the output dir if it does not exist + create_dir_all(&self.output_dir)?; + + let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(); + + let export_dir = Path::new(&self.output_dir); + + // get all the users from the database (paged) + let mut user_pages = user::Entity::find().into_json().paginate(db_conn, 1000); + + // TODO: for now all user_tier tables match in all databases, but in the future we might need to export/import this + + // save all users to a file + let mut user_file_count = 0; + while let Some(users) = user_pages.fetch_and_next().await? { + let export_file = export_dir.join(format!("{}-users-{}.json", now, user_file_count)); + + fs::write( + export_file, + serde_json::to_string_pretty(&users).expect("users should serialize"), + )?; + + user_file_count += 1; + } + + info!( + "Saved {} user file(s) to {}", + user_file_count, + export_dir.to_string_lossy() + ); + + // get all the rpc keys from the database (paged) + let mut rpc_key_pages = rpc_key::Entity::find().into_json().paginate(db_conn, 1000); + + let mut rpc_key_file_count = 0; + while let Some(rpc_keys) = rpc_key_pages.fetch_and_next().await? { + let export_file = + export_dir.join(format!("{}-rpc_keys-{}.json", now, rpc_key_file_count)); + + fs::write( + export_file, + serde_json::to_string_pretty(&rpc_keys).expect("rpc_keys should serialize"), + )?; + + rpc_key_file_count += 1; + } + + info!( + "Saved {} rpc key file(s) to {}", + rpc_key_file_count, + export_dir.to_string_lossy() + ); + + Ok(()) + } +} diff --git a/web3_proxy/src/bin/web3_proxy_cli/user_import.rs b/web3_proxy/src/bin/web3_proxy_cli/user_import.rs new file mode 100644 index 00000000..4302f7b8 --- /dev/null +++ b/web3_proxy/src/bin/web3_proxy_cli/user_import.rs @@ -0,0 +1,136 @@ +use argh::FromArgs; +use glob::glob; +use hashbrown::HashMap; +use log::{info, warn}; +use migration::sea_orm::DatabaseConnection; +use std::path::{Path, PathBuf}; + +#[derive(FromArgs, PartialEq, Eq, Debug)] +/// Import users from another database. +#[argh(subcommand, name = "user_import")] +pub struct UserImportSubCommand { + #[argh(positional)] + export_timestamp: u64, + + #[argh(positional, default = "\"./data/users\".to_string()")] + /// where to write the file + /// TODO: validate this is a file here? + input_dir: String, +} + +/// Map ids in the export to ids in our database. +type UserMap = HashMap; + +impl UserImportSubCommand { + pub async fn main(self, db_conn: &DatabaseConnection) -> anyhow::Result<()> { + let import_dir = Path::new(&self.input_dir); + + if !import_dir.exists() { + return Err(anyhow::anyhow!( + "import dir ({}) does not exist!", + import_dir.to_string_lossy() + )); + } + + let user_glob_path = import_dir.join(format!("{}-users-*.json", self.export_timestamp)); + + let user_glob_path = user_glob_path.to_string_lossy(); + + info!("Scanning {}", user_glob_path); + + let mut user_map = HashMap::new(); + let mut user_file_count = 0; + let mut imported_user_count = 0; + for entry in glob(&user_glob_path)? { + match entry { + Ok(path) => { + imported_user_count += + self.import_user_file(db_conn, path, &mut user_map).await? + } + Err(e) => { + warn!( + "imported {} users from {} files.", + imported_user_count, user_file_count + ); + return Err(e.into()); + } + } + user_file_count += 1; + } + + info!( + "Imported {} user(s) from {} file(s). {} users mapped.", + imported_user_count, + user_file_count, + user_map.len() + ); + + let rpc_key_glob_path = + import_dir.join(format!("{}-rpc_keys-*.json", self.export_timestamp)); + + let rpc_key_glob_path = rpc_key_glob_path.to_string_lossy(); + + info!("Scanning {}", rpc_key_glob_path); + + let mut rpc_key_file_count = 0; + let mut imported_rpc_key_count = 0; + for entry in glob(&rpc_key_glob_path)? { + match entry { + Ok(path) => { + imported_rpc_key_count += + self.import_rpc_key_file(db_conn, path, &user_map).await? + } + Err(e) => { + warn!( + "imported {} users from {} files.", + imported_rpc_key_count, rpc_key_file_count + ); + return Err(e.into()); + } + } + rpc_key_file_count += 1; + } + + info!( + "Imported {} rpc key(s) from {} file(s)", + imported_rpc_key_count, rpc_key_file_count + ); + + Ok(()) + } + + pub async fn import_user_file( + &self, + db_conn: &DatabaseConnection, + path: PathBuf, + user_map: &mut UserMap, + ) -> anyhow::Result { + let count = 0; + // TODO: do this all inside a database transaction? + // for each file in the path, read as json + // -- for each entry in the json + // ---- let user_id = if user is in the database + // ------ add user to the database + // ---- else + // ------ add user to the database + // ---- add user to the map. + todo!() + } + + pub async fn import_rpc_key_file( + &self, + db_conn: &DatabaseConnection, + path: PathBuf, + user_map: &UserMap, + ) -> anyhow::Result { + let count = 0; + // TODO: do this all inside a database transaction? + // for each file in the path, read as json + // -- for each entry in the json + // ---- let rpc_key_id = if rpc_key is in the database + // ------ continue + // ---- else + // ------ add rpc_key to the database + todo!() + } +}