start adding user_export and user_import scripts

This commit is contained in:
Bryan Stitt 2022-11-21 22:52:47 +00:00
parent fa4246d7d4
commit ff3c5de0aa
10 changed files with 613 additions and 357 deletions

1
Cargo.lock generated

@ -5446,6 +5446,7 @@ dependencies = [
"fdlimit",
"flume",
"futures",
"glob",
"handlebars",
"hashbrown 0.13.1",
"hdrhistogram",

@ -242,11 +242,16 @@ These are roughly in order of completition
- [x] cli command to change user_tier by key
- [x] cache the status page for a second
- [x] request accounting for websockets
- [ ] bug! websocket (and last ethspam) have no rpc_key_id attached to their rpc_accounting rows
- [ ] database merge scripts
- [ ] add block timestamp to the /status page
- [ ] be sure to save the timestamp in a way that our request routing logic can make use of it
- [ ] change invite codes to set the user_tier
- [ ] period_datetime should always be :00. right now it depends on start time
- [ ] two servers running will confuse rpc_accounting!
- it won't happen with users often because they should be sticky to one proxy, but unauthenticated users will definitely hit this
- one option: we need the insert to be an upsert, but how do we merge historgrams?
- [ ] actually block unauthenticated requests instead of emitting warning of "allowing without auth during development!"
## V1
@ -535,3 +540,4 @@ in another repo: event subscriber
- [ ] use Stretto instead of Moka
- [ ] support alchemy_minedTransactions
- [ ] debug print of user::Model's address is a big vec of numbers. make that hex somehow
- [ ] should we combine the proxy and cli into one bin?

@ -71,3 +71,4 @@ ulid = { version = "1.0.0", features = ["serde"] }
url = "2.3.1"
uuid = "1.2.2"
itertools = "0.10.5"
glob = "0.3.0"

@ -1,4 +1,5 @@
// TODO: this file is way too big now. move things into other modules
mod ws;
use crate::app_stats::{ProxyResponseStat, StatEmitter, Web3ProxyStat};
use crate::block_number::block_needed;
@ -13,19 +14,17 @@ use crate::rpcs::connections::Web3Connections;
use crate::rpcs::request::OpenRequestHandleMetrics;
use crate::rpcs::transactions::TxStatus;
use anyhow::Context;
use axum::extract::ws::Message;
use axum::headers::{Origin, Referer, UserAgent};
use deferred_rate_limiter::DeferredRateLimiter;
use derive_more::From;
use ethers::core::utils::keccak256;
use ethers::prelude::{Address, Block, Bytes, TxHash, H256, U64};
use futures::future::Abortable;
use futures::future::{join_all, AbortHandle};
use futures::future::join_all;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use hashbrown::HashMap;
use ipnet::IpNet;
use log::{debug, error, info, trace, warn};
use log::{debug, error, info, warn};
use metered::{metered, ErrorCount, HitCount, ResponseTime, Throughput};
use migration::sea_orm::{self, ConnectionTrait, Database, DatabaseConnection};
use migration::sea_query::table::ColumnDef;
@ -38,13 +37,12 @@ use std::fmt;
use std::net::IpAddr;
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::atomic;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, watch, Semaphore};
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};
use tokio_stream::wrappers::{BroadcastStream, WatchStream};
use ulid::Ulid;
// TODO: make this customizable?
@ -112,6 +110,7 @@ pub struct Web3ProxyApp {
/// store pending transactions that we've seen so that we don't send duplicates to subscribers
pub pending_transactions: Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
pub frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>,
// TODO: this key should be our RpcSecretKey class, not Ulid
pub frontend_key_rate_limiter: Option<DeferredRateLimiter<Ulid>>,
pub login_rate_limiter: Option<RedisRateLimiter>,
pub vredis_pool: Option<RedisPool>,
@ -578,352 +577,6 @@ impl Web3ProxyApp {
.expect("prometheus metrics should always serialize")
}
#[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
pub async fn eth_subscribe<'a>(
self: &'a Arc<Self>,
authorization: Arc<Authorization>,
request_json: JsonRpcRequest,
subscription_count: &'a AtomicUsize,
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now
response_sender: flume::Sender<Message>,
) -> anyhow::Result<(AbortHandle, JsonRpcForwardedResponse)> {
// TODO: this is not efficient
let request_bytes = serde_json::to_string(&request_json)
.context("finding request size")?
.len();
let request_metadata =
Arc::new(RequestMetadata::new(REQUEST_PERIOD, request_bytes).unwrap());
let (subscription_abort_handle, subscription_registration) = AbortHandle::new_pair();
// TODO: this only needs to be unique per connection. we don't need it globably unique
let subscription_id = subscription_count.fetch_add(1, atomic::Ordering::SeqCst);
let subscription_id = U64::from(subscription_id);
// save the id so we can use it in the response
let id = request_json.id.clone();
// TODO: calling json! on every request is probably not fast. but we can only match against
// TODO: i think we need a stricter EthSubscribeRequest type that JsonRpcRequest can turn into
match request_json.params.as_ref() {
Some(x) if x == &json!(["newHeads"]) => {
let authorization = authorization.clone();
let head_block_receiver = self.head_block_receiver.clone();
let stat_sender = self.stat_sender.clone();
trace!("newHeads subscription {:?}", subscription_id);
tokio::spawn(async move {
let mut head_block_receiver = Abortable::new(
WatchStream::new(head_block_receiver),
subscription_registration,
);
while let Some(new_head) = head_block_receiver.next().await {
// TODO: what should the payload for RequestMetadata be?
let request_metadata =
Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap());
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let response_json = json!({
"jsonrpc": "2.0",
"method":"eth_subscription",
"params": {
"subscription": subscription_id,
// TODO: option to include full transaction objects instead of just the hashes?
"result": new_head.as_ref(),
},
});
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// we could use response.num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
if response_sender.send_async(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = ProxyResponseStat::new(
"eth_subscription(newHeads)".to_string(),
authorization.clone(),
request_metadata.clone(),
response_bytes,
);
if let Err(err) = stat_sender.send_async(response_stat.into()).await {
// TODO: what should we do?
warn!(
"stat_sender failed inside newPendingTransactions: {:?}",
err
);
}
}
}
trace!("closed newHeads subscription. id={:?}", subscription_id);
});
}
Some(x) if x == &json!(["newPendingTransactions"]) => {
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let stat_sender = self.stat_sender.clone();
let authorization = authorization.clone();
let mut pending_tx_receiver = Abortable::new(
BroadcastStream::new(pending_tx_receiver),
subscription_registration,
);
trace!(
"pending newPendingTransactions subscription id: {:?}",
subscription_id
);
// TODO: do something with this handle?
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let request_metadata =
Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap());
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,
TxStatus::Confirmed(..) => continue,
TxStatus::Orphaned(tx) => tx,
};
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let response_json = json!({
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"subscription": subscription_id,
"result": new_tx.hash,
},
});
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// we could use response.num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
if response_sender.send_async(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = ProxyResponseStat::new(
"eth_subscription(newPendingTransactions)".to_string(),
authorization.clone(),
request_metadata.clone(),
response_bytes,
);
if let Err(err) = stat_sender.send_async(response_stat.into()).await {
// TODO: what should we do?
warn!(
"stat_sender failed inside newPendingTransactions: {:?}",
err
);
}
}
}
trace!(
"closed newPendingTransactions subscription: {:?}",
subscription_id
);
});
}
Some(x) if x == &json!(["newPendingFullTransactions"]) => {
// TODO: too much copy/pasta with newPendingTransactions
let authorization = authorization.clone();
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let stat_sender = self.stat_sender.clone();
let mut pending_tx_receiver = Abortable::new(
BroadcastStream::new(pending_tx_receiver),
subscription_registration,
);
trace!(
"pending newPendingFullTransactions subscription: {:?}",
subscription_id
);
// TODO: do something with this handle?
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let request_metadata =
Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap());
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,
TxStatus::Confirmed(..) => continue,
TxStatus::Orphaned(tx) => tx,
};
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let response_json = json!({
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"subscription": subscription_id,
// upstream just sends the txid, but we want to send the whole transaction
"result": new_tx,
},
});
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// we could use response.num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
if response_sender.send_async(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = ProxyResponseStat::new(
"eth_subscription(newPendingFullTransactions)".to_string(),
authorization.clone(),
request_metadata.clone(),
response_bytes,
);
if let Err(err) = stat_sender.send_async(response_stat.into()).await {
// TODO: what should we do?
warn!(
"stat_sender failed inside newPendingFullTransactions: {:?}",
err
);
}
}
}
trace!(
"closed newPendingFullTransactions subscription: {:?}",
subscription_id
);
});
}
Some(x) if x == &json!(["newPendingRawTransactions"]) => {
// TODO: too much copy/pasta with newPendingTransactions
let authorization = authorization.clone();
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let stat_sender = self.stat_sender.clone();
let mut pending_tx_receiver = Abortable::new(
BroadcastStream::new(pending_tx_receiver),
subscription_registration,
);
trace!(
"pending transactions subscription id: {:?}",
subscription_id
);
// TODO: do something with this handle?
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let request_metadata =
Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap());
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,
TxStatus::Confirmed(..) => continue,
TxStatus::Orphaned(tx) => tx,
};
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let response_json = json!({
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"subscription": subscription_id,
// upstream just sends the txid, but we want to send the raw transaction
"result": new_tx.rlp(),
},
});
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// we could use response.num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
if response_sender.send_async(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = ProxyResponseStat::new(
"eth_subscription(newPendingRawTransactions)".to_string(),
authorization.clone(),
request_metadata.clone(),
response_bytes,
);
if let Err(err) = stat_sender.send_async(response_stat.into()).await {
// TODO: what should we do?
warn!(
"stat_sender failed inside newPendingRawTransactions: {:?}",
err
);
}
}
}
trace!(
"closed newPendingRawTransactions subscription: {:?}",
subscription_id
);
});
}
_ => return Err(anyhow::anyhow!("unimplemented")),
}
// TODO: do something with subscription_join_handle?
let response = JsonRpcForwardedResponse::from_value(json!(subscription_id), id);
if let Some(stat_sender) = self.stat_sender.as_ref() {
let response_stat = ProxyResponseStat::new(
request_json.method.clone(),
authorization.clone(),
request_metadata,
response.num_bytes(),
);
if let Err(err) = stat_sender.send_async(response_stat.into()).await {
// TODO: what should we do?
warn!("stat_sender failed inside websocket: {:?}", err);
}
}
// TODO: make a `SubscriptonHandle(AbortHandle, JoinHandle)` struct?
Ok((subscription_abort_handle, response))
}
/// send the request or batch of requests to the approriate RPCs
pub async fn proxy_web3_rpc(
self: &Arc<Self>,

367
web3_proxy/src/app/ws.rs Normal file

@ -0,0 +1,367 @@
//! Websocket-specific functions for the Web3ProxyApp
use super::{Web3ProxyApp, REQUEST_PERIOD};
use crate::app_stats::ProxyResponseStat;
use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::jsonrpc::JsonRpcForwardedResponse;
use crate::jsonrpc::JsonRpcRequest;
use crate::rpcs::transactions::TxStatus;
use anyhow::Context;
use axum::extract::ws::Message;
use ethers::prelude::U64;
use futures::future::AbortHandle;
use futures::future::Abortable;
use futures::stream::StreamExt;
use log::{trace, warn};
use serde_json::json;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::Arc;
use tokio_stream::wrappers::{BroadcastStream, WatchStream};
impl Web3ProxyApp {
// TODO: #[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
pub async fn eth_subscribe<'a>(
self: &'a Arc<Self>,
authorization: Arc<Authorization>,
request_json: JsonRpcRequest,
subscription_count: &'a AtomicUsize,
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now
response_sender: flume::Sender<Message>,
) -> anyhow::Result<(AbortHandle, JsonRpcForwardedResponse)> {
// TODO: this is not efficient
let request_bytes = serde_json::to_string(&request_json)
.context("finding request size")?
.len();
let request_metadata =
Arc::new(RequestMetadata::new(REQUEST_PERIOD, request_bytes).unwrap());
let (subscription_abort_handle, subscription_registration) = AbortHandle::new_pair();
// TODO: this only needs to be unique per connection. we don't need it globably unique
let subscription_id = subscription_count.fetch_add(1, atomic::Ordering::SeqCst);
let subscription_id = U64::from(subscription_id);
// save the id so we can use it in the response
let id = request_json.id.clone();
// TODO: calling json! on every request is probably not fast. but we can only match against
// TODO: i think we need a stricter EthSubscribeRequest type that JsonRpcRequest can turn into
match request_json.params.as_ref() {
Some(x) if x == &json!(["newHeads"]) => {
let authorization = authorization.clone();
let head_block_receiver = self.head_block_receiver.clone();
let stat_sender = self.stat_sender.clone();
trace!("newHeads subscription {:?}", subscription_id);
tokio::spawn(async move {
let mut head_block_receiver = Abortable::new(
WatchStream::new(head_block_receiver),
subscription_registration,
);
while let Some(new_head) = head_block_receiver.next().await {
// TODO: what should the payload for RequestMetadata be?
let request_metadata =
Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap());
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let response_json = json!({
"jsonrpc": "2.0",
"method":"eth_subscription",
"params": {
"subscription": subscription_id,
// TODO: option to include full transaction objects instead of just the hashes?
"result": new_head.as_ref(),
},
});
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// we could use response.num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
if response_sender.send_async(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = ProxyResponseStat::new(
"eth_subscription(newHeads)".to_string(),
authorization.clone(),
request_metadata.clone(),
response_bytes,
);
if let Err(err) = stat_sender.send_async(response_stat.into()).await {
// TODO: what should we do?
warn!(
"stat_sender failed inside newPendingTransactions: {:?}",
err
);
}
}
}
trace!("closed newHeads subscription {:?}", subscription_id);
});
}
Some(x) if x == &json!(["newPendingTransactions"]) => {
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let stat_sender = self.stat_sender.clone();
let authorization = authorization.clone();
let mut pending_tx_receiver = Abortable::new(
BroadcastStream::new(pending_tx_receiver),
subscription_registration,
);
trace!(
"pending newPendingTransactions subscription id: {:?}",
subscription_id
);
// TODO: do something with this handle?
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let request_metadata =
Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap());
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,
TxStatus::Confirmed(..) => continue,
TxStatus::Orphaned(tx) => tx,
};
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let response_json = json!({
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"subscription": subscription_id,
"result": new_tx.hash,
},
});
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// we could use response.num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
if response_sender.send_async(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = ProxyResponseStat::new(
"eth_subscription(newPendingTransactions)".to_string(),
authorization.clone(),
request_metadata.clone(),
response_bytes,
);
if let Err(err) = stat_sender.send_async(response_stat.into()).await {
// TODO: what should we do?
warn!(
"stat_sender failed inside newPendingTransactions: {:?}",
err
);
}
}
}
trace!(
"closed newPendingTransactions subscription: {:?}",
subscription_id
);
});
}
Some(x) if x == &json!(["newPendingFullTransactions"]) => {
// TODO: too much copy/pasta with newPendingTransactions
let authorization = authorization.clone();
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let stat_sender = self.stat_sender.clone();
let mut pending_tx_receiver = Abortable::new(
BroadcastStream::new(pending_tx_receiver),
subscription_registration,
);
trace!(
"pending newPendingFullTransactions subscription: {:?}",
subscription_id
);
// TODO: do something with this handle?
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let request_metadata =
Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap());
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,
TxStatus::Confirmed(..) => continue,
TxStatus::Orphaned(tx) => tx,
};
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let response_json = json!({
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"subscription": subscription_id,
// upstream just sends the txid, but we want to send the whole transaction
"result": new_tx,
},
});
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// we could use response.num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
if response_sender.send_async(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = ProxyResponseStat::new(
"eth_subscription(newPendingFullTransactions)".to_string(),
authorization.clone(),
request_metadata.clone(),
response_bytes,
);
if let Err(err) = stat_sender.send_async(response_stat.into()).await {
// TODO: what should we do?
warn!(
"stat_sender failed inside newPendingFullTransactions: {:?}",
err
);
}
}
}
trace!(
"closed newPendingFullTransactions subscription: {:?}",
subscription_id
);
});
}
Some(x) if x == &json!(["newPendingRawTransactions"]) => {
// TODO: too much copy/pasta with newPendingTransactions
let authorization = authorization.clone();
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let stat_sender = self.stat_sender.clone();
let mut pending_tx_receiver = Abortable::new(
BroadcastStream::new(pending_tx_receiver),
subscription_registration,
);
trace!(
"pending transactions subscription id: {:?}",
subscription_id
);
// TODO: do something with this handle?
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let request_metadata =
Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap());
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,
TxStatus::Confirmed(..) => continue,
TxStatus::Orphaned(tx) => tx,
};
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let response_json = json!({
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"subscription": subscription_id,
// upstream just sends the txid, but we want to send the raw transaction
"result": new_tx.rlp(),
},
});
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// we could use response.num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
if response_sender.send_async(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
if let Some(stat_sender) = stat_sender.as_ref() {
let response_stat = ProxyResponseStat::new(
"eth_subscription(newPendingRawTransactions)".to_string(),
authorization.clone(),
request_metadata.clone(),
response_bytes,
);
if let Err(err) = stat_sender.send_async(response_stat.into()).await {
// TODO: what should we do?
warn!(
"stat_sender failed inside newPendingRawTransactions: {:?}",
err
);
}
}
}
trace!(
"closed newPendingRawTransactions subscription: {:?}",
subscription_id
);
});
}
_ => return Err(anyhow::anyhow!("unimplemented")),
}
// TODO: do something with subscription_join_handle?
let response = JsonRpcForwardedResponse::from_value(json!(subscription_id), id);
if let Some(stat_sender) = self.stat_sender.as_ref() {
let response_stat = ProxyResponseStat::new(
request_json.method.clone(),
authorization.clone(),
request_metadata,
response.num_bytes(),
);
if let Err(err) = stat_sender.send_async(response_stat.into()).await {
// TODO: what should we do?
warn!("stat_sender failed inside websocket: {:?}", err);
}
}
// TODO: make a `SubscriptonHandle(AbortHandle, JoinHandle)` struct?
Ok((subscription_abort_handle, response))
}
}

