oops. didn't mean to commit this script yet

This commit is contained in:
Bryan Stitt 2023-03-02 21:12:35 -08:00
parent 586b772cdf
commit cbdd6d7c1e
3 changed files with 10 additions and 19 deletions

@ -56,7 +56,7 @@ pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async
parking_lot = { version = "0.12.1", features = ["arc_lock"] }
prettytable = "*"
proctitle = "0.1.1"
rdkafka = { version = "0.29.0", optional = true }
rdkafka = { version = "0.29.0" }
regex = "1.7.1"
reqwest = { version = "0.11.14", default-features = false, features = ["json", "tokio-rustls"] }
rmp-serde = "1.1.1"

@ -37,9 +37,7 @@ use migration::sea_orm::{
use migration::sea_query::table::ColumnDef;
use migration::{Alias, DbErr, Migrator, MigratorTrait, Table};
use moka::future::Cache;
#[cfg(rdkafka)]
use rdkafka::message::{Header, OwnedHeaders};
#[cfg(rdkafka)]
use rdkafka::producer::FutureRecord;
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
@ -230,11 +228,7 @@ pub struct Web3ProxyApp {
pub bearer_token_semaphores:
Cache<UserBearerToken, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub stat_sender: Option<flume::Sender<Web3ProxyStat>>,
#[cfg(rdkafka)]
pub kafka_producer: Option<rdkafka::producer::FutureProducer>,
#[cfg(not(rdkafka))]
pub kafka_producer: Option<()>,
}
/// flatten a JoinError into an anyhow error
@ -469,12 +463,7 @@ impl Web3ProxyApp {
// connect to kafka for logging requests from the /debug/ urls
#[cfg(rdkafka)]
let mut kafka_producer: Option<rdkafka::producer::FutureProducer> = None;
#[cfg(not(rdkafka))]
let kafka_producer: Option<()> = None;
#[cfg(rdkafka)]
if let Some(kafka_brokers) = top_config.app.kafka_urls.clone() {
match rdkafka::ClientConfig::new()
.set("bootstrap.servers", kafka_brokers)
@ -1095,10 +1084,7 @@ impl Web3ProxyApp {
let request_metadata = Arc::new(RequestMetadata::new(REQUEST_PERIOD, request.num_bytes())?);
#[cfg(rdkafka)]
let mut kafka_stuff = None;
#[cfg(rdkafka)]
if let Some(kafka_producer) = self.kafka_producer.clone() {
let request_bytes = rmp_serde::to_vec(&request)?;
@ -1114,8 +1100,10 @@ impl Web3ProxyApp {
let request_hash = Some(keccak256(&request_bytes));
// the third item is added with the response
let kafka_headers = OwnedHeaders::new_with_capacity(3)
let chain_id = self.config.chain_id;
// another item is added with the response
let kafka_headers = OwnedHeaders::new_with_capacity(4)
.insert(Header {
key: "request_hash",
value: request_hash.as_ref(),
@ -1123,6 +1111,10 @@ impl Web3ProxyApp {
.insert(Header {
key: "head_block_num",
value: head_block_num.map(|x| x.to_string()).as_ref(),
})
.insert(Header {
key: "chain_id",
value: Some(&chain_id.to_le_bytes()),
});
// save the key and headers for when we log the response
@ -1752,7 +1744,6 @@ impl Web3ProxyApp {
.context("stat_sender sending response stat")?;
}
#[cfg(rdkafka)]
if let Some((kafka_key, kafka_headers)) = kafka_stuff {
let kafka_producer = self
.kafka_producer

@ -376,7 +376,7 @@ fn main() -> anyhow::Result<()> {
x.main(pagerduty_async, top_config).await
}
SubCommand::PopularityContest(x) => x.main().await,
SubCommand::SearchKafka(x) => x.main().await,
SubCommand::SearchKafka(x) => x.main(top_config.unwrap()).await,
SubCommand::Sentryd(x) => {
if cli_config.sentry_url.is_none() {
warn!("sentry_url is not set! Logs will only show in this console");