From decdac4b6df87f725e8918ff9352f2b80697dfe7 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 19 Apr 2023 10:42:57 -0700 Subject: [PATCH 1/5] set protocol --- web3_proxy/src/app/mod.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 6651bffa..243d1313 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -468,9 +468,18 @@ impl Web3ProxyApp { let mut kafka_producer: Option = None; if let Some(kafka_brokers) = top_config.app.kafka_urls.clone() { + let security_protocol = if kafka_brokers.starts_with("SSL:") { + "ssl" + } else if kafka_brokers.starts_with("TCP:") { + "plaintext" + } else { + return Err(anyhow::anyhow!("unexpected kafka protocol")); + }; + match rdkafka::ClientConfig::new() .set("bootstrap.servers", kafka_brokers) .set("message.timeout.ms", "5000") + .set("security.protocol", security_protocol) .create() { Ok(k) => kafka_producer = Some(k), From f5775e557706c2b66442d4850d8970a731c5e053 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 19 Apr 2023 14:28:28 -0700 Subject: [PATCH 2/5] add security.protocol to search_kafka script --- web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs b/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs index 35d11e9d..517e2f88 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs @@ -52,16 +52,25 @@ impl SearchKafkaSubCommand { let wanted_kafka_key = wanted_kafka_key.as_ref().map(|x| &x[..]); - let brokers = top_config + let kafka_brokers = top_config .app .kafka_urls .context("top_config.app.kafka_urls is required")?; let mut consumer = ClientConfig::new(); + let security_protocol = if kafka_brokers.starts_with("SSL:") { + "ssl" + } else if kafka_brokers.starts_with("TCP:") { + "plaintext" + } else { + return Err(anyhow::anyhow!("unexpected kafka protocol")); + }; + consumer - .set("bootstrap.servers", &brokers) + .set("bootstrap.servers", &kafka_brokers) .set("enable.partition.eof", "false") + .set("security.protocol", security_protocol) .set("session.timeout.ms", "6000") .set("enable.auto.commit", "false"); From cc546d4d2e445389584a990cc0e7f15999ad2c67 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 19 Apr 2023 14:42:40 -0700 Subject: [PATCH 3/5] actually apply the migrations --- migration/src/lib.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/migration/src/lib.rs b/migration/src/lib.rs index fe7e3ec6..cc031348 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -14,9 +14,11 @@ mod m20221211_124002_request_method_privacy; mod m20221213_134158_move_login_into_database; mod m20230117_191358_admin_table; mod m20230119_204135_better_free_tier; +mod m20230125_204810_stats_v2; mod m20230130_124740_read_only_login_logic; mod m20230130_165144_prepare_admin_imitation_pre_login; mod m20230215_152254_admin_trail; +mod m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2; pub struct Migrator; @@ -38,9 +40,11 @@ impl MigratorTrait for Migrator { Box::new(m20221213_134158_move_login_into_database::Migration), Box::new(m20230117_191358_admin_table::Migration), Box::new(m20230119_204135_better_free_tier::Migration), + Box::new(m20230125_204810_stats_v2::Migration), Box::new(m20230130_124740_read_only_login_logic::Migration), Box::new(m20230130_165144_prepare_admin_imitation_pre_login::Migration), Box::new(m20230215_152254_admin_trail::Migration), + Box::new(m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2::Migration), ] } } From 10d2d46512fe8ddbd453f6acf9cf3665205e18ba Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 19 Apr 2023 15:22:27 -0700 Subject: [PATCH 4/5] more packages for kafka+ssl --- Cargo.lock | 24 +++++++++++++++++++ Dockerfile | 10 +++++++- config/example.toml | 1 + web3_proxy/Cargo.toml | 1 + web3_proxy/src/app/mod.rs | 8 +------ .../src/bin/web3_proxy_cli/search_kafka.rs | 8 +------ web3_proxy/src/config.rs | 7 ++++++ 7 files changed, 44 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5969f100..9690f7f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -841,6 +841,15 @@ dependencies = [ "os_str_bytes", ] +[[package]] +name = "cmake" +version = "0.1.50" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a31c789563b815f77f4250caee12365734369f942439b7defd71e18a48197130" +dependencies = [ + "cc", +] + [[package]] name = "codespan-reporting" version = "0.11.1" @@ -3266,6 +3275,18 @@ dependencies = [ "syn", ] +[[package]] +name = "openssl-sys" +version = "0.9.85" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d3d193fb1488ad46ffe3aaabc912cc931d02ee8518fe2959aea8ef52718b0c0" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "ordered-float" version = "3.4.0" @@ -3956,10 +3977,13 @@ version = "4.3.0+1.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d222a401698c7f2010e3967353eae566d9934dcda49c29910da922414ab4e3f4" dependencies = [ + "cmake", "libc", "libz-sys", "num_enum", + "openssl-sys", "pkg-config", + "zstd-sys", ] [[package]] diff --git a/Dockerfile b/Dockerfile index 8d99140a..95ea69b8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,7 +18,14 @@ RUN curl -L https://foundry.paradigm.xyz | bash && foundryup # install web3-proxy system dependencies. most things are rust-only, but not everything RUN apt-get update && \ - apt-get install --yes librdkafka-dev && \ + apt-get install --yes \ + cmake \ + liblz4-dev \ + libpthread-stubs0-dev \ + libssl-dev \ + libzstd-dev \ + make \ + && \ rm -rf /var/lib/apt/lists/* # copy the application @@ -35,6 +42,7 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \ --mount=type=cache,target=/app/target \ cargo install \ --locked \ + --features rdkafka-src \ --no-default-features \ --profile faster_release \ --root /opt/bin \ diff --git a/config/example.toml b/config/example.toml index d72147ba..48c9084b 100644 --- a/config/example.toml +++ b/config/example.toml @@ -12,6 +12,7 @@ db_url = "mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy" db_replica_url = "mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy" kafka_urls = "127.0.0.1:19092" +kafka_protocol = "plaintext" # thundering herd protection # only mark a block as the head block if the sum of their soft limits is greater than or equal to min_sum_soft_limit diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 55631364..bfd7abb2 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -10,6 +10,7 @@ default-run = "web3_proxy_cli" default = ["deadlock_detection", "tokio-console"] deadlock_detection = ["parking_lot/deadlock_detection"] tokio-console = ["dep:tokio-console", "dep:console-subscriber"] +rdkafka-src = ["rdkafka/cmake-build", "rdkafka/libz", "rdkafka/ssl", "rdkafka/zstd-pkg-config"] # TODO: turn tokio-console on with a feature. console-subscriber = { version = "0.1.7" } diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 243d1313..2c095fdc 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -468,13 +468,7 @@ impl Web3ProxyApp { let mut kafka_producer: Option = None; if let Some(kafka_brokers) = top_config.app.kafka_urls.clone() { - let security_protocol = if kafka_brokers.starts_with("SSL:") { - "ssl" - } else if kafka_brokers.starts_with("TCP:") { - "plaintext" - } else { - return Err(anyhow::anyhow!("unexpected kafka protocol")); - }; + let security_protocol = &top_config.app.kafka_protocol; match rdkafka::ClientConfig::new() .set("bootstrap.servers", kafka_brokers) diff --git a/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs b/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs index 517e2f88..89553f71 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs @@ -59,13 +59,7 @@ impl SearchKafkaSubCommand { let mut consumer = ClientConfig::new(); - let security_protocol = if kafka_brokers.starts_with("SSL:") { - "ssl" - } else if kafka_brokers.starts_with("TCP:") { - "plaintext" - } else { - return Err(anyhow::anyhow!("unexpected kafka protocol")); - }; + let security_protocol = &top_config.app.kafka_protocol; consumer .set("bootstrap.servers", &kafka_brokers) diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 40ec8828..a4fa2c88 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -108,6 +108,9 @@ pub struct AppConfig { /// Used by /debug/:rpc_key urls for logging requests and responses. No other endpoints log request/response data. pub kafka_urls: Option, + #[serde(default = "default_kafka_protocol")] + pub kafka_protocol: String, + /// domain in sign-in-with-ethereum messages pub login_domain: Option, @@ -202,6 +205,10 @@ fn default_login_rate_limit_per_period() -> u64 { 10 } +fn default_kafka_protocol() -> String { + "ssl".to_string() +} + fn default_response_cache_max_bytes() -> u64 { // TODO: default to some percentage of the system? // 100 megabytes From b0653c0e9f3ef3c816e36323a9a7d1c2d632d3dd Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 19 Apr 2023 15:30:01 -0700 Subject: [PATCH 5/5] add log for connecting to kafka --- web3_proxy/src/app/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 2c095fdc..5041cd1d 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -468,6 +468,8 @@ impl Web3ProxyApp { let mut kafka_producer: Option = None; if let Some(kafka_brokers) = top_config.app.kafka_urls.clone() { + info!("Connecting to kafka"); + let security_protocol = &top_config.app.kafka_protocol; match rdkafka::ClientConfig::new()