@ -4,7 +4,7 @@ use std::fs;
use web3_proxy::config::TopConfig;
#[derive(FromArgs, PartialEq, Eq, Debug)]
/// Second subcommand.
/// Check the config for any problems.
#[argh(subcommand, name = "check_config")]
pub struct CheckConfigSubCommand {
#[argh(positional)]

@ -0,0 +1 @@
//! TODO: write this

@ -2,6 +2,9 @@ mod change_user_tier_by_key;
mod check_config;
mod create_user;
mod drop_migration_lock;
mod list_user_tier;
mod user_export;
mod user_import;
use argh::FromArgs;
use std::fs;
@ -36,9 +39,10 @@ enum SubCommand {
CheckConfig(check_config::CheckConfigSubCommand),
CreateUser(create_user::CreateUserSubCommand),
DropMigrationLock(drop_migration_lock::DropMigrationLockSubCommand),
// TODO: sub command to downgrade migrations?
// TODO: sub command to add new api keys to an existing user?
// TODO: sub command to change a user's tier
UserExport(user_export::UserExportSubCommand),
UserImport(user_import::UserImportSubCommand), // TODO: sub command to downgrade migrations?
// TODO: sub command to add new api keys to an existing user?
// TODO: sub command to change a user's tier
}
#[tokio::main]
@ -83,8 +87,19 @@ async fn main() -> anyhow::Result<()> {
x.main(&db_conn).await
}
SubCommand::DropMigrationLock(x) => {
// very intentionally, do NOT run migrations here
let db_conn = get_db(cli_config.db_url, 1, 1).await?;
x.main(&db_conn).await
}
SubCommand::UserExport(x) => {
let db_conn = get_migrated_db(cli_config.db_url, 1, 1).await?;
x.main(&db_conn).await
}
SubCommand::UserImport(x) => {
let db_conn = get_migrated_db(cli_config.db_url, 1, 1).await?;
x.main(&db_conn).await
}
}

