kafka helper script

This commit is contained in:
Bryan Stitt 2023-03-03 02:54:52 -08:00
parent 04c49b925f
commit 2eb8df22ec
4 changed files with 72 additions and 23 deletions

@ -14,6 +14,7 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
cargo install cargo-nextest
# foundry is needed to run tests
# TODO: do this in a seperate FROM and COPY it in
ENV PATH /root/.foundry/bin:$PATH
RUN curl -L https://foundry.paradigm.xyz | bash && foundryup

@ -1086,7 +1086,7 @@ impl Web3ProxyApp {
let mut kafka_stuff = None;
if let Some(kafka_producer) = self.kafka_producer.clone() {
let request_bytes = rmp_serde::to_vec(&request)?;
let kafka_topic = "proxy_cached_request".to_string();
let rpc_secret_key_id = authorization
.checks
@ -1094,15 +1094,15 @@ impl Web3ProxyApp {
.map(|x| x.get())
.unwrap_or_default();
let kafka_key = rmp_serde::to_vec(&rpc_secret_key_id)
.context("failed serializing kafka key")
.unwrap();
let kafka_key = rmp_serde::to_vec(&rpc_secret_key_id)?;
let request_bytes = rmp_serde::to_vec(&request)?;
let request_hash = Some(keccak256(&request_bytes));
let chain_id = self.config.chain_id;
// another item is added with the response
// another item is added with the response, so initial_capacity is +1 what is needed here
let kafka_headers = OwnedHeaders::new_with_capacity(4)
.insert(Header {
key: "request_hash",
@ -1118,11 +1118,15 @@ impl Web3ProxyApp {
});
// save the key and headers for when we log the response
kafka_stuff = Some((kafka_key.clone(), kafka_headers.clone()));
kafka_stuff = Some((
kafka_topic.clone(),
kafka_key.clone(),
kafka_headers.clone(),
));
let f = async move {
let produce_future = kafka_producer.send(
FutureRecord::to("proxy_rpc_request")
FutureRecord::to(&kafka_topic)
.key(&kafka_key)
.payload(&request_bytes)
.headers(kafka_headers),
@ -1131,6 +1135,7 @@ impl Web3ProxyApp {
if let Err((err, msg)) = produce_future.await {
error!("produce kafka request log: {}. {:#?}", err, msg);
// TODO: re-queue the msg?
}
};
@ -1744,7 +1749,7 @@ impl Web3ProxyApp {
.context("stat_sender sending response stat")?;
}
if let Some((kafka_key, kafka_headers)) = kafka_stuff {
if let Some((kafka_topic, kafka_key, kafka_headers)) = kafka_stuff {
let kafka_producer = self
.kafka_producer
.clone()
@ -1755,7 +1760,7 @@ impl Web3ProxyApp {
let f = async move {
let produce_future = kafka_producer.send(
FutureRecord::to("proxy_rpc_response")
FutureRecord::to(&kafka_topic)
.key(&kafka_key)
.payload(&response_bytes)
.headers(kafka_headers),

@ -376,7 +376,9 @@ fn main() -> anyhow::Result<()> {
x.main(pagerduty_async, top_config).await
}
SubCommand::PopularityContest(x) => x.main().await,
SubCommand::SearchKafka(x) => x.main(top_config.unwrap()).await,
SubCommand::SearchKafka(x) => {
x.main(top_config.unwrap()).await
},
SubCommand::Sentryd(x) => {
if cli_config.sentry_url.is_none() {
warn!("sentry_url is not set! Logs will only show in this console");

@ -1,22 +1,27 @@
use anyhow::Context;
use argh::FromArgs;
use entities::rpc_key;
use futures::TryStreamExt;
use log::info;
use migration::sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
use rdkafka::{
consumer::{Consumer, StreamConsumer},
ClientConfig, Message,
};
use std::num::NonZeroU64;
use web3_proxy::{config::TopConfig, frontend::authorization::RpcSecretKey};
use uuid::Uuid;
use web3_proxy::{app::get_db, config::TopConfig, frontend::authorization::RpcSecretKey};
/// Second subcommand.
#[derive(FromArgs, PartialEq, Debug, Eq)]
#[argh(subcommand, name = "search_kafka")]
pub struct SearchKafkaSubCommand {
#[argh(positional)]
group_id: String,
#[argh(positional)]
input_topic: String,
/// topics to read
topics: Vec<String>,
#[argh(option)]
/// optional group id
group_id: Option<String>,
#[argh(option)]
/// rpc_key to search. Be careful when handling keys!
rpc_key: Option<RpcSecretKey>,
@ -27,28 +32,64 @@ pub struct SearchKafkaSubCommand {
impl SearchKafkaSubCommand {
pub async fn main(self, top_config: TopConfig) -> anyhow::Result<()> {
let mut rpc_key_id = self.rpc_key_id.map(|x| x.get());
if let Some(rpc_key) = self.rpc_key {
let db_conn = get_db(top_config.app.db_url.unwrap(), 1, 1).await?;
let rpc_key: Uuid = rpc_key.into();
let x = rpc_key::Entity::find()
.filter(rpc_key::Column::SecretKey.eq(rpc_key))
.one(&db_conn)
.await?
.context("key not found")?;
rpc_key_id = Some(x.id);
}
let wanted_kafka_key = rpc_key_id.map(|x| rmp_serde::to_vec(&x).unwrap());
let wanted_kafka_key = wanted_kafka_key.as_ref().map(|x| &x[..]);
let brokers = top_config
.app
.kafka_urls
.context("top_config.app.kafka_urls is required")?;
// TODO: headers
// TODO: headers
let consumer: StreamConsumer = ClientConfig::new()
.set("group.id", &self.group_id)
let mut consumer = ClientConfig::new();
consumer
.set("bootstrap.servers", &brokers)
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "false")
.create()
.context("Consumer creation failed")?;
.set("enable.auto.commit", "false");
if let Some(group_id) = self.group_id {
consumer.set("group.id", &group_id);
}
let consumer: StreamConsumer = consumer
.create()
.context("kafka consumer creation failed")?;
let topics: Vec<&str> = self.topics.iter().map(String::as_ref).collect();
// TODO: how should we set start/end timestamp for the consumer? i think we need to look at metadata
consumer
.subscribe(&[&self.input_topic])
.subscribe(&topics)
.expect("Can't subscribe to specified topic");
let stream_processor = consumer.stream().try_for_each(|msg| async move {
info!("Message received: {}", msg.offset());
if msg.key() != wanted_kafka_key {
return Ok(());
}
// TODO: filter by headers?
info!("msg: {}", msg.offset());
// TODO: now what?
Ok(())
});