From 2eb8df22ec572a5fec2ec8828dce17d5a72ffbbe Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Fri, 3 Mar 2023 02:54:52 -0800
Subject: [PATCH] kafka helper script

---
 Dockerfile                                 |  1 +
 web3_proxy/src/app/mod.rs                  | 23 ++++---
 web3_proxy/src/bin/web3_proxy_cli/main.rs  |  4 +-
 .../src/bin/web3_proxy_cli/search_kafka.rs | 67 +++++++++++++++----
 4 files changed, 72 insertions(+), 23 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 20286b13..9e1a5400 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -14,6 +14,7 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
     cargo install cargo-nextest

 # foundry is needed to run tests
+# TODO: do this in a separate FROM and COPY it in
 ENV PATH /root/.foundry/bin:$PATH

 RUN curl -L https://foundry.paradigm.xyz | bash && foundryup

diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index 9e88eff2..a9cbaacd 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -1086,7 +1086,7 @@ impl Web3ProxyApp {
         let mut kafka_stuff = None;

         if let Some(kafka_producer) = self.kafka_producer.clone() {
-            let request_bytes = rmp_serde::to_vec(&request)?;
+            let kafka_topic = "proxy_cached_request".to_string();

             let rpc_secret_key_id = authorization
                 .checks
@@ -1094,15 +1094,15 @@ impl Web3ProxyApp {
                 .map(|x| x.get())
                 .unwrap_or_default();

-            let kafka_key = rmp_serde::to_vec(&rpc_secret_key_id)
-                .context("failed serializing kafka key")
-                .unwrap();
+            let kafka_key = rmp_serde::to_vec(&rpc_secret_key_id)?;
+
+            let request_bytes = rmp_serde::to_vec(&request)?;

             let request_hash = Some(keccak256(&request_bytes));

             let chain_id = self.config.chain_id;

-            // another item is added with the response
+            // another header is added with the response, so the initial capacity is one more than what is inserted here
             let kafka_headers = OwnedHeaders::new_with_capacity(4)
                 .insert(Header {
                     key: "request_hash",
@@ -1118,11 +1118,15 @@ impl Web3ProxyApp {
                 });

             // save the key and headers for when we log the response
-            kafka_stuff = Some((kafka_key.clone(), kafka_headers.clone()));
+            kafka_stuff = Some((
+                kafka_topic.clone(),
+                kafka_key.clone(),
+                kafka_headers.clone(),
+            ));

             let f = async move {
                 let produce_future = kafka_producer.send(
-                    FutureRecord::to("proxy_rpc_request")
+                    FutureRecord::to(&kafka_topic)
                         .key(&kafka_key)
                         .payload(&request_bytes)
                         .headers(kafka_headers),
@@ -1131,6 +1135,7 @@ impl Web3ProxyApp {

                 if let Err((err, msg)) = produce_future.await {
                     error!("produce kafka request log: {}. {:#?}", err, msg);
+                    // TODO: re-queue the msg?
                 }
             };
@@ -1744,7 +1749,7 @@ impl Web3ProxyApp {
                 .context("stat_sender sending response stat")?;
         }

-        if let Some((kafka_key, kafka_headers)) = kafka_stuff {
+        if let Some((kafka_topic, kafka_key, kafka_headers)) = kafka_stuff {
             let kafka_producer = self
                 .kafka_producer
                 .clone()
@@ -1755,7 +1760,7 @@ impl Web3ProxyApp {
             let f = async move {
                 let produce_future = kafka_producer.send(
-                    FutureRecord::to("proxy_rpc_response")
+                    FutureRecord::to(&kafka_topic)
                         .key(&kafka_key)
                         .payload(&response_bytes)
                         .headers(kafka_headers),

diff --git a/web3_proxy/src/bin/web3_proxy_cli/main.rs b/web3_proxy/src/bin/web3_proxy_cli/main.rs
index 9a40e763..99b8c042 100644
--- a/web3_proxy/src/bin/web3_proxy_cli/main.rs
+++ b/web3_proxy/src/bin/web3_proxy_cli/main.rs
@@ -376,7 +376,9 @@ fn main() -> anyhow::Result<()> {
                     x.main(pagerduty_async, top_config).await
                 }
                 SubCommand::PopularityContest(x) => x.main().await,
-                SubCommand::SearchKafka(x) => x.main(top_config.unwrap()).await,
+                SubCommand::SearchKafka(x) => {
+                    x.main(top_config.unwrap()).await
+                },
                 SubCommand::Sentryd(x) => {
                     if cli_config.sentry_url.is_none() {
                         warn!("sentry_url is not set! Logs will only show in this console");

diff --git a/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs b/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs
index c4bc3bca..35d11e9d 100644
--- a/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs
+++ b/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs
@@ -1,22 +1,27 @@
 use anyhow::Context;
 use argh::FromArgs;
+use entities::rpc_key;
 use futures::TryStreamExt;
 use log::info;
+use migration::sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
 use rdkafka::{
     consumer::{Consumer, StreamConsumer},
     ClientConfig, Message,
 };
 use std::num::NonZeroU64;
-use web3_proxy::{config::TopConfig, frontend::authorization::RpcSecretKey};
+use uuid::Uuid;
+use web3_proxy::{app::get_db, config::TopConfig, frontend::authorization::RpcSecretKey};

 /// Second subcommand.
 #[derive(FromArgs, PartialEq, Debug, Eq)]
 #[argh(subcommand, name = "search_kafka")]
 pub struct SearchKafkaSubCommand {
     #[argh(positional)]
-    group_id: String,
-    #[argh(positional)]
-    input_topic: String,
+    /// topics to read
+    topics: Vec<String>,
+    #[argh(option)]
+    /// optional group id
+    group_id: Option<String>,
     #[argh(option)]
     /// rpc_key to search. Be careful when handling keys!
     rpc_key: Option<RpcSecretKey>,
@@ -27,28 +32,64 @@ pub struct SearchKafkaSubCommand {

 impl SearchKafkaSubCommand {
     pub async fn main(self, top_config: TopConfig) -> anyhow::Result<()> {
+        let mut rpc_key_id = self.rpc_key_id.map(|x| x.get());
+
+        if let Some(rpc_key) = self.rpc_key {
+            let db_conn = get_db(top_config.app.db_url.unwrap(), 1, 1).await?;
+
+            let rpc_key: Uuid = rpc_key.into();
+
+            let x = rpc_key::Entity::find()
+                .filter(rpc_key::Column::SecretKey.eq(rpc_key))
+                .one(&db_conn)
+                .await?
+                .context("key not found")?;
+
+            rpc_key_id = Some(x.id);
+        }
+
+        let wanted_kafka_key = rpc_key_id.map(|x| rmp_serde::to_vec(&x).unwrap());
+
+        let wanted_kafka_key = wanted_kafka_key.as_ref().map(|x| &x[..]);
+
         let brokers = top_config
             .app
             .kafka_urls
             .context("top_config.app.kafka_urls is required")?;

-        // TODO: headers
-        // TODO: headers
-        let consumer: StreamConsumer = ClientConfig::new()
-            .set("group.id", &self.group_id)
+        let mut consumer = ClientConfig::new();
+
+        consumer
             .set("bootstrap.servers", &brokers)
             .set("enable.partition.eof", "false")
             .set("session.timeout.ms", "6000")
-            .set("enable.auto.commit", "false")
-            .create()
-            .context("Consumer creation failed")?;
+            .set("enable.auto.commit", "false");
+
+        if let Some(group_id) = self.group_id {
+            consumer.set("group.id", &group_id);
+        }
+
+        let consumer: StreamConsumer = consumer
+            .create()
+            .context("kafka consumer creation failed")?;
+
+        let topics: Vec<&str> = self.topics.iter().map(String::as_ref).collect();

+        // TODO: how should we set start/end timestamp for the consumer? i think we need to look at metadata
         consumer
-            .subscribe(&[&self.input_topic])
+            .subscribe(&topics)
             .expect("Can't subscribe to specified topic");

         let stream_processor = consumer.stream().try_for_each(|msg| async move {
-            info!("Message received: {}", msg.offset());
+            if msg.key() != wanted_kafka_key {
+                return Ok(());
+            }
+
+            // TODO: filter by headers?
+
+            info!("msg: {}", msg.offset());
+
+            // TODO: now what?

             Ok(())
         });
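
A note on the key matching (not part of the patch): the filter in search_kafka only works because both ends serialize the id identically. The proxy writes the rmp_serde (MessagePack) bytes of the u64 rpc_secret_key_id as the Kafka message key, and search_kafka rebuilds those same bytes before comparing them to msg.key(). A minimal sketch of that round trip, assuming only the rmp-serde crate:

    // Equal ids yield byte-identical MessagePack, which is why the raw
    // comparison `msg.key() != wanted_kafka_key` is a sufficient filter.
    fn main() {
        // hypothetical id; the proxy takes it from the authorization checks,
        // search_kafka from --rpc-key-id or the rpc_key table lookup
        let rpc_secret_key_id: u64 = 1;

        // producer side: the key attached via FutureRecord::key()
        let produced_key = rmp_serde::to_vec(&rpc_secret_key_id).unwrap();

        // consumer side: wanted_kafka_key rebuilt from the same id
        let wanted_key = rmp_serde::to_vec(&rpc_secret_key_id).unwrap();

        assert_eq!(produced_key, wanted_key);
    }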
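Usage is assumed from the argh definitions rather than stated in the patch: argh derives kebab-case flags from the field names, so a search over the shared topic would look roughly like "web3_proxy_cli search_kafka proxy_cached_request --rpc-key-id 1" (plus the proxy's usual config flag, since main passes top_config.unwrap()), or with --rpc-key to resolve a raw key to its id through the rpc_key table. Messages whose key bytes do not match are skipped; a match is currently only logged by offset, with the TODOs above left open.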