@ -0,0 +1,76 @@
use argh::FromArgs;
use entities::{rpc_key, user};
use log::info;
use migration::sea_orm::{DatabaseConnection, EntityTrait, PaginatorTrait};
use std::fs::{self, create_dir_all};
use std::path::Path;
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(FromArgs, PartialEq, Eq, Debug)]
/// Export users from the database.
#[argh(subcommand, name = "user_export")]
pub struct UserExportSubCommand {
/// where to write the file
/// TODO: validate this is a valid path here?
#[argh(positional, default = "\"./data/users\".to_string()")]
output_dir: String,
}
impl UserExportSubCommand {
pub async fn main(self, db_conn: &DatabaseConnection) -> anyhow::Result<()> {
// create the output dir if it does not exist
create_dir_all(&self.output_dir)?;
let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
let export_dir = Path::new(&self.output_dir);
// get all the users from the database (paged)
let mut user_pages = user::Entity::find().into_json().paginate(db_conn, 1000);
// TODO: for now all user_tier tables match in all databases, but in the future we might need to export/import this
// save all users to a file
let mut user_file_count = 0;
while let Some(users) = user_pages.fetch_and_next().await? {
let export_file = export_dir.join(format!("{}-users-{}.json", now, user_file_count));
fs::write(
export_file,
serde_json::to_string_pretty(&users).expect("users should serialize"),
)?;
user_file_count += 1;
}
info!(
"Saved {} user file(s) to {}",
user_file_count,
export_dir.to_string_lossy()
);
// get all the rpc keys from the database (paged)
let mut rpc_key_pages = rpc_key::Entity::find().into_json().paginate(db_conn, 1000);
let mut rpc_key_file_count = 0;
while let Some(rpc_keys) = rpc_key_pages.fetch_and_next().await? {
let export_file =
export_dir.join(format!("{}-rpc_keys-{}.json", now, rpc_key_file_count));
fs::write(
export_file,
serde_json::to_string_pretty(&rpc_keys).expect("rpc_keys should serialize"),
)?;
rpc_key_file_count += 1;
}
info!(
"Saved {} rpc key file(s) to {}",
rpc_key_file_count,
export_dir.to_string_lossy()
);
Ok(())
}
}

