diff --git a/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs b/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs index 6f4f0628..caf5fe97 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs @@ -57,6 +57,7 @@ impl SearchKafkaSubCommand { .kafka_urls .context("top_config.app.kafka_urls is required")?; + // TODO: sea-streamer instead of rdkafka? let mut consumer = ClientConfig::new(); let security_protocol = &top_config.app.kafka_protocol; diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index 6e4c8c1e..4936a44d 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -10,7 +10,7 @@ use migration::sea_orm::prelude::Decimal; use migration::sea_orm::DatabaseConnection; use std::time::Duration; use tokio::sync::broadcast; -use tokio::time::interval; +use tokio::time::{interval, sleep}; use tracing::{error, info, trace}; #[derive(Debug, Default)] @@ -162,6 +162,9 @@ impl StatBuffer { } } + // TODO: don't just sleep. wait for things to actually finish + sleep(Duration::from_secs(10)).await; + let saved_relational = self.save_relational_stats().await; info!("saved {} pending relational stat(s)", saved_relational);