stats in redis that actually work

we should still investigate a real time series db, but stats in redis is much simpler for now
This commit is contained in:
Bryan Stitt 2022-10-07 02:15:53 +00:00
parent 394f6a6d84
commit c8fbf46885
13 changed files with 256 additions and 231 deletions

145
Cargo.lock generated

@ -74,15 +74,6 @@ dependencies = [
"libc",
]
[[package]]
name = "ansi_term"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2"
dependencies = [
"winapi",
]
[[package]]
name = "anyhow"
version = "1.0.65"
@ -630,7 +621,7 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0940dc441f31689269e10ac70eb1002a3a1d3ad1390e030043662eb7fe4688b"
dependencies = [
"block-padding 0.1.5",
"block-padding",
"byte-tools",
"byteorder",
"generic-array 0.12.4",
@ -642,7 +633,6 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
dependencies = [
"block-padding 0.2.1",
"generic-array 0.14.5",
]
@ -664,12 +654,6 @@ dependencies = [
"byte-tools",
]
[[package]]
name = "block-padding"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae"
[[package]]
name = "blocking"
version = "1.2.0"
@ -941,7 +925,7 @@ dependencies = [
"serde",
"serde_derive",
"sha2 0.10.2",
"sha3 0.10.1",
"sha3",
"thiserror",
]
@ -1477,7 +1461,7 @@ dependencies = [
"serde",
"serde_json",
"sha2 0.10.2",
"sha3 0.10.1",
"sha3",
"thiserror",
"uuid 0.8.2",
]
@ -1494,7 +1478,7 @@ dependencies = [
"regex",
"serde",
"serde_json",
"sha3 0.10.1",
"sha3",
"thiserror",
"uint",
]
@ -2182,9 +2166,9 @@ dependencies = [
[[package]]
name = "handlebars"
version = "4.3.4"
version = "4.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56b224eaa4987c03c30b251de7ef0c15a6a59f34222905850dbc3026dfb24d5f"
checksum = "433e4ab33f1213cdc25b5fa45c76881240cfe79284cf2b395e8b9e312a30a2fd"
dependencies = [
"log",
"pest",
@ -2479,35 +2463,6 @@ dependencies = [
"regex",
]
[[package]]
name = "influxdb"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39023407f0546c3b30607950f8b600c7db4ef7621fbaa0159de733d73e68b23f"
dependencies = [
"chrono",
"futures-util",
"http",
"influxdb_derive",
"lazy_static",
"regex",
"reqwest",
"serde",
"serde_json",
"thiserror",
]
[[package]]
name = "influxdb_derive"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d81efbf97cec06c647f05a8b5edcbc52434cdf980d8d4ace68e1028c90241d3"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "inotify"
version = "0.9.6"
@ -2548,11 +2503,12 @@ checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b"
[[package]]
name = "iri-string"
version = "0.4.1"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f0f7638c1e223529f1bfdc48c8b133b9e0b434094d1d28473161ee48b235f78"
checksum = "6d0586ad318a04c73acdbad33f67969519b5452c80770c4c72059a686da48a7e"
dependencies = [
"nom",
"memchr",
"serde",
]
[[package]]
@ -2604,7 +2560,7 @@ dependencies = [
"ecdsa",
"elliptic-curve",
"sha2 0.10.2",
"sha3 0.10.1",
"sha3",
]
[[package]]
@ -2898,6 +2854,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num"
version = "0.4.0"
@ -3074,6 +3040,12 @@ dependencies = [
"syn",
]
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parity-scale-codec"
version = "3.1.2"
@ -3457,7 +3429,7 @@ dependencies = [
"postgres-protocol",
"serde",
"serde_json",
"time 0.3.14",
"time 0.3.15",
"uuid 1.1.2",
]
@ -4101,7 +4073,7 @@ dependencies = [
"serde",
"serde_json",
"sqlx",
"time 0.3.14",
"time 0.3.15",
"tracing",
"url",
"uuid 1.1.2",
@ -4165,7 +4137,7 @@ dependencies = [
"sea-query-derive",
"sea-query-driver",
"serde_json",
"time 0.3.14",
"time 0.3.15",
"uuid 1.1.2",
]
@ -4395,21 +4367,9 @@ dependencies = [
[[package]]
name = "sha3"
version = "0.9.1"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f81199417d4e5de3f04b1e871023acea7389672c4135918f05aa9cbf2f2fa809"
dependencies = [
"block-buffer 0.9.0",
"digest 0.9.0",
"keccak",
"opaque-debug 0.3.0",
]
[[package]]
name = "sha3"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "881bf8156c87b6301fc5ca6b27f11eeb2761224c7081e69b409d5a1951a70c86"
checksum = "eaedf34ed289ea47c2b741bb72e5357a209512d67bcd4bda44359e5bf0470f56"
dependencies = [
"digest 0.10.3",
"keccak",
@ -4451,18 +4411,18 @@ checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
[[package]]
name = "siwe"
version = "0.4.2"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f24fe2b646c33a670e7d79a232bffb41821fed28b1870a8bd1a47e6ae686ace6"
checksum = "ec4cc2eafb2354c1aeaeac5f53b7726ca4ea5ad16314a0b5ebacb6676f599c2b"
dependencies = [
"hex",
"http",
"iri-string",
"k256",
"rand 0.8.5",
"sha3 0.9.1",
"sha3",
"thiserror",
"time 0.3.14",
"time 0.3.15",
]
[[package]]
@ -4491,9 +4451,9 @@ dependencies = [
[[package]]
name = "smallvec"
version = "1.8.0"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
[[package]]
name = "snafu"
@ -4642,7 +4602,7 @@ dependencies = [
"sqlx-rt",
"stringprep",
"thiserror",
"time 0.3.14",
"time 0.3.15",
"tokio-stream",
"url",
"uuid 1.1.2",
@ -4922,9 +4882,9 @@ dependencies = [
[[package]]
name = "time"
version = "0.3.14"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c3f9a28b618c3a6b9251b6908e9c99e04b9e5c02e6581ccbb67d59c34ef7f9b"
checksum = "d634a985c4d4238ec39cacaed2e7ae552fbd3c476b552c1deac3021b7d7eaf0c"
dependencies = [
"itoa 1.0.2",
"libc",
@ -5132,9 +5092,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
[[package]]
name = "tracing"
version = "0.1.36"
version = "0.1.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fce9567bd60a67d08a16488756721ba392f24f29006402881e43b19aac64307"
checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
dependencies = [
"cfg-if",
"log",
@ -5145,9 +5105,9 @@ dependencies = [
[[package]]
name = "tracing-attributes"
version = "0.1.22"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11c75893af559bc8e10716548bdef5cb2b983f8e637db9d0e15126b61b484ee2"
checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a"
dependencies = [
"proc-macro2",
"quote",
@ -5156,9 +5116,9 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.29"
version = "0.1.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aeea4303076558a00714b823f9ad67d58a3bbda1df83d8827d21193156e22f7"
checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a"
dependencies = [
"once_cell",
"valuable",
@ -5187,12 +5147,12 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.3.15"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60db860322da191b40952ad9affe65ea23e7dd6a5c442c2c42865810c6ab8e6b"
checksum = "a6176eae26dd70d0c919749377897b54a9276bd7061339665dd68777926b5a70"
dependencies = [
"ansi_term",
"matchers",
"nu-ansi-term",
"once_cell",
"parking_lot 0.12.1",
"regex",
@ -5554,7 +5514,6 @@ dependencies = [
"handlebars",
"hashbrown",
"http",
"influxdb",
"ipnet",
"metered",
"migration",
@ -5575,7 +5534,7 @@ dependencies = [
"serde_json",
"serde_prometheus",
"siwe",
"time 0.3.14",
"time 0.3.15",
"tokio",
"tokio-stream",
"toml",
@ -5749,7 +5708,7 @@ dependencies = [
"hmac",
"pbkdf2 0.10.1",
"sha1",
"time 0.3.14",
"time 0.3.15",
"zstd",
]

16
TODO.md

@ -174,10 +174,16 @@ These are roughly in order of completion
- [x] get to /, when not serving a websocket, should have a simple welcome page. maybe with a button to update your wallet.
- [x] instead of giving a rate limit error code, delay the connection's response at the start. reject if incoming requests is super high?
- [x] did this by checking a key/ip-specific semaphore before checking rate limits
- [ ] collect requests per second per api key
- [ ] collect concurrent requests per api key
- [ ] collect distribution of methods per api key (eth_call, eth_getLogs, etc.)
- [ ] display key stats on an endpoint that requires authentication
- [x] emit stat on cache hit
- [x] emit stat on cache miss
- [ ] add grafana to dev docker-compose so we can browse stats
- [ ] emit stat on retry
- [ ] emit stat on no servers synced
- [ ] emit stat on error (maybe just use sentry, but graphs are handy)
- if we wait until the error handler to emit the stat, i don't think we have access to the authorized_request
- [ ] display requests per second per api key (only with authentication!)
- [ ] display concurrent requests per api key (only with authentication!)
- [ ] display distribution of methods per api key (eth_call, eth_getLogs, etc.) (only with authentication!)
- [-] let users choose a % to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly
- this must be opt-in or spawned since it will slow things down and will make their calls less private
- [ ] we currently default to 0.0 and don't expose a way to edit it. we have a database row, but we don't use it
@ -199,6 +205,8 @@ These are roughly in order of completion
These are not yet ordered.
- [ ] EIP1271 for siwe
- [ ] Limited throughput during high traffic
- [ ] implement filters and other unimplemented rpc methods
- multiple teams need log filters and subscriptions.
- would be nice if our subscriptions had better guarantees than geth/erigon do, but maybe simpler to just set up a broadcast channel and proxy all the responses to a backend instead

@ -1,17 +1,25 @@
[shared]
chain_id = 1
db_url = "mysql://root:dev_web3_proxy@dev-db:3306/dev_web3_proxy"
# TODO: how do we find the optimal db_max_connections? too high actually ends up being slower
db_max_connections = 99
# TODO: influxdb_url = "http://influxdb:8086"
min_sum_soft_limit = 2000
min_synced_rpcs = 2
redis_url = "redis://dev-redis:6379/"
# TODO: how do we find the optimal redis_max_connections? too high actually ends up being slower
redis_max_connections = 300
persistent_redis_max_connections = 300
persistent_redis_url = "redis://dev-predis:6379/"
volatile_redis_max_connections = 300
volatile_redis_url = "redis://dev-vredis:6379/"
redirect_public_url = "https://llamanodes.com/free-rpc-stats"
redirect_user_url = "https://llamanodes.com/user-rpc-stats/{{user_id}}"
public_rate_limit_per_minute = 0
# 1GB of cache
response_cache_max_bytes = 10000000000

@ -11,4 +11,4 @@ anyhow = "1.0.65"
hashbrown = "0.12.3"
moka = { version = "0.9.4", default-features = false, features = ["future"] }
tokio = "1.21.2"
tracing = "0.1.36"
tracing = "0.1.37"

@ -11,6 +11,11 @@ services:
#RUST_LOG: "info,web3_proxy=debug"
RUST_LOG: info
volatile_redis:
image: redis
persistent_redis:
image: redis:6.0-alpine
command: [ "redis-server", "--save", "", "--appendonly", "no" ]
# be sure to mount /data!
volatile_redis:
image: redis:6.0-alpine
command: [ "redis-server", "--save", "60", "1" ]

@ -3,13 +3,16 @@
version: "3.4"
services:
dev-redis:
extends:
file: docker-compose.common.yml
service: volatile_redis
# manage the databases with a user friendly interface
# it is slightly dangerous with "drop all" as a single click
dev-adminer:
image: adminer
ports:
- 127.0.0.1:16379:6379
- 18306:8080
environment:
ADMINER_DEFAULT_SERVER: dev-db
# in dev we use mysql, but production will use RDS or similar
dev-db:
image: mysql
environment:
@ -20,35 +23,30 @@ services:
volumes:
- ./data/dev_mysql:/var/lib/mysql
dev-influxdb:
image: influxdb:latest
ports:
- '127.0.0.1:18086:8086'
volumes:
- ./data/dev_influxdb:/var/lib/influxdb
environment:
- INFLUXDB_DB=db0
- INFLUXDB_ADMIN_USER=admin
- INFLUXDB_ADMIN_PASSWORD=dev_web3_proxy
dev-otel-collector:
image: otel/opentelemetry-collector-dev:latest
expose:
- 4317
dev-adminer:
image: adminer
ports:
- 18306:8080
environment:
ADMINER_DEFAULT_SERVER: dev-db
dev-eth:
# persistent redis for storing user stats
# TODO: replace this with a real time series database
dev-predis:
extends:
file: docker-compose.common.yml
service: web3-proxy
volumes:
- ./config/example.toml:/config.toml
service: persistent_redis
ports:
- 8544:8544 # proxy (should be behind something handling HTTPS)
- 8543:8543 # prometheus
- 16379:6379
volumes:
- ./data/dev_predis:/data
# volatile redis for storing rate limits
dev-vredis:
extends:
file: docker-compose.common.yml
service: volatile_redis
ports:
- 16380:6379
# dev-eth:
# extends:
# file: docker-compose.common.yml
# service: web3-proxy
# volumes:
# - ./config/example.toml:/config.toml
# ports:
# - 8544:8544 # proxy (should be behind something handling HTTPS)
# - 8543:8543 # prometheus

@ -7,5 +7,5 @@ edition = "2021"
[dependencies]
anyhow = "1.0.65"
deadpool-redis = { version = "0.10.2", features = ["rt_tokio_1", "serde"] }
tracing = "0.1.36"
tracing = "0.1.37"
tokio = "1.21.2"

@ -36,7 +36,6 @@ flume = "0.10.14"
futures = { version = "0.3.24", features = ["thread-pool"] }
hashbrown = { version = "0.12.3", features = ["serde"] }
http = "0.2.8"
influxdb = { version = "0.5.2", features = ["derive"] }
ipnet = "2.5.0"
metered = { version = "0.9.0", features = ["serialize"] }
moka = { version = "0.9.4", default-features = false, features = ["future"] }
@ -51,15 +50,15 @@ rand = "0.8.5"
# TODO: regex has several "perf" features that we might want to use
regex = "1.6.0"
reqwest = { version = "0.11.12", default-features = false, features = ["json", "tokio-rustls"] }
handlebars = "4.3.4"
handlebars = "4.3.5"
rustc-hash = "1.1.0"
siwe = "0.4.2"
siwe = "0.5.0"
sea-orm = { version = "0.9.3", features = ["macros"] }
serde = { version = "1.0.145", features = [] }
serde_json = { version = "1.0.85", default-features = false, features = ["alloc", "raw_value"] }
serde_prometheus = "0.1.6"
# TODO: make sure this time version matches siwe. PR to put this in their prelude
time = "0.3.14"
time = "0.3.15"
tokio = { version = "1.21.2", features = ["full", "tracing"] }
# TODO: make sure this uuid version matches sea-orm. PR to put this in their prelude
tokio-stream = { version = "0.1.10", features = ["sync"] }
@ -68,9 +67,9 @@ tower = "0.4.13"
# TODO: i don't think we need this. we can use it from tower-http instead. though this seems to use ulid and not uuid?
tower-request-id = "0.2.0"
tower-http = { version = "0.3.4", features = ["cors", "sensitive-headers", "trace"] }
tracing = "0.1.36"
tracing = "0.1.37"
# TODO: tracing-subscriber has serde and serde_json features that we might want to use
tracing-subscriber = { version = "0.3.15", features = ["env-filter", "parking_lot"] }
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "parking_lot"] }
ulid = { version = "1.0.0", features = ["serde"] }
url = "2.3.1"
uuid = "1.1.2"

@ -109,7 +109,7 @@ pub struct Web3ProxyApp {
pub frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>,
pub frontend_key_rate_limiter: Option<DeferredRateLimiter<Ulid>>,
pub login_rate_limiter: Option<RedisRateLimiter>,
pub redis_pool: Option<RedisPool>,
pub vredis_pool: Option<RedisPool>,
pub user_key_cache: Cache<Ulid, UserKeyData, hashbrown::hash_map::DefaultHashBuilder>,
pub user_key_semaphores: Cache<u64, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
@ -229,15 +229,15 @@ impl Web3ProxyApp {
// create a connection pool for redis
// a failure to connect does NOT block the application from starting
let redis_pool = match top_config.app.redis_url.as_ref() {
let vredis_pool = match top_config.app.volatile_redis_url.as_ref() {
Some(redis_url) => {
// TODO: scrub credentials and then include the redis_url in logs
info!("Connecting to redis");
info!("Connecting to vredis");
// TODO: what is a good default?
let redis_max_connections = top_config
.app
.redis_max_connections
.volatile_redis_max_connections
.unwrap_or(num_workers * 2);
// TODO: what are reasonable timeouts?
@ -251,37 +251,67 @@ impl Web3ProxyApp {
if let Err(err) = redis_pool.get().await {
error!(
?err,
"failed to connect to redis. some features will be disabled"
"failed to connect to vredis. some features will be disabled"
);
};
Some(redis_pool)
}
None => {
warn!("no redis connection");
warn!("no redis connection. some features will be disabled");
None
}
};
// setup a channel here for receiving influxdb stats
// TODO: dry this with predis
let predis_pool = match top_config.app.persistent_redis_url.as_ref() {
Some(redis_url) => {
// TODO: scrub credentials and then include the redis_url in logs
info!("Connecting to predis");
// TODO: what is a good default?
let redis_max_connections = top_config
.app
.persistent_redis_max_connections
.unwrap_or(num_workers * 2);
// TODO: what are reasonable timeouts?
let redis_pool = RedisConfig::from_url(redis_url)
.builder()?
.max_size(redis_max_connections)
.runtime(DeadpoolRuntime::Tokio1)
.build()?;
// test the redis pool
if let Err(err) = redis_pool.get().await {
error!(
?err,
"failed to connect to predis. some features will be disabled"
);
};
Some(redis_pool)
}
None => {
warn!("no predis connection. some features will be disabled");
None
}
};
// setup a channel for receiving stats (generally with a high cardinality, such as per-user)
// we do this in a channel so we don't slow down our response to the users
// TODO: make influxdb optional
let stat_sender = if let Some(influxdb_url) = top_config.app.influxdb_url.clone() {
let influxdb_name = top_config
.app
.influxdb_name
.clone()
.context("connecting to influxdb")?;
let stat_sender = if let Some(redis_pool) = predis_pool.clone() {
let redis_conn = redis_pool.get().await?;
// TODO: sender and receiver here are a little confusing. because the thing that reads the receiver is what actually submits the stats
let (stat_sender, stat_handle) =
StatEmitter::spawn(influxdb_url, influxdb_name, http_client.clone());
StatEmitter::spawn(top_config.app.chain_id, redis_conn).await?;
handles.push(stat_handle);
Some(stat_sender)
} else {
warn!("no influxdb connection");
warn!("cannot store stats without a redis connection");
None
};
@ -317,7 +347,7 @@ impl Web3ProxyApp {
top_config.app.chain_id,
balanced_rpcs,
http_client.clone(),
redis_pool.clone(),
vredis_pool.clone(),
block_map.clone(),
Some(head_block_sender),
top_config.app.min_sum_soft_limit,
@ -343,7 +373,7 @@ impl Web3ProxyApp {
top_config.app.chain_id,
private_rpcs,
http_client.clone(),
redis_pool.clone(),
vredis_pool.clone(),
block_map,
// subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs
None,
@ -370,7 +400,7 @@ impl Web3ProxyApp {
let mut frontend_key_rate_limiter = None;
let mut login_rate_limiter = None;
if let Some(redis_pool) = redis_pool.as_ref() {
if let Some(redis_pool) = vredis_pool.as_ref() {
let rpc_rrl = RedisRateLimiter::new(
"web3_proxy",
"frontend",
@ -440,7 +470,7 @@ impl Web3ProxyApp {
frontend_key_rate_limiter,
login_rate_limiter,
db_conn,
redis_pool,
vredis_pool,
app_metrics,
open_request_handle_metrics,
user_key_cache,
@ -740,7 +770,7 @@ impl Web3ProxyApp {
}
pub async fn redis_conn(&self) -> anyhow::Result<redis_rate_limiter::RedisConnection> {
match self.redis_pool.as_ref() {
match self.vredis_pool.as_ref() {
None => Err(anyhow::anyhow!("no redis server configured")),
Some(redis_pool) => {
let redis_conn = redis_pool.get().await?;

@ -48,34 +48,49 @@ pub struct TopConfig {
// TODO: no String, only &str
#[derive(Debug, Default, Deserialize)]
pub struct AppConfig {
// TODO: better type for chain_id? max of `u64::MAX / 2 - 36` https://github.com/ethereum/EIPs/issues/2294
/// EVM chain id. 1 for ETH
/// TODO: better type for chain_id? max of `u64::MAX / 2 - 36` https://github.com/ethereum/EIPs/issues/2294
pub chain_id: u64,
pub cookie_domain: Option<String>,
pub cookie_secure: Option<bool>,
/// Database is used for user data.
/// Currently supports mysql or compatible backend.
pub db_url: Option<String>,
/// minimum size of the connection pool for the database
/// If none, the number of workers are used
/// minimum size of the connection pool for the database.
/// If none, the number of workers are used.
pub db_min_connections: Option<u32>,
/// minimum size of the connection pool for the database
/// If none, the minimum * 2 is used
/// maximum size of the connection pool for the database.
/// If none, the minimum * 2 is used.
pub db_max_connections: Option<u32>,
pub influxdb_url: Option<String>,
pub influxdb_name: Option<String>,
/// Default request limit for registered users.
/// 0 = block all requests
/// None = allow all requests
pub default_requests_per_minute: Option<u64>,
/// Restrict user registration.
/// None = no code needed
pub invite_code: Option<String>,
/// The soft limit prevents thundering herds as new blocks are seen.
#[serde(default = "default_min_sum_soft_limit")]
pub min_sum_soft_limit: u32,
/// Another knob for preventing thundering herds as new blocks are seen.
#[serde(default = "default_min_synced_rpcs")]
pub min_synced_rpcs: usize,
/// Set to 0 to block all anonymous requests
/// Request limit for anonymous users.
/// Set to 0 to block all anonymous requests.
#[serde(default = "default_frontend_rate_limit_per_minute")]
pub frontend_rate_limit_per_minute: u64,
/// Rate limit for the login entrypoint.
/// This is separate from the rpc limits.
#[serde(default = "default_login_rate_limit_per_minute")]
pub login_rate_limit_per_minute: u64,
pub redis_url: Option<String>,
/// Persist user stats in a redis (or compatible backend)
/// TODO: research more time series databases
pub persistent_redis_url: Option<String>,
pub persistent_redis_max_connections: Option<usize>,
/// Track rate limits in a redis (or compatible backend)
pub volatile_redis_url: Option<String>,
/// maximum size of the connection pool for the cache
/// If none, the minimum * 2 is used
pub redis_max_connections: Option<usize>,
pub volatile_redis_max_connections: Option<usize>,
/// RPC responses are cached locally
#[serde(default = "default_response_cache_max_bytes")]
pub response_cache_max_bytes: usize,
/// the stats page url for an anonymous user.
@ -84,10 +99,12 @@ pub struct AppConfig {
pub redirect_user_url: String,
}
/// This might cause a thundering herd!
fn default_min_sum_soft_limit() -> u32 {
1
}
/// Only require 1 server. This might cause a thundering herd!
fn default_min_synced_rpcs() -> usize {
1
}
@ -110,10 +127,16 @@ fn default_response_cache_max_bytes() -> usize {
#[derive(Debug, Deserialize, Constructor)]
pub struct Web3ConnectionConfig {
/// websocket (or http if no websocket)
url: String,
/// the requests per second at which the server starts slowing down
soft_limit: u32,
/// the requests per second at which the server throws errors (rate limit or otherwise)
hard_limit: Option<u64>,
/// All else equal, a server with a lower weight receives requests
weight: u32,
/// Subscribe to the firehose of pending transactions
/// Don't do this with free rpcs
subscribe_txs: Option<bool>,
}

@ -206,7 +206,7 @@ impl Display for &AuthorizedRequest {
match self {
AuthorizedRequest::Internal => f.write_str("internal"),
AuthorizedRequest::Ip(x) => f.write_str(&format!("ip:{}", x)),
AuthorizedRequest::User(_, x) => f.write_str(&format!("user_key:{}", x.user_key_id)),
AuthorizedRequest::User(_, x) => f.write_str(&format!("uk:{}", x.user_key_id)),
}
}
}

@ -26,7 +26,7 @@ use http::StatusCode;
use redis_rate_limiter::redis::AsyncCommands;
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, TransactionTrait};
use serde::{Deserialize, Serialize};
use siwe::Message;
use siwe::{Message, VerificationOpts};
use std::ops::Add;
use std::sync::Arc;
use time::{Duration, OffsetDateTime};
@ -168,8 +168,14 @@ pub async fn post_login(
let our_msg: siwe::Message = our_msg.parse().unwrap();
let verify_config = VerificationOpts {
domain: Some(our_msg.domain),
nonce: Some(our_msg.nonce),
..Default::default()
};
// check the domain and a nonce. let timestamp be automatic
if let Err(e) = their_msg.verify(their_sig, Some(&our_msg.domain), Some(&our_msg.nonce), None) {
if let Err(e) = their_msg.verify(&their_sig, &verify_config).await {
// message cannot be correctly authenticated
todo!("proper error message: {}", e)
}

@ -1,9 +1,9 @@
use chrono::{DateTime, Utc};
use anyhow::Context;
use derive_more::From;
use influxdb::Client;
use influxdb::InfluxDbWriteable;
use redis_rate_limiter::{redis, RedisConnection};
use std::fmt::Display;
use tokio::task::JoinHandle;
use tracing::{error, info, trace};
use tracing::{debug, error, info};
use crate::frontend::authorization::AuthorizedRequest;
@ -14,39 +14,29 @@ pub enum ProxyResponseType {
Error,
}
impl From<ProxyResponseType> for influxdb::Type {
fn from(x: ProxyResponseType) -> Self {
match x {
ProxyResponseType::CacheHit => "cache_hit".into(),
ProxyResponseType::CacheMiss => "cache_miss".into(),
ProxyResponseType::Error => "error".into(),
impl Display for ProxyResponseType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ProxyResponseType::CacheHit => f.write_str("cache_hit"),
ProxyResponseType::CacheMiss => f.write_str("cache_miss"),
ProxyResponseType::Error => f.write_str("error"),
}
}
}
/// TODO: where should this be defined?
/// TODO: what should be fields and what should be tags. count is always 1 which feels wrong
#[derive(Debug, InfluxDbWriteable)]
pub struct ProxyResponseStat {
time: DateTime<Utc>,
count: u32,
#[influxdb(tag)]
method: String,
#[influxdb(tag)]
response_type: ProxyResponseType,
#[influxdb(tag)]
who: String,
}
#[derive(Debug)]
pub struct ProxyResponseStat(String);
/// A very basic stat that we store in redis.
/// This probably belongs in a true time series database like influxdb, but client
impl ProxyResponseStat {
pub fn new(method: String, response_type: ProxyResponseType, who: &AuthorizedRequest) -> Self {
Self {
time: Utc::now(),
count: 1,
method,
response_type,
who: who.to_string(),
}
// TODO: what order?
// TODO: app specific prefix. need at least the chain id
let redis_key = format!("proxy_response:{}:{}:{}", method, response_type, who);
Self(redis_key)
}
}
@ -56,9 +46,9 @@ pub enum Web3ProxyStat {
}
impl Web3ProxyStat {
fn into_query(self) -> influxdb::WriteQuery {
fn into_redis_key(self, chain_id: u64) -> String {
match self {
Self::ProxyResponse(x) => x.into_query("proxy_response"),
Self::ProxyResponse(x) => format!("{}:{}", x.0, chain_id),
}
}
}
@ -66,31 +56,30 @@ impl Web3ProxyStat {
pub struct StatEmitter;
impl StatEmitter {
pub fn spawn(
influxdb_url: String,
influxdb_name: String,
http_client: Option<reqwest::Client>,
) -> (flume::Sender<Web3ProxyStat>, JoinHandle<anyhow::Result<()>>) {
pub async fn spawn(
chain_id: u64,
mut redis_conn: RedisConnection,
) -> anyhow::Result<(flume::Sender<Web3ProxyStat>, JoinHandle<anyhow::Result<()>>)> {
let (tx, rx) = flume::unbounded::<Web3ProxyStat>();
let client = Client::new(influxdb_url, influxdb_name);
// use an existing http client
let client = if let Some(http_client) = http_client {
client.with_http_client(http_client)
} else {
client
};
// simple future that reads the channel and emits stats
let f = async move {
while let Ok(x) = rx.recv_async().await {
let x = x.into_query();
// TODO: batch stats? spawn this?
trace!(?x, "emitting stat");
let x = x.into_redis_key(chain_id);
if let Err(err) = client.query(x).await {
error!(?err, "failed writing stat");
// TODO: now what?
// TODO: this is too loud. just doing it for dev
debug!(?x, "emitting stat");
// TODO: do this without the pipe?
if let Err(err) = redis::pipe()
.incr(&x, 1)
.query_async::<_, ()>(&mut redis_conn)
.await
.context("incrementing stat")
{
error!(?err, "emitting stat")
}
}
@ -101,6 +90,6 @@ impl StatEmitter {
let handle = tokio::spawn(f);
(tx, handle)
Ok((tx, handle))
}
}