@ -0,0 +1,136 @@
use argh::FromArgs;
use glob::glob;
use hashbrown::HashMap;
use log::{info, warn};
use migration::sea_orm::DatabaseConnection;
use std::path::{Path, PathBuf};
#[derive(FromArgs, PartialEq, Eq, Debug)]
/// Import users from another database.
#[argh(subcommand, name = "user_import")]
pub struct UserImportSubCommand {
#[argh(positional)]
export_timestamp: u64,
#[argh(positional, default = "\"./data/users\".to_string()")]
/// where to write the file
/// TODO: validate this is a file here?
input_dir: String,
}
/// Map ids in the export to ids in our database.
type UserMap = HashMap<u64, u64>;
impl UserImportSubCommand {
pub async fn main(self, db_conn: &DatabaseConnection) -> anyhow::Result<()> {
let import_dir = Path::new(&self.input_dir);
if !import_dir.exists() {
return Err(anyhow::anyhow!(
"import dir ({}) does not exist!",
import_dir.to_string_lossy()
));
}
let user_glob_path = import_dir.join(format!("{}-users-*.json", self.export_timestamp));
let user_glob_path = user_glob_path.to_string_lossy();
info!("Scanning {}", user_glob_path);
let mut user_map = HashMap::new();
let mut user_file_count = 0;
let mut imported_user_count = 0;
for entry in glob(&user_glob_path)? {
match entry {
Ok(path) => {
imported_user_count +=
self.import_user_file(db_conn, path, &mut user_map).await?
}
Err(e) => {
warn!(
"imported {} users from {} files.",
imported_user_count, user_file_count
);
return Err(e.into());
}
}
user_file_count += 1;
}
info!(
"Imported {} user(s) from {} file(s). {} users mapped.",
imported_user_count,
user_file_count,
user_map.len()
);
let rpc_key_glob_path =
import_dir.join(format!("{}-rpc_keys-*.json", self.export_timestamp));
let rpc_key_glob_path = rpc_key_glob_path.to_string_lossy();
info!("Scanning {}", rpc_key_glob_path);
let mut rpc_key_file_count = 0;
let mut imported_rpc_key_count = 0;
for entry in glob(&rpc_key_glob_path)? {
match entry {
Ok(path) => {
imported_rpc_key_count +=
self.import_rpc_key_file(db_conn, path, &user_map).await?
}
Err(e) => {
warn!(
"imported {} users from {} files.",
imported_rpc_key_count, rpc_key_file_count
);
return Err(e.into());
}
}
rpc_key_file_count += 1;
}
info!(
"Imported {} rpc key(s) from {} file(s)",
imported_rpc_key_count, rpc_key_file_count
);
Ok(())
}
pub async fn import_user_file(
&self,
db_conn: &DatabaseConnection,
path: PathBuf,
user_map: &mut UserMap,
) -> anyhow::Result<u64> {
let count = 0;
// TODO: do this all inside a database transaction?
// for each file in the path, read as json
// -- for each entry in the json
// ---- let user_id = if user is in the database
// ------ add user to the database
// ---- else
// ------ add user to the database
// ---- add user to the map.
todo!()
}
pub async fn import_rpc_key_file(
&self,
db_conn: &DatabaseConnection,
path: PathBuf,
user_map: &UserMap,
) -> anyhow::Result<u64> {
let count = 0;
// TODO: do this all inside a database transaction?
// for each file in the path, read as json
// -- for each entry in the json
// ---- let rpc_key_id = if rpc_key is in the database
// ------ continue
// ---- else
// ------ add rpc_key to the database
todo!()
}
}