From c8fbf46885bda89772fcaf2308d27d18b8c4e13c Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 7 Oct 2022 02:15:53 +0000 Subject: [PATCH] stats in redis that actually work we should still investigate a real time series db, but stats in redis is much simpler for now --- Cargo.lock | 145 ++++++++--------------- TODO.md | 16 ++- config/example.toml | 14 ++- deferred-rate-limiter/Cargo.toml | 2 +- docker-compose.common.yml | 9 +- docker-compose.yml | 66 +++++------ redis-rate-limiter/Cargo.toml | 2 +- web3_proxy/Cargo.toml | 11 +- web3_proxy/src/app.rs | 72 +++++++---- web3_proxy/src/config.rs | 47 ++++++-- web3_proxy/src/frontend/authorization.rs | 2 +- web3_proxy/src/frontend/users.rs | 10 +- web3_proxy/src/stats.rs | 91 +++++++------- 13 files changed, 256 insertions(+), 231 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1d4551f2..cdb1d7d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -74,15 +74,6 @@ dependencies = [ "libc", ] -[[package]] -name = "ansi_term" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" -dependencies = [ - "winapi", -] - [[package]] name = "anyhow" version = "1.0.65" @@ -630,7 +621,7 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0940dc441f31689269e10ac70eb1002a3a1d3ad1390e030043662eb7fe4688b" dependencies = [ - "block-padding 0.1.5", + "block-padding", "byte-tools", "byteorder", "generic-array 0.12.4", @@ -642,7 +633,6 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" dependencies = [ - "block-padding 0.2.1", "generic-array 0.14.5", ] @@ -664,12 +654,6 @@ dependencies = [ "byte-tools", ] -[[package]] -name = "block-padding" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae" - [[package]] name = "blocking" version = "1.2.0" @@ -941,7 +925,7 @@ dependencies = [ "serde", "serde_derive", "sha2 0.10.2", - "sha3 0.10.1", + "sha3", "thiserror", ] @@ -1477,7 +1461,7 @@ dependencies = [ "serde", "serde_json", "sha2 0.10.2", - "sha3 0.10.1", + "sha3", "thiserror", "uuid 0.8.2", ] @@ -1494,7 +1478,7 @@ dependencies = [ "regex", "serde", "serde_json", - "sha3 0.10.1", + "sha3", "thiserror", "uint", ] @@ -2182,9 +2166,9 @@ dependencies = [ [[package]] name = "handlebars" -version = "4.3.4" +version = "4.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56b224eaa4987c03c30b251de7ef0c15a6a59f34222905850dbc3026dfb24d5f" +checksum = "433e4ab33f1213cdc25b5fa45c76881240cfe79284cf2b395e8b9e312a30a2fd" dependencies = [ "log", "pest", @@ -2479,35 +2463,6 @@ dependencies = [ "regex", ] -[[package]] -name = "influxdb" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39023407f0546c3b30607950f8b600c7db4ef7621fbaa0159de733d73e68b23f" -dependencies = [ - "chrono", - "futures-util", - "http", - "influxdb_derive", - "lazy_static", - "regex", - "reqwest", - "serde", - "serde_json", - "thiserror", -] - -[[package]] -name = "influxdb_derive" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d81efbf97cec06c647f05a8b5edcbc52434cdf980d8d4ace68e1028c90241d3" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "inotify" version = "0.9.6" @@ -2548,11 
+2503,12 @@ checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b" [[package]] name = "iri-string" -version = "0.4.1" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f0f7638c1e223529f1bfdc48c8b133b9e0b434094d1d28473161ee48b235f78" +checksum = "6d0586ad318a04c73acdbad33f67969519b5452c80770c4c72059a686da48a7e" dependencies = [ - "nom", + "memchr", + "serde", ] [[package]] @@ -2604,7 +2560,7 @@ dependencies = [ "ecdsa", "elliptic-curve", "sha2 0.10.2", - "sha3 0.10.1", + "sha3", ] [[package]] @@ -2898,6 +2854,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num" version = "0.4.0" @@ -3074,6 +3040,12 @@ dependencies = [ "syn", ] +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parity-scale-codec" version = "3.1.2" @@ -3457,7 +3429,7 @@ dependencies = [ "postgres-protocol", "serde", "serde_json", - "time 0.3.14", + "time 0.3.15", "uuid 1.1.2", ] @@ -4101,7 +4073,7 @@ dependencies = [ "serde", "serde_json", "sqlx", - "time 0.3.14", + "time 0.3.15", "tracing", "url", "uuid 1.1.2", @@ -4165,7 +4137,7 @@ dependencies = [ "sea-query-derive", "sea-query-driver", "serde_json", - "time 0.3.14", + "time 0.3.15", "uuid 1.1.2", ] @@ -4395,21 +4367,9 @@ dependencies = [ [[package]] name = "sha3" -version = "0.9.1" +version = "0.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f81199417d4e5de3f04b1e871023acea7389672c4135918f05aa9cbf2f2fa809" -dependencies = [ - "block-buffer 0.9.0", - "digest 0.9.0", - "keccak", - "opaque-debug 0.3.0", -] - -[[package]] -name = "sha3" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "881bf8156c87b6301fc5ca6b27f11eeb2761224c7081e69b409d5a1951a70c86" +checksum = "eaedf34ed289ea47c2b741bb72e5357a209512d67bcd4bda44359e5bf0470f56" dependencies = [ "digest 0.10.3", "keccak", @@ -4451,18 +4411,18 @@ checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" [[package]] name = "siwe" -version = "0.4.2" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f24fe2b646c33a670e7d79a232bffb41821fed28b1870a8bd1a47e6ae686ace6" +checksum = "ec4cc2eafb2354c1aeaeac5f53b7726ca4ea5ad16314a0b5ebacb6676f599c2b" dependencies = [ "hex", "http", "iri-string", "k256", "rand 0.8.5", - "sha3 0.9.1", + "sha3", "thiserror", - "time 0.3.14", + "time 0.3.15", ] [[package]] @@ -4491,9 +4451,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.8.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" +checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" [[package]] name = "snafu" @@ -4642,7 +4602,7 @@ dependencies = [ "sqlx-rt", "stringprep", "thiserror", - "time 0.3.14", + "time 0.3.15", "tokio-stream", "url", "uuid 1.1.2", @@ -4922,9 +4882,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.14" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"3c3f9a28b618c3a6b9251b6908e9c99e04b9e5c02e6581ccbb67d59c34ef7f9b" +checksum = "d634a985c4d4238ec39cacaed2e7ae552fbd3c476b552c1deac3021b7d7eaf0c" dependencies = [ "itoa 1.0.2", "libc", @@ -5132,9 +5092,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tracing" -version = "0.1.36" +version = "0.1.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fce9567bd60a67d08a16488756721ba392f24f29006402881e43b19aac64307" +checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if", "log", @@ -5145,9 +5105,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.22" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11c75893af559bc8e10716548bdef5cb2b983f8e637db9d0e15126b61b484ee2" +checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" dependencies = [ "proc-macro2", "quote", @@ -5156,9 +5116,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.29" +version = "0.1.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aeea4303076558a00714b823f9ad67d58a3bbda1df83d8827d21193156e22f7" +checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a" dependencies = [ "once_cell", "valuable", @@ -5187,12 +5147,12 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.15" +version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60db860322da191b40952ad9affe65ea23e7dd6a5c442c2c42865810c6ab8e6b" +checksum = "a6176eae26dd70d0c919749377897b54a9276bd7061339665dd68777926b5a70" dependencies = [ - "ansi_term", "matchers", + "nu-ansi-term", "once_cell", "parking_lot 0.12.1", "regex", @@ -5554,7 +5514,6 @@ dependencies = [ "handlebars", "hashbrown", "http", - "influxdb", "ipnet", "metered", "migration", @@ -5575,7 +5534,7 @@ dependencies = [ "serde_json", "serde_prometheus", "siwe", - "time 0.3.14", + "time 0.3.15", "tokio", "tokio-stream", "toml", @@ -5749,7 +5708,7 @@ dependencies = [ "hmac", "pbkdf2 0.10.1", "sha1", - "time 0.3.14", + "time 0.3.15", "zstd", ] diff --git a/TODO.md b/TODO.md index 77e23f19..36958af6 100644 --- a/TODO.md +++ b/TODO.md @@ -174,10 +174,16 @@ These are roughly in order of completition - [x] get to /, when not serving a websocket, should have a simple welcome page. maybe with a button to update your wallet. - [x] instead of giving a rate limit error code, delay the connection's response at the start. reject if incoming requests is super high? - [x] did this by checking a key/ip-specific semaphore before checking rate limits -- [ ] collect requests per second per api key -- [ ] collect concurrent requests per api key -- [ ] collect distribution of methods per api key (eth_call, eth_getLogs, etc.) -- [ ] display key stats on an endpoint that requires authentication +- [x] emit stat on cache hit +- [x] emit stat on cache miss +- [ ] add grafana to dev docker-compose so we can browse stats +- [ ] emit stat on retry +- [ ] emit stat on no servers synced +- [ ] emit stat on error (maybe just use sentry, but graphs are handy) + - if we wait until the error handler to emit the stat, i don't think we have access to the authorized_request +- [ ] display requests per second per api key (only with authentication!) +- [ ] display concurrent requests per api key (only with authentication!) 
+- [ ] display distribution of methods per api key (eth_call, eth_getLogs, etc.) (only with authentication!) - [-] let users choose a % to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly - this must be opt-in or spawned since it will slow things down and will make their calls less private - [ ] we currently default to 0.0 and don't expose a way to edit it. we have a database row, but we don't use it @@ -199,6 +205,8 @@ These are roughly in order of completition These are not yet ordered. +- [ ] EIP1271 for siwe +- [ ] Limited throughput during high traffic - [ ] implement filters and other unimplemented rpc methods - multiple teams need log filters and subscriptions. - would be nice if our subscriptions had better gaurentees than geth/erigon do, but maybe simpler to just setup a broadcast channel and proxy all the respones to a backend instead diff --git a/config/example.toml b/config/example.toml index 4f3658dc..bb077923 100644 --- a/config/example.toml +++ b/config/example.toml @@ -1,17 +1,25 @@ [shared] chain_id = 1 + db_url = "mysql://root:dev_web3_proxy@dev-db:3306/dev_web3_proxy" # TODO: how do we find the optimal db_max_connections? too high actually ends up being slower db_max_connections = 99 -# TODO: influxdb_url = "http://influxdb:8086" + min_sum_soft_limit = 2000 min_synced_rpcs = 2 -redis_url = "redis://dev-redis:6379/" + # TODO: how do we find the optimal redis_max_connections? too high actually ends up being slower -redis_max_connections = 300 +persistent_redis_max_connections = 300 +persistent_redis_url = "redis://dev-predis:6379/" + +volatile_redis_max_connections = 300 +volatile_redis_url = "redis://dev-vredis:6379/" + redirect_public_url = "https://llamanodes.com/free-rpc-stats" redirect_user_url = "https://llamanodes.com/user-rpc-stats/{{user_id}}" + public_rate_limit_per_minute = 0 + # 1GB of cache response_cache_max_bytes = 10000000000 diff --git a/deferred-rate-limiter/Cargo.toml b/deferred-rate-limiter/Cargo.toml index fb1f9c94..f8d50351 100644 --- a/deferred-rate-limiter/Cargo.toml +++ b/deferred-rate-limiter/Cargo.toml @@ -11,4 +11,4 @@ anyhow = "1.0.65" hashbrown = "0.12.3" moka = { version = "0.9.4", default-features = false, features = ["future"] } tokio = "1.21.2" -tracing = "0.1.36" +tracing = "0.1.37" diff --git a/docker-compose.common.yml b/docker-compose.common.yml index eca7487e..ebb07071 100644 --- a/docker-compose.common.yml +++ b/docker-compose.common.yml @@ -11,6 +11,11 @@ services: #RUST_LOG: "info,web3_proxy=debug" RUST_LOG: info - volatile_redis: - image: redis + persistent_redis: + image: redis:6.0-alpine command: [ "redis-server", "--save", "", "--appendonly", "no" ] + # be sure to mount /data! 
+ + volatile_redis: + image: redis:6.0-alpine + command: [ "redis-server", "--save", "60", "1" ] diff --git a/docker-compose.yml b/docker-compose.yml index b1576eba..08e8910a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,13 +3,16 @@ version: "3.4" services: - dev-redis: - extends: - file: docker-compose.common.yml - service: volatile_redis + # manage the databases with a user friendly interface + # it is slightly dangerous with "drop all" as a single click + dev-adminer: + image: adminer ports: - - 127.0.0.1:16379:6379 + - 18306:8080 + environment: + ADMINER_DEFAULT_SERVER: dev-db + # in dev we use mysql, but production will use RDS or similar dev-db: image: mysql environment: @@ -20,35 +23,30 @@ services: volumes: - ./data/dev_mysql:/var/lib/mysql - dev-influxdb: - image: influxdb:latest - ports: - - '127.0.0.1:18086:8086' - volumes: - - ./data/dev_influxdb:/var/lib/influxdb - environment: - - INFLUXDB_DB=db0 - - INFLUXDB_ADMIN_USER=admin - - INFLUXDB_ADMIN_PASSWORD=dev_web3_proxy - - dev-otel-collector: - image: otel/opentelemetry-collector-dev:latest - expose: - - 4317 - - dev-adminer: - image: adminer - ports: - - 18306:8080 - environment: - ADMINER_DEFAULT_SERVER: dev-db - - dev-eth: + # persistent redis for storing user stats + # TODO: replace this with a real time series database + dev-predis: extends: file: docker-compose.common.yml - service: web3-proxy - volumes: - - ./config/example.toml:/config.toml + service: persistent_redis ports: - - 8544:8544 # proxy (should be behind something handling HTTPS) - - 8543:8543 # prometheus + - 16379:6379 + volumes: + - ./data/dev_predis:/data + + # volatile redis for storing rate limits + dev-vredis: + extends: + file: docker-compose.common.yml + service: volatile_redis + ports: + - 16380:6379 + # dev-eth: + # extends: + # file: docker-compose.common.yml + # service: web3-proxy + # volumes: + # - ./config/example.toml:/config.toml + # ports: + # - 8544:8544 # proxy (should be behind something handling HTTPS) + # - 8543:8543 # prometheus diff --git a/redis-rate-limiter/Cargo.toml b/redis-rate-limiter/Cargo.toml index 58e3da52..c62151b6 100644 --- a/redis-rate-limiter/Cargo.toml +++ b/redis-rate-limiter/Cargo.toml @@ -7,5 +7,5 @@ edition = "2021" [dependencies] anyhow = "1.0.65" deadpool-redis = { version = "0.10.2", features = ["rt_tokio_1", "serde"] } -tracing = "0.1.36" +tracing = "0.1.37" tokio = "1.21.2" diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index c89f875e..e40ea133 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -36,7 +36,6 @@ flume = "0.10.14" futures = { version = "0.3.24", features = ["thread-pool"] } hashbrown = { version = "0.12.3", features = ["serde"] } http = "0.2.8" -influxdb = { version = "0.5.2", features = ["derive"] } ipnet = "2.5.0" metered = { version = "0.9.0", features = ["serialize"] } moka = { version = "0.9.4", default-features = false, features = ["future"] } @@ -51,15 +50,15 @@ rand = "0.8.5" # TODO: regex has several "perf" features that we might want to use regex = "1.6.0" reqwest = { version = "0.11.12", default-features = false, features = ["json", "tokio-rustls"] } -handlebars = "4.3.4" +handlebars = "4.3.5" rustc-hash = "1.1.0" -siwe = "0.4.2" +siwe = "0.5.0" sea-orm = { version = "0.9.3", features = ["macros"] } serde = { version = "1.0.145", features = [] } serde_json = { version = "1.0.85", default-features = false, features = ["alloc", "raw_value"] } serde_prometheus = "0.1.6" # TODO: make sure this time version matches siwe. 
PR to put this in their prelude -time = "0.3.14" +time = "0.3.15" tokio = { version = "1.21.2", features = ["full", "tracing"] } # TODO: make sure this uuid version matches sea-orm. PR to put this in their prelude tokio-stream = { version = "0.1.10", features = ["sync"] } @@ -68,9 +67,9 @@ tower = "0.4.13" # TODO: i don't think we need this. we can use it from tower-http instead. though this seems to use ulid and not uuid? tower-request-id = "0.2.0" tower-http = { version = "0.3.4", features = ["cors", "sensitive-headers", "trace"] } -tracing = "0.1.36" +tracing = "0.1.37" # TODO: tracing-subscriber has serde and serde_json features that we might want to use -tracing-subscriber = { version = "0.3.15", features = ["env-filter", "parking_lot"] } +tracing-subscriber = { version = "0.3.16", features = ["env-filter", "parking_lot"] } ulid = { version = "1.0.0", features = ["serde"] } url = "2.3.1" uuid = "1.1.2" diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index ac79b2f8..37ee09c6 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -109,7 +109,7 @@ pub struct Web3ProxyApp { pub frontend_ip_rate_limiter: Option>, pub frontend_key_rate_limiter: Option>, pub login_rate_limiter: Option, - pub redis_pool: Option, + pub vredis_pool: Option, pub user_key_cache: Cache, pub user_key_semaphores: Cache, hashbrown::hash_map::DefaultHashBuilder>, pub ip_semaphores: Cache, hashbrown::hash_map::DefaultHashBuilder>, @@ -229,15 +229,15 @@ impl Web3ProxyApp { // create a connection pool for redis // a failure to connect does NOT block the application from starting - let redis_pool = match top_config.app.redis_url.as_ref() { + let vredis_pool = match top_config.app.volatile_redis_url.as_ref() { Some(redis_url) => { // TODO: scrub credentials and then include the redis_url in logs - info!("Connecting to redis"); + info!("Connecting to vredis"); // TODO: what is a good default? let redis_max_connections = top_config .app - .redis_max_connections + .volatile_redis_max_connections .unwrap_or(num_workers * 2); // TODO: what are reasonable timeouts? @@ -251,37 +251,67 @@ impl Web3ProxyApp { if let Err(err) = redis_pool.get().await { error!( ?err, - "failed to connect to redis. some features will be disabled" + "failed to connect to vredis. some features will be disabled" ); }; Some(redis_pool) } None => { - warn!("no redis connection"); + warn!("no redis connection. some features will be disabled"); None } }; - // setup a channel here for receiving influxdb stats + // TODO: dry this with predis + let predis_pool = match top_config.app.persistent_redis_url.as_ref() { + Some(redis_url) => { + // TODO: scrub credentials and then include the redis_url in logs + info!("Connecting to predis"); + + // TODO: what is a good default? + let redis_max_connections = top_config + .app + .persistent_redis_max_connections + .unwrap_or(num_workers * 2); + + // TODO: what are reasonable timeouts? + let redis_pool = RedisConfig::from_url(redis_url) + .builder()? + .max_size(redis_max_connections) + .runtime(DeadpoolRuntime::Tokio1) + .build()?; + + // test the redis pool + if let Err(err) = redis_pool.get().await { + error!( + ?err, + "failed to connect to vredis. some features will be disabled" + ); + }; + + Some(redis_pool) + } + None => { + warn!("no predis connection. 
some features will be disabled"); + None + } + }; + + // setup a channel for receiving stats (generally with a high cardinality, such as per-user) // we do this in a channel so we don't slow down our response to the users - // TODO: make influxdb optional - let stat_sender = if let Some(influxdb_url) = top_config.app.influxdb_url.clone() { - let influxdb_name = top_config - .app - .influxdb_name - .clone() - .context("connecting to influxdb")?; + let stat_sender = if let Some(redis_pool) = predis_pool.clone() { + let redis_conn = redis_pool.get().await?; // TODO: sender and receiver here are a little confusing. because the thing that reads the receiver is what actually submits the stats let (stat_sender, stat_handle) = - StatEmitter::spawn(influxdb_url, influxdb_name, http_client.clone()); + StatEmitter::spawn(top_config.app.chain_id, redis_conn).await?; handles.push(stat_handle); Some(stat_sender) } else { - warn!("no influxdb connection"); + warn!("cannot store stats without a redis connection"); None }; @@ -317,7 +347,7 @@ impl Web3ProxyApp { top_config.app.chain_id, balanced_rpcs, http_client.clone(), - redis_pool.clone(), + vredis_pool.clone(), block_map.clone(), Some(head_block_sender), top_config.app.min_sum_soft_limit, @@ -343,7 +373,7 @@ impl Web3ProxyApp { top_config.app.chain_id, private_rpcs, http_client.clone(), - redis_pool.clone(), + vredis_pool.clone(), block_map, // subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs None, @@ -370,7 +400,7 @@ impl Web3ProxyApp { let mut frontend_key_rate_limiter = None; let mut login_rate_limiter = None; - if let Some(redis_pool) = redis_pool.as_ref() { + if let Some(redis_pool) = vredis_pool.as_ref() { let rpc_rrl = RedisRateLimiter::new( "web3_proxy", "frontend", @@ -440,7 +470,7 @@ impl Web3ProxyApp { frontend_key_rate_limiter, login_rate_limiter, db_conn, - redis_pool, + vredis_pool, app_metrics, open_request_handle_metrics, user_key_cache, @@ -740,7 +770,7 @@ impl Web3ProxyApp { } pub async fn redis_conn(&self) -> anyhow::Result { - match self.redis_pool.as_ref() { + match self.vredis_pool.as_ref() { None => Err(anyhow::anyhow!("no redis server configured")), Some(redis_pool) => { let redis_conn = redis_pool.get().await?; diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 5f60e0a3..56b8d7b8 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -48,34 +48,49 @@ pub struct TopConfig { // TODO: no String, only &str #[derive(Debug, Default, Deserialize)] pub struct AppConfig { - // TODO: better type for chain_id? max of `u64::MAX / 2 - 36` https://github.com/ethereum/EIPs/issues/2294 + /// EVM chain id. 1 for ETH + /// TODO: better type for chain_id? max of `u64::MAX / 2 - 36` https://github.com/ethereum/EIPs/issues/2294 pub chain_id: u64, - pub cookie_domain: Option, - pub cookie_secure: Option, + /// Database is used for user data. + /// Currently supports mysql or compatible backend. pub db_url: Option, - /// minimum size of the connection pool for the database - /// If none, the number of workers are used + /// minimum size of the connection pool for the database. + /// If none, the number of workers are used. pub db_min_connections: Option, - /// minimum size of the connection pool for the database - /// If none, the minimum * 2 is used + /// maximum size of the connection pool for the database. + /// If none, the minimum * 2 is used. 
pub db_max_connections: Option, - pub influxdb_url: Option, - pub influxdb_name: Option, + /// Default request limit for registered users. + /// 0 = block all requests + /// None = allow all requests pub default_requests_per_minute: Option, + /// Restrict user registration. + /// None = no code needed pub invite_code: Option, + /// The soft limit prevents thundering herds as new blocks are seen. #[serde(default = "default_min_sum_soft_limit")] pub min_sum_soft_limit: u32, + /// Another knob for preventing thundering herds as new blocks are seen. #[serde(default = "default_min_synced_rpcs")] pub min_synced_rpcs: usize, - /// Set to 0 to block all anonymous requests + /// Request limit for anonymous users. + /// Set to 0 to block all anonymous requests. #[serde(default = "default_frontend_rate_limit_per_minute")] pub frontend_rate_limit_per_minute: u64, + /// Rate limit for the login entrypoint. + /// This is separate from the rpc limits. #[serde(default = "default_login_rate_limit_per_minute")] pub login_rate_limit_per_minute: u64, - pub redis_url: Option, + /// Persist user stats in a redis (or compatible backend) + /// TODO: research more time series databases + pub persistent_redis_url: Option, + pub persistent_redis_max_connections: Option, + /// Track rate limits in a redis (or compatible backend) + pub volatile_redis_url: Option, /// maximum size of the connection pool for the cache /// If none, the minimum * 2 is used - pub redis_max_connections: Option, + pub volatile_redis_max_connections: Option, + /// RPC responses are cached locally #[serde(default = "default_response_cache_max_bytes")] pub response_cache_max_bytes: usize, /// the stats page url for an anonymous user. @@ -84,10 +99,12 @@ pub struct AppConfig { pub redirect_user_url: String, } +/// This might cause a thundering herd! fn default_min_sum_soft_limit() -> u32 { 1 } +/// Only require 1 server. This might cause a thundering herd! 
fn default_min_synced_rpcs() -> usize { 1 } @@ -110,10 +127,16 @@ fn default_response_cache_max_bytes() -> usize { #[derive(Debug, Deserialize, Constructor)] pub struct Web3ConnectionConfig { + /// websocket (or http if no websocket) url: String, + /// the requests per second at which the server starts slowing down soft_limit: u32, + /// the requests per second at which the server throws errors (rate limit or otherwise) hard_limit: Option, + /// All else equal, a server with a lower weight receives requests weight: u32, + /// Subscribe to the firehose of pending transactions + /// Don't do this with free rpcs subscribe_txs: Option, } diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index a012b78c..36252dba 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -206,7 +206,7 @@ impl Display for &AuthorizedRequest { match self { AuthorizedRequest::Internal => f.write_str("internal"), AuthorizedRequest::Ip(x) => f.write_str(&format!("ip:{}", x)), - AuthorizedRequest::User(_, x) => f.write_str(&format!("user_key:{}", x.user_key_id)), + AuthorizedRequest::User(_, x) => f.write_str(&format!("uk:{}", x.user_key_id)), } } } diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index 58d50c70..f58269e3 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -26,7 +26,7 @@ use http::StatusCode; use redis_rate_limiter::redis::AsyncCommands; use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, TransactionTrait}; use serde::{Deserialize, Serialize}; -use siwe::Message; +use siwe::{Message, VerificationOpts}; use std::ops::Add; use std::sync::Arc; use time::{Duration, OffsetDateTime}; @@ -168,8 +168,14 @@ pub async fn post_login( let our_msg: siwe::Message = our_msg.parse().unwrap(); + let verify_config = VerificationOpts { + domain: Some(our_msg.domain), + nonce: Some(our_msg.nonce), + ..Default::default() + }; + // check the domain and a nonce. let timestamp be automatic - if let Err(e) = their_msg.verify(their_sig, Some(&our_msg.domain), Some(&our_msg.nonce), None) { + if let Err(e) = their_msg.verify(&their_sig, &verify_config).await { // message cannot be correctly authenticated todo!("proper error message: {}", e) } diff --git a/web3_proxy/src/stats.rs b/web3_proxy/src/stats.rs index 7b32eb34..3153e51f 100644 --- a/web3_proxy/src/stats.rs +++ b/web3_proxy/src/stats.rs @@ -1,9 +1,9 @@ -use chrono::{DateTime, Utc}; +use anyhow::Context; use derive_more::From; -use influxdb::Client; -use influxdb::InfluxDbWriteable; +use redis_rate_limiter::{redis, RedisConnection}; +use std::fmt::Display; use tokio::task::JoinHandle; -use tracing::{error, info, trace}; +use tracing::{debug, error, info}; use crate::frontend::authorization::AuthorizedRequest; @@ -14,39 +14,29 @@ pub enum ProxyResponseType { Error, } -impl From for influxdb::Type { - fn from(x: ProxyResponseType) -> Self { - match x { - ProxyResponseType::CacheHit => "cache_hit".into(), - ProxyResponseType::CacheMiss => "cache_miss".into(), - ProxyResponseType::Error => "error".into(), +impl Display for ProxyResponseType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ProxyResponseType::CacheHit => f.write_str("cache_hit"), + ProxyResponseType::CacheMiss => f.write_str("cache_miss"), + ProxyResponseType::Error => f.write_str("error"), } } } /// TODO: where should this be defined? -/// TODO: what should be fields and what should be tags. 
count is always 1 which feels wrong -#[derive(Debug, InfluxDbWriteable)] -pub struct ProxyResponseStat { - time: DateTime, - count: u32, - #[influxdb(tag)] - method: String, - #[influxdb(tag)] - response_type: ProxyResponseType, - #[influxdb(tag)] - who: String, -} +#[derive(Debug)] +pub struct ProxyResponseStat(String); +/// A very basic stat that we store in redis. +/// This probably belongs in a true time series database like influxdb, but client impl ProxyResponseStat { pub fn new(method: String, response_type: ProxyResponseType, who: &AuthorizedRequest) -> Self { - Self { - time: Utc::now(), - count: 1, - method, - response_type, - who: who.to_string(), - } + // TODO: what order? + // TODO: app specific prefix. need at least the chain id + let redis_key = format!("proxy_response:{}:{}:{}", method, response_type, who); + + Self(redis_key) } } @@ -56,9 +46,9 @@ pub enum Web3ProxyStat { } impl Web3ProxyStat { - fn into_query(self) -> influxdb::WriteQuery { + fn into_redis_key(self, chain_id: u64) -> String { match self { - Self::ProxyResponse(x) => x.into_query("proxy_response"), + Self::ProxyResponse(x) => format!("{}:{}", x.0, chain_id), } } } @@ -66,31 +56,30 @@ impl Web3ProxyStat { pub struct StatEmitter; impl StatEmitter { - pub fn spawn( - influxdb_url: String, - influxdb_name: String, - http_client: Option, - ) -> (flume::Sender, JoinHandle>) { + pub async fn spawn( + chain_id: u64, + mut redis_conn: RedisConnection, + ) -> anyhow::Result<(flume::Sender, JoinHandle>)> { let (tx, rx) = flume::unbounded::(); - let client = Client::new(influxdb_url, influxdb_name); - - // use an existing http client - let client = if let Some(http_client) = http_client { - client.with_http_client(http_client) - } else { - client - }; - + // simple future that reads the channel and emits stats let f = async move { while let Ok(x) = rx.recv_async().await { - let x = x.into_query(); + // TODO: batch stats? spawn this? - trace!(?x, "emitting stat"); + let x = x.into_redis_key(chain_id); - if let Err(err) = client.query(x).await { - error!(?err, "failed writing stat"); - // TODO: now what? + // TODO: this is too loud. just doing it for dev + debug!(?x, "emitting stat"); + + // TODO: do this without the pipe? + if let Err(err) = redis::pipe() + .incr(&x, 1) + .query_async::<_, ()>(&mut redis_conn) + .await + .context("incrementing stat") + { + error!(?err, "emitting stat") } } @@ -101,6 +90,6 @@ impl StatEmitter { let handle = tokio::spawn(f); - (tx, handle) + Ok((tx, handle)) } }
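
Below is a minimal sketch (not part of the patch above) of how one of these counters could be read back later, for example for the "display requests per second per api key" TODO. The key layout mirrors ProxyResponseStat::new and into_redis_key from stats.rs in this patch; the function name, its parameters, and passing response_type as a plain string are assumptions for illustration only.

use redis_rate_limiter::{redis::AsyncCommands, RedisConnection};

/// Hypothetical helper: fetch one proxy_response counter for a user key.
/// Assumes the connection comes from the persistent (predis) pool.
pub async fn read_proxy_response_stat(
    mut redis_conn: RedisConnection,
    chain_id: u64,
    method: &str,
    response_type: &str, // "cache_hit", "cache_miss", or "error"
    user_key_id: u64,
) -> anyhow::Result<u64> {
    // StatEmitter stores keys as
    // "proxy_response:{method}:{response_type}:{who}:{chain_id}",
    // and an authorized user key displays as "uk:{user_key_id}".
    let key = format!(
        "proxy_response:{}:{}:uk:{}:{}",
        method, response_type, user_key_id, chain_id
    );

    // a missing key just means nothing has been counted yet
    let count: Option<u64> = redis_conn.get(&key).await?;

    Ok(count.unwrap_or_default())
}

With the dev compose file above, the same counter can be checked by hand against dev-predis on its mapped port (the method, user key id, and chain id here are only examples):

    redis-cli -p 16379 GET proxy_response:eth_call:cache_hit:uk:1:1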