load the redirected page from config

This commit is contained in:
Bryan Stitt 2022-08-12 19:07:14 +00:00
parent a04629641c
commit c57c2249c0
12 changed files with 140 additions and 50 deletions

65
Cargo.lock generated

@ -2162,6 +2162,20 @@ version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
[[package]]
name = "handlebars"
version = "4.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "360d9740069b2f6cbb63ce2dbaa71a20d3185350cbb990d7bebeb9318415eb17"
dependencies = [
"log",
"pest",
"pest_derive",
"serde",
"serde_json",
"thiserror",
]
[[package]]
name = "hashbrown"
version = "0.12.3"
@ -3151,6 +3165,50 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pest"
version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69486e2b8c2d2aeb9762db7b4e00b0331156393555cff467f4163ff06821eef8"
dependencies = [
"thiserror",
"ucd-trie",
]
[[package]]
name = "pest_derive"
version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b13570633aff33c6d22ce47dd566b10a3b9122c2fe9d8e7501895905be532b91"
dependencies = [
"pest",
"pest_generator",
]
[[package]]
name = "pest_generator"
version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3c567e5702efdc79fb18859ea74c3eb36e14c43da7b8c1f098a4ed6514ec7a0"
dependencies = [
"pest",
"pest_meta",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pest_meta"
version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5eb32be5ee3bbdafa8c7a18b0a8a8d962b66cfa2ceee4037f49267a50ee821fe"
dependencies = [
"once_cell",
"pest",
"sha-1",
]
[[package]]
name = "petgraph"
version = "0.6.2"
@ -4947,6 +5005,12 @@ version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987"
[[package]]
name = "ucd-trie"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89570599c4fe5585de2b388aab47e99f7fa4e9238a1399f707a02e356058141c"
[[package]]
name = "uint"
version = "0.9.3"
@ -5220,6 +5284,7 @@ dependencies = [
"flume",
"fstrings",
"futures",
"handlebars",
"hashbrown",
"indexmap",
"migration",

@ -107,6 +107,6 @@ Test erigon (assuming it is on 8945):
Note: Testing with `getLatestBlockByNumber.lua` is not great because the latest block changes and so one run is likely to be very different than another.
Run ethspam for a more realistic load test:
Run [ethspam](https://github.com/INFURA/versus) and [versus](https://github.com/shazow/ethspam) for a more realistic load test:
ethspam --rpc http://127.0.0.1:8544/u/someuserkey | versus --concurrency=100 --stop-after=10000 http://127.0.0.1:8544/u/someuserkey

@ -70,9 +70,9 @@
- [x] im seeing ethspam occasionally try to query a future block. something must be setting the head block too early
- [x] we were sorting best block the wrong direction. i flipped a.cmp(b) to b.cmp(a) so that the largest would be first, but then i used 'max_by' which looks at the end of the list
- [x] HTTP GET to the websocket endpoints should redirect instead of giving an ugly error
- [ ] load the redirected page from config
- [ ] use config to decide if the app should use `into_make_service_with_connect_info` or `into_make_service`. with_connect_info is needed for getting the ip from docker
- [x] load the redirected page from config
- [ ] basic request method stats
- [ ] prettier output for create_user command. need the key in hex
- [ ] use siwe messages and signatures for sign up and login
- [ ] fantom_1 | 2022-08-10T22:19:43.522465Z WARN web3_proxy::jsonrpc: forwarding error err=missing field `jsonrpc` at line 1 column 60
- [ ] i think the server isn't following the spec. we need a context attached to this error so we know which one
@ -247,3 +247,4 @@ in another repo: event subscriber
eth_1 | 2022-08-10T23:26:10.195014Z WARN web3_proxy::connections: chain is forked! 262 possible heads. 1/2/5/5 rpcs have 0x0538…bfff
eth_1 | 2022-08-10T23:26:10.195658Z WARN web3_proxy::connections: chain is forked! 262 possible heads. 2/3/5/5 rpcs have 0x0538…bfff
- [ ] disable redis persistence in dev
- [ ] fix ip detection when running in dev

@ -1,4 +1,4 @@
[shared]
[app]
chain_id = 1
public_rate_limit_per_minute = 60_000

@ -2,6 +2,8 @@
chain_id = 1
db_url = "mysql://root:dev_web3_proxy@dev-db:3306/dev_web3_proxy"
redis_url = "redis://dev-redis:6379/"
redirect_public_url = "https://llamanodes.com/free-rpc-stats"
redirect_user_url = "https://llamanodes.com/user-rpc-stats/{{user_id}}"
public_rate_limit_per_minute = 0
# 1GB of cache
response_cache_max_bytes = 10000000000

@ -42,6 +42,7 @@ rand = "0.8.5"
# TODO: regex has several "perf" features that we might want to use
regex = "1.6.0"
reqwest = { version = "0.11.11", default-features = false, features = ["json", "tokio-rustls"] }
handlebars = "4.3.3"
rustc-hash = "1.1.0"
siwe = "0.4.1"
sea-orm = { version = "0.9.1", features = ["macros"] }

@ -34,7 +34,7 @@ use uuid::Uuid;
use crate::bb8_helpers;
use crate::block_helpers::block_needed;
use crate::config::AppConfig;
use crate::config::{AppConfig, TopConfig};
use crate::connections::Web3Connections;
use crate::jsonrpc::JsonRpcForwardedResponse;
use crate::jsonrpc::JsonRpcForwardedResponseEnum;
@ -139,6 +139,7 @@ pub struct Web3ProxyApp {
// TODO: broadcast channel instead?
head_block_receiver: watch::Receiver<Arc<Block<TxHash>>>,
pending_tx_sender: broadcast::Sender<TxState>,
pub config: AppConfig,
pub pending_transactions: Arc<DashMap<TxHash, TxState>>,
pub user_cache: RwLock<FifoCountMap<Uuid, UserCacheValue>>,
pub redis_pool: Option<RedisPool>,
@ -156,17 +157,23 @@ impl fmt::Debug for Web3ProxyApp {
impl Web3ProxyApp {
// TODO: should we just take the rpc config as the only arg instead?
pub async fn spawn(
app_config: AppConfig,
top_config: TopConfig,
num_workers: usize,
) -> anyhow::Result<(
Arc<Web3ProxyApp>,
Pin<Box<dyn Future<Output = anyhow::Result<()>>>>,
)> {
// safety checks on the config
assert!(
top_config.app.redirect_user_url.contains("{{user_id}}"),
"redirect user url must contain \"{{user_id}}\""
);
// first, we connect to mysql and make sure the latest migrations have run
let db_conn = if let Some(db_url) = app_config.shared.db_url {
let db_conn = if let Some(db_url) = &top_config.app.db_url {
let max_connections = num_workers.try_into()?;
let db = get_migrated_db(db_url, max_connections).await?;
let db = get_migrated_db(db_url.clone(), max_connections).await?;
Some(db)
} else {
@ -174,9 +181,9 @@ impl Web3ProxyApp {
None
};
let balanced_rpcs = app_config.balanced_rpcs;
let balanced_rpcs = top_config.balanced_rpcs;
let private_rpcs = if let Some(private_rpcs) = app_config.private_rpcs {
let private_rpcs = if let Some(private_rpcs) = top_config.private_rpcs {
private_rpcs
} else {
Default::default()
@ -196,11 +203,11 @@ impl Web3ProxyApp {
.build()?,
);
let redis_pool = match app_config.shared.redis_url {
let redis_pool = match top_config.app.redis_url.as_ref() {
Some(redis_url) => {
info!("Connecting to redis on {}", redis_url);
let manager = RedisConnectionManager::new(redis_url)?;
let manager = RedisConnectionManager::new(redis_url.as_ref())?;
let min_size = num_workers as u32;
let max_size = min_size * 4;
@ -236,9 +243,8 @@ impl Web3ProxyApp {
// TODO: once a transaction is "Confirmed" we remove it from the map. this should prevent major memory leaks.
// TODO: we should still have some sort of expiration or maximum size limit for the map
// TODO: attach context to this error
let (balanced_rpcs, balanced_handle) = Web3Connections::spawn(
app_config.shared.chain_id,
top_config.app.chain_id,
balanced_rpcs,
http_client.clone(),
redis_pool.clone(),
@ -257,7 +263,7 @@ impl Web3ProxyApp {
} else {
// TODO: attach context to this error
let (private_rpcs, private_handle) = Web3Connections::spawn(
app_config.shared.chain_id,
top_config.app.chain_id,
private_rpcs,
http_client.clone(),
redis_pool.clone(),
@ -276,7 +282,8 @@ impl Web3ProxyApp {
};
// TODO: how much should we allow?
let public_max_burst = app_config.shared.public_rate_limit_per_minute / 3;
// TODO: im seeing errors in redis. just use the redis FAQ on rate limiting. its really simple
let public_max_burst = top_config.app.public_rate_limit_per_minute / 3;
let frontend_rate_limiter = redis_pool.as_ref().map(|redis_pool| {
RedisCell::new(
@ -284,20 +291,21 @@ impl Web3ProxyApp {
"web3_proxy",
"frontend",
public_max_burst,
app_config.shared.public_rate_limit_per_minute,
top_config.app.public_rate_limit_per_minute,
60,
)
});
// keep the borrow checker happy
let response_cache_max_bytes = top_config.app.response_cache_max_bytes;
let app = Self {
config: top_config.app,
balanced_rpcs,
private_rpcs,
active_requests: Default::default(),
// TODO: make the share configurable
response_cache: RwLock::new(FifoSizedMap::new(
app_config.shared.response_cache_max_bytes,
100,
)),
response_cache: RwLock::new(FifoSizedMap::new(response_cache_max_bytes, 100)),
head_block_receiver,
pending_tx_sender,
pending_transactions,

@ -17,15 +17,15 @@ use tokio::time::Duration;
use tracing::{debug, info};
use tracing_subscriber::EnvFilter;
use web3_proxy::app::{flatten_handle, Web3ProxyApp};
use web3_proxy::config::{AppConfig, CliConfig};
use web3_proxy::config::{CliConfig, TopConfig};
use web3_proxy::frontend;
fn run(
shutdown_receiver: flume::Receiver<()>,
cli_config: CliConfig,
app_config: AppConfig,
top_config: TopConfig,
) -> anyhow::Result<()> {
debug!(?cli_config, ?app_config);
debug!(?cli_config, ?top_config);
// spawn a thread for deadlock detection
thread::spawn(move || loop {
@ -48,7 +48,7 @@ fn run(
// set up tokio's async runtime
let mut rt_builder = runtime::Builder::new_multi_thread();
let chain_id = app_config.shared.chain_id;
let chain_id = top_config.app.chain_id;
rt_builder.enable_all().thread_name_fn(move || {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
// TODO: what ordering? i think we want seqcst so that these all happen in order, but that might be stricter than we really need
@ -70,7 +70,7 @@ fn run(
debug!(?num_workers);
rt.block_on(async {
let (app, app_handle) = Web3ProxyApp::spawn(app_config, num_workers).await?;
let (app, app_handle) = Web3ProxyApp::spawn(top_config, num_workers).await?;
let frontend_handle = tokio::spawn(frontend::serve(cli_config.port, app));
@ -128,15 +128,17 @@ fn main() -> anyhow::Result<()> {
// advanced configuration is on disk
info!("Loading rpc config @ {}", cli_config.config);
let app_config: String = fs::read_to_string(cli_config.config.clone())?;
let app_config: AppConfig = toml::from_str(&app_config)?;
let top_config: String = fs::read_to_string(cli_config.config.clone())?;
let top_config: TopConfig = toml::from_str(&top_config)?;
// TODO: this doesn't seem to do anything
proctitle::set_title(format!("web3_proxy-{}", app_config.shared.chain_id));
proctitle::set_title(format!("web3_proxy-{}", top_config.app.chain_id));
// tokio has code for catching ctrl+c so we use that
// this shutdown sender is currently only used in tests, but we might make a /shutdown endpoint or something
let (_shutdown_sender, shutdown_receiver) = flume::bounded(1);
run(shutdown_receiver, cli_config, app_config)
run(shutdown_receiver, cli_config, top_config)
}
#[cfg(test)]
@ -148,7 +150,7 @@ mod tests {
use hashbrown::HashMap;
use std::env;
use web3_proxy::config::{RpcSharedConfig, Web3ConnectionConfig};
use web3_proxy::config::{AppConfig, Web3ConnectionConfig};
use super::*;
@ -190,13 +192,15 @@ mod tests {
};
// make a test AppConfig
let app_config = AppConfig {
shared: RpcSharedConfig {
let app_config = TopConfig {
app: AppConfig {
chain_id: 31337,
db_url: None,
redis_url: None,
public_rate_limit_per_minute: 0,
response_cache_max_bytes: 10_usize.pow(7),
redirect_public_url: "example.com/".to_string(),
redirect_user_url: "example.com/users/{user_address}".to_string(),
},
balanced_rpcs: HashMap::from([
(

@ -28,15 +28,15 @@ pub struct CliConfig {
}
#[derive(Debug, Deserialize)]
pub struct AppConfig {
pub shared: RpcSharedConfig,
pub struct TopConfig {
pub app: AppConfig,
pub balanced_rpcs: HashMap<String, Web3ConnectionConfig>,
pub private_rpcs: Option<HashMap<String, Web3ConnectionConfig>>,
}
/// shared configuration between Web3Connections
#[derive(Debug, Deserialize)]
pub struct RpcSharedConfig {
pub struct AppConfig {
// TODO: better type for chain_id? max of `u64::MAX / 2 - 36` https://github.com/ethereum/EIPs/issues/2294
pub chain_id: u64,
pub db_url: Option<String>,
@ -45,6 +45,10 @@ pub struct RpcSharedConfig {
pub public_rate_limit_per_minute: u32,
#[serde(default = "default_response_cache_max_bytes")]
pub response_cache_max_bytes: usize,
/// the stats page url for an anonymous user.
pub redirect_public_url: String,
/// the stats page url for a logged in user. it must contain "{user_id}"
pub redirect_user_url: String,
}
fn default_public_rate_limit_per_minute() -> u32 {

@ -182,6 +182,7 @@ impl Web3Connections {
};
// turn configs into connections (in parallel)
// TODO: move this into a helper function. then we can use it when configs change (will need a remove function too)
let spawn_handles: Vec<_> = server_configs
.into_iter()
.map(|(server_name, server_config)| {

@ -106,6 +106,7 @@ impl Web3ProxyApp {
UserId,
RequestsPerMinute,
}
// TODO: join the user table to this to return the User? we don't always need it
let user_data = match user_keys::Entity::find()
.select_only()
.column_as(user_keys::Column::UserId, QueryAs::UserId)

@ -5,12 +5,12 @@ use axum::{
Extension,
};
use axum_client_ip::ClientIp;
use fstrings::{format_args_f, format_f};
use futures::SinkExt;
use futures::{
future::AbortHandle,
stream::{SplitSink, SplitStream, StreamExt},
};
use handlebars::Handlebars;
use hashbrown::HashMap;
use serde_json::{json, value::RawValue};
use std::sync::Arc;
@ -30,7 +30,7 @@ pub async fn public_websocket_handler(
ClientIp(ip): ClientIp,
ws_upgrade: Option<WebSocketUpgrade>,
) -> Response {
let ip = match app.rate_limit_by_ip(ip).await {
let _ip = match app.rate_limit_by_ip(ip).await {
Ok(x) => match x.try_into_response().await {
Ok(RateLimitResult::AllowedIp(x)) => x,
Err(err_response) => return err_response,
@ -44,12 +44,8 @@ pub async fn public_websocket_handler(
.on_upgrade(|socket| proxy_web3_socket(app, socket))
.into_response(),
None => {
// this is not a websocket. give a friendly page. maybe redirect to the llama nodes home
// TODO: redirect to a configurable url
Redirect::to(&format_f!(
"https://llamanodes.com/free-rpc-stats#userip={ip}"
))
.into_response()
// this is not a websocket. redirect to a friendly page
Redirect::to(&app.config.redirect_public_url).into_response()
}
}
}
@ -72,13 +68,20 @@ pub async fn user_websocket_handler(
match ws_upgrade {
Some(ws_upgrade) => ws_upgrade.on_upgrade(|socket| proxy_web3_socket(app, socket)),
None => {
// this is not a websocket. give a friendly page with stats for this user
// TODO: redirect to a configurable url
// TODO: store this on the app and use register_template?
let reg = Handlebars::new();
// TODO: show the user's address, not their id (remember to update the checks for {{user_id}} in app.rs)
// TODO: query to get the user's address. expose that instead of user_id
Redirect::to(&format_f!(
"https://llamanodes.com/user-rpc-stats#user_id={user_id}"
))
.into_response()
let user_url = reg
.render_template(
&app.config.redirect_user_url,
&json!({ "user_id": user_id }),
)
.unwrap();
// this is not a websocket. redirect to a page for this user
Redirect::to(&user_url).into_response()
}
}
}