use kanal instead of flume or tokio channels (#68)

This commit is contained in:
Bryan Stitt 2023-05-12 15:47:01 -07:00 committed by GitHub
parent 8a097dabbe
commit 510612d343
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 97 additions and 174 deletions

129
Cargo.lock generated

@ -792,16 +792,6 @@ dependencies = [
"cc",
]
[[package]]
name = "codespan-reporting"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3538270d33cc669650c4b093848450d380def10c331d38c768e34cac80576e6e"
dependencies = [
"termcolor",
"unicode-width",
]
[[package]]
name = "coins-bip32"
version = "0.8.3"
@ -1204,50 +1194,6 @@ dependencies = [
"cipher 0.4.4",
]
[[package]]
name = "cxx"
version = "1.0.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f61f1b6389c3fe1c316bf8a4dccc90a38208354b330925bce1f74a6c4756eb93"
dependencies = [
"cc",
"cxxbridge-flags",
"cxxbridge-macro",
"link-cplusplus",
]
[[package]]
name = "cxx-build"
version = "1.0.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12cee708e8962df2aeb38f594aae5d827c022b6460ac71a7a3e2c3c2aae5a07b"
dependencies = [
"cc",
"codespan-reporting",
"once_cell",
"proc-macro2",
"quote",
"scratch",
"syn 2.0.15",
]
[[package]]
name = "cxxbridge-flags"
version = "1.0.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7944172ae7e4068c533afbb984114a56c46e9ccddda550499caa222902c7f7bb"
[[package]]
name = "cxxbridge-macro"
version = "1.0.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2345488264226bf682893e25de0769f3360aac9957980ec49361b083ddaa5bc5"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.15",
]
[[package]]
name = "dashmap"
version = "4.0.2"
@ -2052,19 +1998,6 @@ dependencies = [
"miniz_oxide 0.7.1",
]
[[package]]
name = "flume"
version = "0.10.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577"
dependencies = [
"futures-core",
"futures-sink",
"nanorand",
"pin-project",
"spin 0.9.8",
]
[[package]]
name = "fnv"
version = "1.0.7"
@ -2376,9 +2309,9 @@ dependencies = [
[[package]]
name = "h2"
version = "0.3.18"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17f8a914c2987b688368b5138aa05321db91f4090cf26118185672ad588bce21"
checksum = "d357c7ae988e7d2182f7d7871d0b963962420b0678b0997ce7de72001aeab782"
dependencies = [
"bytes",
"fnv",
@ -2712,12 +2645,11 @@ dependencies = [
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.1"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0703ae284fc167426161c2e3f1da3ea71d94b21bedbcc9494e92b28e334e3dca"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cxx",
"cxx-build",
"cc",
]
[[package]]
@ -2976,6 +2908,16 @@ dependencies = [
"signature 2.1.0",
]
[[package]]
name = "kanal"
version = "0.1.0-pre8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b05d55519627edaf7fd0f29981f6dc03fb52df3f5b257130eb8d0bf2801ea1d7"
dependencies = [
"futures-core",
"lock_api",
]
[[package]]
name = "keccak"
version = "0.1.4"
@ -3021,6 +2963,7 @@ name = "latency"
version = "0.1.0"
dependencies = [
"ewma",
"kanal",
"log",
"serde",
"tokio",
@ -3069,15 +3012,6 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "link-cplusplus"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ecd207c9c713c34f95a097a5b029ac2ce6010530c7b49d7fea24d977dede04f5"
dependencies = [
"cc",
]
[[package]]
name = "linux-raw-sys"
version = "0.3.7"
@ -3274,15 +3208,6 @@ dependencies = [
"uuid 1.3.2",
]
[[package]]
name = "nanorand"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
dependencies = [
"getrandom",
]
[[package]]
name = "native-tls"
version = "0.2.11"
@ -4277,7 +4202,7 @@ dependencies = [
name = "rate-counter"
version = "0.1.0"
dependencies = [
"flume",
"kanal",
"tokio",
]
@ -4323,9 +4248,9 @@ dependencies = [
[[package]]
name = "rdkafka-sys"
version = "4.3.0+1.9.2"
version = "4.4.0+1.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d222a401698c7f2010e3967353eae566d9934dcda49c29910da922414ab4e3f4"
checksum = "87ac9d87c3aba1748e3112318459f2ac8bff80bfff7359e338e0463549590249"
dependencies = [
"cmake",
"libc",
@ -4777,12 +4702,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "scratch"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1792db035ce95be60c3f8853017b3999209281c24e2ba5bc8e59bf97a0c590c1"
[[package]]
name = "scrypt"
version = "0.10.0"
@ -5004,9 +4923,9 @@ dependencies = [
[[package]]
name = "security-framework"
version = "2.8.2"
version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a332be01508d814fed64bf28f798a146d73792121129962fdf335bb3c49a4254"
checksum = "ca2855b3715770894e67cbfa3df957790aa0c9edc3bf06efa1a84d77fa0839d1"
dependencies = [
"bitflags",
"core-foundation",
@ -5017,9 +4936,9 @@ dependencies = [
[[package]]
name = "security-framework-sys"
version = "2.8.0"
version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31c9bb296072e961fcbd8853511dd39c2d8be2deb1e17c6860b1d30732b323b4"
checksum = "f51d0c0d83bec45f16480d0ce0058397a69e48fcdc52d1dc8855fb68acbd31a7"
dependencies = [
"core-foundation-sys",
"libc",
@ -6658,7 +6577,6 @@ dependencies = [
"ethers",
"ewma",
"fdlimit",
"flume",
"fstrings",
"futures",
"gethostname",
@ -6673,6 +6591,7 @@ dependencies = [
"influxdb2-structmap",
"ipnet",
"itertools",
"kanal",
"latency",
"listenfd",
"log",

@ -189,7 +189,7 @@ These are roughly in order of completition
- [x] graceful shutdown. stop taking new requests and don't stop until all outstanding queries are handled
- https://github.com/tokio-rs/mini-redis/blob/master/src/shutdown.rs
- we need this because we need to be sure all the queries are saved in the db. maybe put stuff in Drop
- need an flume::watch on unflushed stats that we can subscribe to. wait for it to flip to true
- need an kanal::watch on unflushed stats that we can subscribe to. wait for it to flip to true
- [x] don't use unix timestamps for response_millis since leap seconds will confuse it
- [x] config to allow origins even on the anonymous endpoints
- [x] send logs to sentry

@ -7,6 +7,7 @@ edition = "2021"
[dependencies]
ewma = "0.1.1"
kanal = "0.1.0-pre8"
log = "0.4.17"
serde = { version = "1.0.163", features = [] }
tokio = { version = "1.28.1", features = ["full"] }

@ -2,9 +2,8 @@ mod rtt_estimate;
use std::sync::Arc;
use kanal::SendError;
use log::error;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio::task::JoinHandle;
use tokio::time::{Duration, Instant};
@ -20,7 +19,7 @@ pub struct PeakEwmaLatency {
/// Join handle for the latency calculation task
pub join_handle: JoinHandle<()>,
/// Send to update with each request duration
request_tx: mpsc::Sender<Duration>,
request_tx: kanal::AsyncSender<Duration>,
/// Latency average and last update time
rtt_estimate: Arc<AtomicRttEstimate>,
/// Decay time
@ -34,7 +33,7 @@ impl PeakEwmaLatency {
/// average latency.
pub fn spawn(decay_ns: f64, buf_size: usize, start_latency: Duration) -> Self {
debug_assert!(decay_ns > 0.0, "decay_ns must be positive");
let (request_tx, request_rx) = mpsc::channel(buf_size);
let (request_tx, request_rx) = kanal::bounded_async(buf_size);
let rtt_estimate = Arc::new(AtomicRttEstimate::new(start_latency));
let task = PeakEwmaLatencyTask {
request_rx,
@ -71,16 +70,19 @@ impl PeakEwmaLatency {
/// Should only be called from the Web3Rpc that owns it.
pub fn report(&self, duration: Duration) {
match self.request_tx.try_send(duration) {
Ok(()) => {}
Err(TrySendError::Full(_)) => {
Ok(true) => {}
Ok(false) => {
// We don't want to block if the channel is full, just
// report the error
error!("Latency report channel full");
// TODO: could we spawn a new tokio task to report tthis later?
}
Err(TrySendError::Closed(_)) => {
Err(SendError::Closed) => {
unreachable!("Owner should keep channel open");
}
Err(SendError::ReceiveClosed) => {
unreachable!("Receiver should keep channel open");
}
};
//.expect("Owner should keep channel open");
}
@ -90,7 +92,7 @@ impl PeakEwmaLatency {
#[derive(Debug)]
struct PeakEwmaLatencyTask {
/// Receive new request timings for update
request_rx: mpsc::Receiver<Duration>,
request_rx: kanal::AsyncReceiver<Duration>,
/// Current estimate and update time
rtt_estimate: Arc<AtomicRttEstimate>,
/// Last update time, used for decay calculation
@ -102,7 +104,7 @@ struct PeakEwmaLatencyTask {
impl PeakEwmaLatencyTask {
/// Run the loop for updating latency
async fn run(mut self) {
while let Some(rtt) = self.request_rx.recv().await {
while let Ok(rtt) = self.request_rx.recv().await {
self.update(rtt);
}
}

@ -5,5 +5,5 @@ authors = ["Bryan Stitt <bryan@llamanodes.com>"]
edition = "2021"
[dependencies]
flume = "0.10.14"
kanal = "0.1.0-pre8"
tokio = { version = "1.28.1", features = ["time"] }

@ -43,7 +43,6 @@ env_logger = "0.10.0"
ethers = { version = "2.0.4", default-features = false, features = ["rustls", "ws"] }
ewma = "0.1.1"
fdlimit = "0.2.1"
flume = "0.10.14"
fstrings = "0.2"
futures = { version = "0.3.28", features = ["thread-pool"] }
gethostname = "0.4.2"
@ -58,6 +57,7 @@ influxdb2 = { git = "https://github.com/llamanodes/influxdb2", features = ["rust
influxdb2-structmap = { git = "https://github.com/llamanodes/influxdb2/"}
ipnet = "2.7.2"
itertools = "0.10.5"
kanal = "0.1.0-pre8"
listenfd = "1.0.1"
log = "0.4.17"
mimalloc = { version = "0.1.37", optional = true}

@ -265,7 +265,7 @@ pub struct Web3ProxyApp {
Cache<UserBearerToken, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub kafka_producer: Option<rdkafka::producer::FutureProducer>,
/// channel for sending stats in a background task
pub stat_sender: Option<flume::Sender<AppStat>>,
pub stat_sender: Option<kanal::AsyncSender<AppStat>>,
}
/// flatten a JoinError into an anyhow error

@ -24,7 +24,7 @@ impl Web3ProxyApp {
jsonrpc_request: JsonRpcRequest,
subscription_count: &'a AtomicUsize,
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now
response_sender: flume::Sender<Message>,
response_sender: kanal::AsyncSender<Message>,
) -> Web3ProxyResult<(AbortHandle, JsonRpcForwardedResponse)> {
let request_metadata = RequestMetadata::new(
self,
@ -94,7 +94,7 @@ impl Web3ProxyApp {
// TODO: can we check a content type header?
let response_msg = Message::Text(response_str);
if response_sender.send_async(response_msg).await.is_err() {
if response_sender.send(response_msg).await.is_err() {
// TODO: increment error_response? i don't think so. i think this will happen once every time a client disconnects.
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
@ -158,7 +158,7 @@ impl Web3ProxyApp {
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
if response_sender.send_async(response_msg).await.is_err() {
if response_sender.send(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
@ -221,7 +221,7 @@ impl Web3ProxyApp {
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
if response_sender.send_async(response_msg).await.is_err() {
if response_sender.send(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
@ -285,7 +285,7 @@ impl Web3ProxyApp {
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
if response_sender.send_async(response_msg).await.is_err() {
if response_sender.send(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};

@ -11,7 +11,6 @@ use log::{error, info};
use pagerduty_rs::{eventsv2async::EventsV2 as PagerdutyAsyncEventsV2, types::Event};
use serde_json::json;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::{interval, MissedTickBehavior};
use web3_proxy::{config::TopConfig, pagerduty::pagerduty_alert};
@ -116,7 +115,7 @@ impl SentrydSubCommand {
let mut handles = FuturesUnordered::new();
// channels and a task for sending errors to logs/pagerduty
let (error_sender, mut error_receiver) = mpsc::channel::<SentrydError>(10);
let (error_sender, error_receiver) = kanal::bounded_async::<SentrydError>(10);
{
let error_handler_f = async move {
@ -124,7 +123,7 @@ impl SentrydSubCommand {
info!("set PAGERDUTY_INTEGRATION_KEY to send create alerts for errors");
}
while let Some(err) = error_receiver.recv().await {
while let Ok(err) = error_receiver.recv().await {
log::log!(err.level, "check failed: {:#?}", err);
if matches!(err.level, log::Level::Error) {
@ -258,7 +257,7 @@ async fn a_loop<T>(
class: &str,
seconds: u64,
error_level: log::Level,
error_sender: mpsc::Sender<SentrydError>,
error_sender: kanal::AsyncSender<SentrydError>,
f: impl Fn(SentrydErrorBuilder) -> T,
) -> anyhow::Result<()>
where

@ -285,8 +285,8 @@ impl Web3RpcConfig {
http_client: Option<reqwest::Client>,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
blocks_by_hash_cache: BlocksByHashCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<TxHashAndRpc>>,
block_sender: Option<kanal::AsyncSender<BlockAndRpc>>,
tx_id_sender: Option<kanal::AsyncSender<TxHashAndRpc>>,
reconnect: bool,
) -> anyhow::Result<(Arc<Web3Rpc>, AnyhowJoinHandle<()>)> {
if !self.extra.is_empty() {

@ -267,7 +267,7 @@ pub struct RequestMetadata {
pub kafka_debug_logger: Option<Arc<KafkaDebugLogger>>,
/// Channel to send stats to
pub stat_sender: Option<flume::Sender<AppStat>>,
pub stat_sender: Option<kanal::AsyncSender<AppStat>>,
}
impl Default for RequestMetadata {
@ -457,8 +457,11 @@ impl RequestMetadata {
let stat: AppStat = stat.into();
// can't use async because a Drop can call this
let stat_sender = stat_sender.to_sync();
if let Err(err) = stat_sender.send(stat) {
error!("failed sending stats for {:?}: {:?}", err.0, err);
error!("failed sending stat: {:?}", err);
// TODO: return it? that seems like it might cause an infinite loop
};

@ -51,7 +51,6 @@ pub enum Web3ProxyError {
EthersHttpClientError(ethers::prelude::HttpClientError),
EthersProviderError(ethers::prelude::ProviderError),
EthersWsClientError(ethers::prelude::WsClientError),
FlumeRecvError(flume::RecvError),
GasEstimateNotU256,
Headers(headers::Error),
HeaderToString(ToStrError),
@ -78,6 +77,8 @@ pub enum Web3ProxyError {
#[display(fmt = "{:?}", _0)]
#[error(ignore)]
JsonRpcForwardedError(JsonRpcForwardedResponse),
KanalReceiveError(kanal::ReceiveError),
KanalSendError(kanal::SendError),
#[display(fmt = "{:?}", _0)]
#[error(ignore)]
MsgPackEncode(rmp_serde::encode::Error),
@ -112,7 +113,6 @@ pub enum Web3ProxyError {
#[from(ignore)]
RefererNotAllowed(headers::Referer),
SemaphoreAcquireError(AcquireError),
SendAppStatError(flume::SendError<crate::stats::AppStat>),
SerdeJson(serde_json::Error),
/// simple way to return an error message to the user and an anyhow to our logs
#[display(fmt = "{}, {}, {:?}", _0, _1, _2)]
@ -261,8 +261,8 @@ impl Web3ProxyError {
),
)
}
Self::FlumeRecvError(err) => {
warn!("FlumeRecvError err={:#?}", err);
Self::KanalReceiveError(err) => {
warn!("KanalRecvError err={:#?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcForwardedResponse::from_str(
@ -701,7 +701,7 @@ impl Web3ProxyError {
),
)
}
Self::SendAppStatError(err) => {
Self::KanalSendError(err) => {
error!("SendAppStatError err={:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,

@ -311,7 +311,7 @@ async fn proxy_web3_socket(
let (ws_tx, ws_rx) = socket.split();
// create a channel for our reader and writer can communicate. todo: benchmark different channels
let (response_sender, response_receiver) = flume::unbounded::<Message>();
let (response_sender, response_receiver) = kanal::unbounded_async::<Message>();
tokio::spawn(write_web3_socket(response_receiver, ws_tx));
tokio::spawn(read_web3_socket(app, authorization, ws_rx, response_sender));
@ -323,7 +323,7 @@ async fn handle_socket_payload(
app: Arc<Web3ProxyApp>,
authorization: &Arc<Authorization>,
payload: &str,
response_sender: &flume::Sender<Message>,
response_sender: &kanal::AsyncSender<Message>,
subscription_count: &AtomicUsize,
subscriptions: Arc<RwLock<HashMap<Bytes, AbortHandle>>>,
) -> Web3ProxyResult<(Message, Option<OwnedSemaphorePermit>)> {
@ -452,7 +452,7 @@ async fn read_web3_socket(
app: Arc<Web3ProxyApp>,
authorization: Arc<Authorization>,
mut ws_rx: SplitStream<WebSocket>,
response_sender: flume::Sender<Message>,
response_sender: kanal::AsyncSender<Message>,
) {
// RwLock should be fine here. a user isn't going to be opening tons of subscriptions
let subscriptions = Arc::new(RwLock::new(HashMap::new()));
@ -528,7 +528,7 @@ async fn read_web3_socket(
}
};
if response_sender.send_async(response_msg).await.is_err() {
if response_sender.send(response_msg).await.is_err() {
let _ = close_sender.send(true);
return;
};
@ -549,13 +549,13 @@ async fn read_web3_socket(
}
async fn write_web3_socket(
response_rx: flume::Receiver<Message>,
response_rx: kanal::AsyncReceiver<Message>,
mut ws_tx: SplitSink<WebSocket, Message>,
) {
// TODO: increment counter for open websockets
// TODO: is there any way to make this stream receive.
while let Ok(msg) = response_rx.recv_async().await {
while let Ok(msg) = response_rx.recv().await {
// a response is ready
// TODO: poke rate limits for this user?

@ -365,7 +365,7 @@ impl Web3Rpcs {
pub(super) async fn process_incoming_blocks(
&self,
authorization: &Arc<Authorization>,
block_receiver: flume::Receiver<BlockAndRpc>,
block_receiver: kanal::AsyncReceiver<BlockAndRpc>,
// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
// Geth's subscriptions have the same potential for skipping blocks.
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
@ -373,7 +373,7 @@ impl Web3Rpcs {
let mut connection_heads = ConsensusFinder::new(self.max_block_age, self.max_block_lag);
loop {
match block_receiver.recv_async().await {
match block_receiver.recv().await {
Ok((new_block, rpc)) => {
let rpc_name = rpc.name.clone();

@ -43,7 +43,7 @@ use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBeh
#[derive(From)]
pub struct Web3Rpcs {
/// if watch_consensus_head_sender is some, Web3Rpc inside self will send blocks here when they get them
pub(crate) block_sender: flume::Sender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
pub(crate) block_sender: kanal::AsyncSender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
/// any requests will be forwarded to one (or more) of these connections
pub(crate) by_name: ArcSwap<HashMap<String, Arc<Web3Rpc>>>,
/// notify all http providers to check their blocks at the same time
@ -57,8 +57,8 @@ pub struct Web3Rpcs {
pub(super) watch_consensus_head_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
pub(super) pending_transaction_cache:
Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
pub(super) pending_tx_id_receiver: flume::Receiver<TxHashAndRpc>,
pub(super) pending_tx_id_sender: flume::Sender<TxHashAndRpc>,
pub(super) pending_tx_id_receiver: kanal::AsyncReceiver<TxHashAndRpc>,
pub(super) pending_tx_id_sender: kanal::AsyncSender<TxHashAndRpc>,
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
/// all blocks, including orphans
pub(super) blocks_by_hash: BlocksByHashCache,
@ -94,8 +94,8 @@ impl Web3Rpcs {
watch::Receiver<Option<Arc<ConsensusWeb3Rpcs>>>,
// watch::Receiver<Arc<ConsensusWeb3Rpcs>>,
)> {
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, block_receiver) = flume::unbounded::<BlockAndRpc>();
let (pending_tx_id_sender, pending_tx_id_receiver) = kanal::unbounded_async();
let (block_sender, block_receiver) = kanal::unbounded_async::<BlockAndRpc>();
// TODO: query the rpc to get the actual expected block time, or get from config? maybe have this be part of a health check?
let expected_block_time_ms = match chain_id {
@ -347,7 +347,7 @@ impl Web3Rpcs {
async fn subscribe(
self: Arc<Self>,
authorization: Arc<Authorization>,
block_receiver: flume::Receiver<BlockAndRpc>,
block_receiver: kanal::AsyncReceiver<BlockAndRpc>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
let mut futures = vec![];
@ -362,7 +362,7 @@ impl Web3Rpcs {
let pending_tx_id_receiver = self.pending_tx_id_receiver.clone();
let handle = tokio::task::spawn(async move {
// TODO: set up this future the same as the block funnel
while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv_async().await {
while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv().await {
let f = clone.clone().process_incoming_tx_id(
authorization.clone(),
rpc,
@ -1391,8 +1391,8 @@ mod tests {
(lagged_rpc.name.clone(), lagged_rpc.clone()),
]);
let (block_sender, _block_receiver) = flume::unbounded();
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, _block_receiver) = kanal::unbounded_async();
let (pending_tx_id_sender, pending_tx_id_receiver) = kanal::unbounded_async();
let (watch_consensus_rpcs_sender, _watch_consensus_rpcs_receiver) = watch::channel(None);
let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None);
@ -1643,8 +1643,8 @@ mod tests {
(archive_rpc.name.clone(), archive_rpc.clone()),
]);
let (block_sender, _) = flume::unbounded();
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, _) = kanal::unbounded_async();
let (pending_tx_id_sender, pending_tx_id_receiver) = kanal::unbounded_async();
let (watch_consensus_rpcs_sender, _) = watch::channel(None);
let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None);
@ -1807,8 +1807,8 @@ mod tests {
),
]);
let (block_sender, _) = flume::unbounded();
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, _) = kanal::unbounded_async();
let (pending_tx_id_sender, pending_tx_id_receiver) = kanal::unbounded_async();
let (watch_consensus_rpcs_sender, _) = watch::channel(None);
let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None);

@ -98,8 +98,8 @@ impl Web3Rpc {
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
redis_pool: Option<RedisPool>,
block_map: BlocksByHashCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
block_sender: Option<kanal::AsyncSender<BlockAndRpc>>,
tx_id_sender: Option<kanal::AsyncSender<(TxHash, Arc<Self>)>>,
reconnect: bool,
) -> anyhow::Result<(Arc<Web3Rpc>, AnyhowJoinHandle<()>)> {
let created_at = Instant::now();
@ -388,7 +388,7 @@ impl Web3Rpc {
/// TODO: maybe it would be better to use "Full Jitter". The "Full Jitter" approach uses less work, but slightly more time.
pub async fn retrying_connect(
self: &Arc<Self>,
block_sender: Option<&flume::Sender<BlockAndRpc>>,
block_sender: Option<&kanal::AsyncSender<BlockAndRpc>>,
chain_id: u64,
db_conn: Option<&DatabaseConnection>,
delay_start: bool,
@ -451,7 +451,7 @@ impl Web3Rpc {
/// connect to the web3 provider
async fn connect(
self: &Arc<Self>,
block_sender: Option<&flume::Sender<BlockAndRpc>>,
block_sender: Option<&kanal::AsyncSender<BlockAndRpc>>,
chain_id: u64,
db_conn: Option<&DatabaseConnection>,
) -> anyhow::Result<()> {
@ -473,7 +473,7 @@ impl Web3Rpc {
// tell the block subscriber that this rpc doesn't have any blocks
if let Some(block_sender) = block_sender {
block_sender
.send_async((None, self.clone()))
.send((None, self.clone()))
.await
.context("block_sender during connect")?;
}
@ -588,7 +588,7 @@ impl Web3Rpc {
pub(crate) async fn send_head_block_result(
self: &Arc<Self>,
new_head_block: Result<Option<ArcBlock>, ProviderError>,
block_sender: &flume::Sender<BlockAndRpc>,
block_sender: &kanal::AsyncSender<BlockAndRpc>,
block_map: BlocksByHashCache,
) -> anyhow::Result<()> {
let new_head_block = match new_head_block {
@ -651,7 +651,7 @@ impl Web3Rpc {
// send an empty block to take this server out of rotation
block_sender
.send_async((new_head_block, self.clone()))
.send((new_head_block, self.clone()))
.await
.context("block_sender")?;
@ -670,11 +670,11 @@ impl Web3Rpc {
self: Arc<Self>,
authorization: &Arc<Authorization>,
block_map: BlocksByHashCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
block_sender: Option<kanal::AsyncSender<BlockAndRpc>>,
chain_id: u64,
disconnect_receiver: watch::Receiver<bool>,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
tx_id_sender: Option<kanal::AsyncSender<(TxHash, Arc<Self>)>>,
) -> anyhow::Result<()> {
let error_handler = if self.backup {
RequestErrorHandler::DebugLevel
@ -895,7 +895,7 @@ impl Web3Rpc {
self: Arc<Self>,
authorization: Arc<Authorization>,
http_interval_receiver: Option<broadcast::Receiver<()>>,
block_sender: flume::Sender<BlockAndRpc>,
block_sender: kanal::AsyncSender<BlockAndRpc>,
block_map: BlocksByHashCache,
) -> anyhow::Result<()> {
trace!("watching new heads on {}", self);
@ -1090,7 +1090,7 @@ impl Web3Rpc {
async fn subscribe_pending_transactions(
self: Arc<Self>,
authorization: Arc<Authorization>,
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
tx_id_sender: kanal::AsyncSender<(TxHash, Arc<Self>)>,
) -> anyhow::Result<()> {
// TODO: give this a separate client. don't use new_head_client for everything. especially a firehose this big
// TODO: timeout
@ -1115,7 +1115,7 @@ impl Web3Rpc {
while let Some(pending_tx_id) = stream.next().await {
tx_id_sender
.send_async((pending_tx_id, self.clone()))
.send((pending_tx_id, self.clone()))
.await
.context("tx_id_sender")?;

@ -672,7 +672,6 @@ impl RpcQueryStats {
method: Option<&str>,
) -> Decimal {
// some methods should be free. there might be cases where method isn't set (though they should be uncommon)
// TODO: get this list from config (and add more to it)
if let Some(method) = method.as_ref() {
if ["eth_chainId"].contains(method) {
return 0.into();

@ -30,7 +30,7 @@ pub struct BufferedRpcQueryStats {
#[derive(From)]
pub struct SpawnedStatBuffer {
pub stat_sender: flume::Sender<AppStat>,
pub stat_sender: kanal::AsyncSender<AppStat>,
/// these handles are important and must be allowed to finish
pub background_handle: JoinHandle<anyhow::Result<()>>,
}
@ -65,7 +65,7 @@ impl StatBuffer {
return Ok(None);
}
let (stat_sender, stat_receiver) = flume::unbounded();
let (stat_sender, stat_receiver) = kanal::unbounded_async();
let timestamp_precision = TimestampPrecision::Seconds;
let mut new = Self {
@ -94,7 +94,7 @@ impl StatBuffer {
async fn aggregate_and_save_loop(
&mut self,
bucket: String,
stat_receiver: flume::Receiver<AppStat>,
stat_receiver: kanal::AsyncReceiver<AppStat>,
mut shutdown_receiver: broadcast::Receiver<()>,
) -> anyhow::Result<()> {
let mut tsdb_save_interval =
@ -107,7 +107,7 @@ impl StatBuffer {
loop {
tokio::select! {
stat = stat_receiver.recv_async() => {
stat = stat_receiver.recv() => {
// info!("Received stat");
// save the stat to a buffer
match stat {