drop flume. just use tokio

i dont know if this is the problem, but this will also get compile times down
This commit is contained in:
Bryan Stitt 2023-07-10 23:08:06 -07:00
parent 3059d08674
commit 0f7e370c92
22 changed files with 126 additions and 162 deletions

18
Cargo.lock generated

@ -2163,19 +2163,6 @@ dependencies = [
"miniz_oxide",
]
[[package]]
name = "flume"
version = "0.10.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577"
dependencies = [
"futures-core",
"futures-sink",
"nanorand",
"pin-project",
"spin 0.9.8",
]
[[package]]
name = "fnv"
version = "1.0.7"
@ -3168,7 +3155,6 @@ checksum = "d3c48237b9604c5a4702de6b824e02006c3214327564636aef27c1028a8fa0ed"
name = "latency"
version = "0.1.0"
dependencies = [
"flume",
"log",
"portable-atomic",
"serde",
@ -3421,9 +3407,6 @@ name = "nanorand"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
dependencies = [
"getrandom 0.2.10",
]
[[package]]
name = "native-tls"
@ -7072,7 +7055,6 @@ dependencies = [
"ethbloom",
"ethers",
"fdlimit",
"flume",
"fstrings",
"futures",
"glob",

@ -189,7 +189,7 @@ These are roughly in order of completition
- [x] graceful shutdown. stop taking new requests and don't stop until all outstanding queries are handled
- https://github.com/tokio-rs/mini-redis/blob/master/src/shutdown.rs
- we need this because we need to be sure all the queries are saved in the db. maybe put stuff in Drop
- need an flume::watch on unflushed stats that we can subscribe to. wait for it to flip to true
- need a tokio::sync::watch on unflushed stats that we can subscribe to. wait for it to flip to true
- [x] don't use unix timestamps for response_millis since leap seconds will confuse it
- [x] config to allow origins even on the anonymous endpoints
- [x] send logs to sentry

@ -6,7 +6,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
flume = "0.10.14"
log = "0.4.19"
portable-atomic = { version = "1.3.3", features = ["float"] }
serde = { version = "1.0.171", features = [] }

@ -1,13 +1,12 @@
mod rtt_estimate;
use std::sync::Arc;
use tokio::task::JoinHandle;
use tokio::time::{Duration, Instant};
use tracing::{enabled, error, trace, Level};
use self::rtt_estimate::AtomicRttEstimate;
use crate::util::nanos::nanos;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::{Duration, Instant};
use tracing::{enabled, error, trace, Level};
/// Latency calculation using Peak EWMA algorithm
///
@ -18,7 +17,7 @@ pub struct PeakEwmaLatency {
/// Join handle for the latency calculation task
pub join_handle: JoinHandle<()>,
/// Send to update with each request duration
request_tx: flume::Sender<Duration>,
request_tx: mpsc::Sender<Duration>,
/// Latency average and last update time
rtt_estimate: Arc<AtomicRttEstimate>,
/// Decay time
@ -35,7 +34,7 @@ impl PeakEwmaLatency {
debug_assert!(decay_ns > 0.0, "decay_ns must be positive");
let (request_tx, request_rx) = flume::bounded(buf_size);
let (request_tx, request_rx) = mpsc::channel(buf_size);
let rtt_estimate = Arc::new(AtomicRttEstimate::new(start_latency));
let task = PeakEwmaLatencyTask {
request_rx,
@ -92,7 +91,7 @@ impl PeakEwmaLatency {
#[derive(Debug)]
struct PeakEwmaLatencyTask {
/// Receive new request timings for update
request_rx: flume::Receiver<Duration>,
request_rx: mpsc::Receiver<Duration>,
/// Current estimate and update time
rtt_estimate: Arc<AtomicRttEstimate>,
/// Last update time, used for decay calculation
@ -103,8 +102,8 @@ struct PeakEwmaLatencyTask {
impl PeakEwmaLatencyTask {
/// Run the loop for updating latency
async fn run(self) {
while let Ok(rtt) = self.request_rx.recv_async().await {
async fn run(mut self) {
while let Some(rtt) = self.request_rx.recv().await {
self.update(rtt);
}
trace!("latency loop exited");

@ -3,6 +3,7 @@ use std::sync::Arc;
use portable_atomic::{AtomicF32, Ordering};
use serde::ser::Serializer;
use serde::Serialize;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use watermill::quantile::RollingQuantile;
@ -12,7 +13,7 @@ pub struct RollingQuantileLatency {
/// Join handle for the latency calculation task.
pub join_handle: JoinHandle<()>,
/// Send to update with each request duration.
latency_tx: flume::Sender<f32>,
latency_tx: mpsc::Sender<f32>,
/// rolling quantile latency in seconds. Updated async.
seconds: Arc<AtomicF32>,
}
@ -20,7 +21,7 @@ pub struct RollingQuantileLatency {
/// Task to be spawned per-RollingMedianLatency for calculating the value
struct RollingQuantileLatencyTask {
/// Receive to update each request duration
latency_rx: flume::Receiver<f32>,
latency_rx: mpsc::Receiver<f32>,
/// Current estimate and update time
seconds: Arc<AtomicF32>,
/// quantile value.
@ -29,7 +30,7 @@ struct RollingQuantileLatencyTask {
impl RollingQuantileLatencyTask {
fn new(
latency_rx: flume::Receiver<f32>,
latency_rx: mpsc::Receiver<f32>,
seconds: Arc<AtomicF32>,
q: f32,
window_size: usize,
@ -45,7 +46,7 @@ impl RollingQuantileLatencyTask {
/// Run the loop for updating latency.
async fn run(mut self) {
while let Ok(rtt) = self.latency_rx.recv_async().await {
while let Some(rtt) = self.latency_rx.recv().await {
self.update(rtt);
}
}
@ -88,7 +89,7 @@ impl RollingQuantileLatency {
impl RollingQuantileLatency {
pub async fn spawn(quantile_value: f32, window_size: usize) -> Self {
// TODO: how should queue size and window size be related?
let (latency_tx, latency_rx) = flume::bounded(window_size);
let (latency_tx, latency_rx) = mpsc::channel(window_size);
let seconds = Arc::new(AtomicF32::new(0.0));

@ -6,7 +6,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
flume = "0.10.14"
log = "0.4.18"
quick_cache = "0.3.0"
serde = "1"

@ -18,7 +18,7 @@ pub struct KQCacheWithTTL<Key, Qey, Val, We, B> {
max_item_weight: NonZeroU32,
name: &'static str,
ttl: Duration,
tx: flume::Sender<(Instant, Key, Qey)>,
tx: mpsc::Sender<(Instant, Key, Qey)>,
weighter: We,
pub task_handle: JoinHandle<()>,
@ -27,7 +27,7 @@ pub struct KQCacheWithTTL<Key, Qey, Val, We, B> {
struct KQCacheWithTTLTask<Key, Qey, Val, We, B> {
cache: Arc<KQCache<Key, Qey, Val, We, B>>,
name: &'static str,
rx: flume::Receiver<(Instant, Key, Qey)>,
rx: mpsc::Receiver<(Instant, Key, Qey)>,
}
pub struct PlaceholderGuardWithTTL<'a, Key, Qey, Val, We, B> {
@ -54,7 +54,7 @@ impl<
hash_builder: B,
ttl: Duration,
) -> Self {
let (tx, rx) = flume::unbounded();
let (tx, rx) = mpsc::unbounded();
let cache = KQCache::with(
estimated_items_capacity,

@ -52,7 +52,6 @@ derive_more = { version = "0.99.17", features = ["nightly"] }
ethbloom = { version = "0.13.0" }
ethers = { version = "2.0.7", default-features = false, features = ["rustls", "ws"] }
fdlimit = "0.2.1"
flume = "0.10.14"
fstrings = "0.2"
futures = { version = "0.3.28" }
glob = "0.3.1"

@ -50,7 +50,7 @@ use std::str::FromStr;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::{atomic, Arc};
use std::time::Duration;
use tokio::sync::{broadcast, watch, Semaphore, oneshot};
use tokio::sync::{broadcast, mpsc, watch, Semaphore, oneshot};
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tracing::{error, info, trace, warn, Level};
@ -124,7 +124,7 @@ pub struct Web3ProxyApp {
/// TODO: i think i might just delete this entirely. instead use local-only concurrency limits.
pub vredis_pool: Option<RedisPool>,
/// channel for sending stats in a background task
pub stat_sender: Option<flume::Sender<AppStat>>,
pub stat_sender: Option<mpsc::UnboundedSender<AppStat>>,
/// Optional time series database for making pretty graphs that load quickly
influxdb_client: Option<influxdb2::Client>,
@ -179,8 +179,8 @@ impl Web3ProxyApp {
top_config: TopConfig,
num_workers: usize,
shutdown_sender: broadcast::Sender<()>,
flush_stat_buffer_sender: flume::Sender<oneshot::Sender<(usize, usize)>>,
flush_stat_buffer_receiver: flume::Receiver<oneshot::Sender<(usize, usize)>>,
flush_stat_buffer_sender: mpsc::Sender<oneshot::Sender<(usize, usize)>>,
flush_stat_buffer_receiver: mpsc::Receiver<oneshot::Sender<(usize, usize)>>,
) -> anyhow::Result<Web3ProxyAppSpawn> {
let stat_buffer_shutdown_receiver = shutdown_sender.subscribe();
let mut background_shutdown_receiver = shutdown_sender.subscribe();

@ -17,6 +17,7 @@ use http::StatusCode;
use serde_json::json;
use std::sync::atomic::{self, AtomicU64};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::time::Instant;
use tokio_stream::wrappers::{BroadcastStream, WatchStream};
use tracing::{error, trace};
@ -28,7 +29,7 @@ impl Web3ProxyApp {
jsonrpc_request: JsonRpcRequest,
subscription_count: &'a AtomicU64,
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now
response_sender: flume::Sender<Message>,
response_sender: mpsc::UnboundedSender<Message>,
) -> Web3ProxyResult<(AbortHandle, JsonRpcForwardedResponse)> {
let request_metadata = RequestMetadata::new(
self,
@ -87,7 +88,7 @@ impl Web3ProxyApp {
.rate_limit_close_websocket(&subscription_request_metadata)
.await
{
let _ = response_sender.send_async(close_message).await;
let _ = response_sender.send(close_message);
break;
}
@ -112,7 +113,7 @@ impl Web3ProxyApp {
// TODO: can we check a content type header?
let response_msg = Message::Text(response_str);
if response_sender.send_async(response_msg).await.is_err() {
if response_sender.send(response_msg).is_err() {
// TODO: increment error_response? i don't think so. i think this will happen once every time a client disconnects.
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
@ -151,7 +152,7 @@ impl Web3ProxyApp {
.rate_limit_close_websocket(&subscription_request_metadata)
.await
{
let _ = response_sender.send_async(close_message).await;
let _ = response_sender.send(close_message);
break;
}
@ -182,7 +183,7 @@ impl Web3ProxyApp {
// TODO: do clients support binary messages? reply with binary if thats what we were sent
let response_msg = Message::Text(response_str);
if response_sender.send_async(response_msg).await.is_err() {
if response_sender.send(response_msg).is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
@ -222,7 +223,7 @@ impl Web3ProxyApp {
.rate_limit_close_websocket(&subscription_request_metadata)
.await
{
let _ = response_sender.send_async(close_message).await;
let _ = response_sender.send(close_message);
break;
}
@ -251,7 +252,7 @@ impl Web3ProxyApp {
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
if response_sender.send_async(response_msg).await.is_err() {
if response_sender.send(response_msg).is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
@ -291,7 +292,7 @@ impl Web3ProxyApp {
.rate_limit_close_websocket(&subscription_request_metadata)
.await
{
let _ = response_sender.send_async(close_message).await;
let _ = response_sender.send(close_message);
break;
}
@ -321,7 +322,7 @@ impl Web3ProxyApp {
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
if response_sender.send_async(response_msg).await.is_err() {
if response_sender.send(response_msg).is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};

@ -72,11 +72,11 @@ pub async fn clean_block_number(
block_param_id: usize,
latest_block: &Web3ProxyBlock,
rpcs: &Web3Rpcs,
) -> anyhow::Result<BlockNumAndHash> {
) -> Web3ProxyResult<BlockNumAndHash> {
match params.as_array_mut() {
None => {
// TODO: this needs the correct error code in the response
Err(anyhow::anyhow!("params not an array"))
Err(anyhow::anyhow!("params not an array").into())
}
Some(params) => match params.get_mut(block_param_id) {
None => {
@ -107,7 +107,7 @@ pub async fn clean_block_number(
(BlockNumAndHash::from(&block), false)
} else {
return Err(anyhow::anyhow!("blockHash missing"));
return Err(anyhow::anyhow!("blockHash missing").into());
}
} else {
// it might be a string like "latest" or a block number or a block hash
@ -157,7 +157,8 @@ pub async fn clean_block_number(
} else {
return Err(anyhow::anyhow!(
"param not a block identifier, block number, or block hash"
));
)
.into());
}
};
@ -370,6 +371,13 @@ impl CacheMode {
block,
cache_errors: true,
}),
Err(Web3ProxyError::NoBlocksKnown) => {
warn!(%method, ?params, "no servers available to get block from params");
Ok(CacheMode::Cache {
block: head_block.into(),
cache_errors: true,
})
}
Err(err) => {
error!(%method, ?params, ?err, "could not get block from params");
Ok(CacheMode::Cache {

@ -11,6 +11,7 @@ use serde::Deserialize;
use serde_inline_default::serde_inline_default;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::warn;
pub type BlockAndRpc = (Option<Web3ProxyBlock>, Arc<Web3Rpc>);
@ -278,9 +279,9 @@ impl Web3RpcConfig {
block_interval: Duration,
http_client: Option<reqwest::Client>,
blocks_by_hash_cache: BlocksByHashCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
block_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
max_head_block_age: Duration,
tx_id_sender: Option<flume::Sender<TxHashAndRpc>>,
tx_id_sender: Option<mpsc::UnboundedSender<TxHashAndRpc>>,
) -> anyhow::Result<(Arc<Web3Rpc>, Web3ProxyJoinHandle<()>)> {
if !self.extra.is_empty() {
warn!(extra=?self.extra.keys(), "unknown Web3RpcConfig fields!");

@ -63,7 +63,6 @@ pub enum Web3ProxyError {
EthersHttpClient(ethers::prelude::HttpClientError),
EthersProvider(ethers::prelude::ProviderError),
EthersWsClient(ethers::prelude::WsClientError),
FlumeRecv(flume::RecvError),
GasEstimateNotU256,
HdrRecord(hdrhistogram::errors::RecordError),
Headers(headers::Error),
@ -140,7 +139,6 @@ pub enum Web3ProxyError {
#[from(ignore)]
RefererNotAllowed(headers::Referer),
SemaphoreAcquireError(AcquireError),
SendAppStatError(flume::SendError<crate::stats::AppStat>),
SerdeJson(serde_json::Error),
SiweVerification(VerificationError),
/// simple way to return an error message to the user and an anyhow to our logs
@ -332,17 +330,6 @@ impl Web3ProxyError {
)
}
}
Self::FlumeRecv(err) => {
warn!(?err, "FlumeRecvError");
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcErrorData {
message: "flume recv error!".into(),
code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(),
data: None,
},
)
}
// Self::JsonRpcForwardedError(x) => (StatusCode::OK, x),
Self::GasEstimateNotU256 => {
trace!("GasEstimateNotU256");
@ -886,17 +873,6 @@ impl Web3ProxyError {
},
)
}
Self::SendAppStatError(err) => {
error!(?err, "SendAppStatError");
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcErrorData {
message: "error stat_sender sending response_stat".into(),
code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(),
data: None,
},
)
}
Self::SerdeJson(err) => {
trace!(?err, "serde json");
(

@ -42,7 +42,7 @@ use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64, AtomicUsize};
use std::time::Duration;
use std::{net::IpAddr, str::FromStr, sync::Arc};
use tokio::sync::RwLock as AsyncRwLock;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore};
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tracing::{error, trace, warn};
@ -346,7 +346,7 @@ pub struct RequestMetadata {
pub kafka_debug_logger: Option<Arc<KafkaDebugLogger>>,
/// Cancel-safe channel for sending stats to the buffer
pub stat_sender: Option<flume::Sender<AppStat>>,
pub stat_sender: Option<mpsc::UnboundedSender<AppStat>>,
}
impl Default for Authorization {
@ -517,7 +517,7 @@ impl RequestMetadata {
let stat: AppStat = stat.into();
if let Err(err) = stat_sender.try_send(stat) {
if let Err(err) = stat_sender.send(stat) {
error!(?err, "failed sending stat");
// TODO: return it? that seems like it might cause an infinite loop
// TODO: but dropping stats is bad... hmm... i guess better to undercharge customers than overcharge

@ -34,7 +34,7 @@ use std::net::IpAddr;
use std::str::from_utf8_mut;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tokio::sync::{broadcast, OwnedSemaphorePermit, RwLock as AsyncRwLock};
use tokio::sync::{broadcast, mpsc, OwnedSemaphorePermit, RwLock as AsyncRwLock};
use tracing::{info, trace};
/// How to select backend servers for a request
@ -306,7 +306,8 @@ async fn proxy_web3_socket(
let (ws_tx, ws_rx) = socket.split();
// create a channel for our reader and writer can communicate. todo: benchmark different channels
let (response_sender, response_receiver) = flume::unbounded::<Message>();
// TODO: this should be bounded. async blocking on too many messages would be fine
let (response_sender, response_receiver) = mpsc::unbounded_channel::<Message>();
tokio::spawn(write_web3_socket(response_receiver, ws_tx));
tokio::spawn(read_web3_socket(app, authorization, ws_rx, response_sender));
@ -317,7 +318,7 @@ async fn handle_socket_payload(
app: Arc<Web3ProxyApp>,
authorization: &Arc<Authorization>,
payload: &str,
response_sender: &flume::Sender<Message>,
response_sender: &mpsc::UnboundedSender<Message>,
subscription_count: &AtomicU64,
subscriptions: Arc<AsyncRwLock<HashMap<U64, AbortHandle>>>,
) -> Web3ProxyResult<(Message, Option<OwnedSemaphorePermit>)> {
@ -434,7 +435,7 @@ async fn read_web3_socket(
app: Arc<Web3ProxyApp>,
authorization: Arc<Authorization>,
mut ws_rx: SplitStream<WebSocket>,
response_sender: flume::Sender<Message>,
response_sender: mpsc::UnboundedSender<Message>,
) {
let subscriptions = Arc::new(AsyncRwLock::new(HashMap::new()));
let subscription_count = Arc::new(AtomicU64::new(1));
@ -519,7 +520,7 @@ async fn read_web3_socket(
}
};
if response_sender.send_async(response_msg).await.is_err() {
if response_sender.send(response_msg).is_err() {
let _ = close_sender.send(true);
};
};
@ -537,12 +538,12 @@ async fn read_web3_socket(
}
async fn write_web3_socket(
response_rx: flume::Receiver<Message>,
mut response_rx: mpsc::UnboundedReceiver<Message>,
mut ws_tx: SplitSink<WebSocket, Message>,
) {
// TODO: increment counter for open websockets
while let Ok(msg) = response_rx.recv_async().await {
while let Some(msg) = response_rx.recv().await {
// a response is ready
// we do not check rate limits here. they are checked before putting things into response_sender;

@ -15,7 +15,7 @@ use serde_json::json;
use std::hash::Hash;
use std::time::Duration;
use std::{fmt::Display, sync::Arc};
use tokio::sync::broadcast;
use tokio::sync::{broadcast, mpsc};
use tokio::time::timeout;
use tracing::{debug, error, warn};
@ -427,7 +427,7 @@ impl Web3Rpcs {
pub(super) async fn process_incoming_blocks(
&self,
authorization: &Arc<Authorization>,
block_receiver: flume::Receiver<BlockAndRpc>,
mut block_receiver: mpsc::UnboundedReceiver<BlockAndRpc>,
// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
// Geth's subscriptions have the same potential for skipping blocks.
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
@ -441,8 +441,8 @@ impl Web3Rpcs {
let mut had_first_success = false;
loop {
match timeout(double_block_time, block_receiver.recv_async()).await {
Ok(Ok((new_block, rpc))) => {
match timeout(double_block_time, block_receiver.recv()).await {
Ok(Some((new_block, rpc))) => {
let rpc_name = rpc.name.clone();
let rpc_is_backup = rpc.backup;
@ -485,10 +485,9 @@ impl Web3Rpcs {
}
}
}
Ok(Err(err)) => {
Ok(None) => {
// TODO: panic is probably too much, but getting here is definitely not good
error!("block_receiver on {} exited! {:#?}", self, err);
return Err(err.into());
return Err(anyhow::anyhow!("block_receiver on {} exited", self).into());
}
Err(_) => {
// TODO: what timeout on this?

@ -31,7 +31,7 @@ use std::fmt::{self, Display};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::select;
use tokio::sync::{broadcast, watch};
use tokio::sync::{broadcast, mpsc, watch, RwLock as AsyncRwLock};
use tokio::time::{sleep, sleep_until, Duration, Instant};
use tracing::{debug, error, info, trace, warn};
@ -42,7 +42,7 @@ pub struct Web3Rpcs {
pub(crate) name: String,
pub(crate) chain_id: u64,
/// if watch_consensus_head_sender is some, Web3Rpc inside self will send blocks here when they get them
pub(crate) block_sender: flume::Sender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
pub(crate) block_sender: mpsc::UnboundedSender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
/// any requests will be forwarded to one (or more) of these connections
/// TODO: hopefully this not being an async lock will be okay. if you need it across awaits, clone the arc
pub(crate) by_name: RwLock<HashMap<String, Arc<Web3Rpc>>>,
@ -55,8 +55,8 @@ pub struct Web3Rpcs {
pub(super) watch_head_block: Option<watch::Sender<Option<Web3ProxyBlock>>>,
/// keep track of transactions that we have sent through subscriptions
pub(super) pending_transaction_cache: Cache<TxHash, TxStatus>,
pub(super) pending_tx_id_receiver: flume::Receiver<TxHashAndRpc>,
pub(super) pending_tx_id_sender: flume::Sender<TxHashAndRpc>,
pub(super) pending_tx_id_receiver: AsyncRwLock<mpsc::UnboundedReceiver<TxHashAndRpc>>,
pub(super) pending_tx_id_sender: mpsc::UnboundedSender<TxHashAndRpc>,
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
/// all blocks, including orphans
pub(super) blocks_by_hash: BlocksByHashCache,
@ -91,8 +91,8 @@ impl Web3Rpcs {
Web3ProxyJoinHandle<()>,
watch::Receiver<Option<Arc<RankedRpcs>>>,
)> {
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, block_receiver) = flume::unbounded::<BlockAndRpc>();
let (pending_tx_id_sender, pending_tx_id_receiver) = mpsc::unbounded_channel();
let (block_sender, block_receiver) = mpsc::unbounded_channel::<BlockAndRpc>();
// these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes
// TODO: actual weighter on this
@ -133,7 +133,7 @@ impl Web3Rpcs {
min_sum_soft_limit,
name,
pending_transaction_cache,
pending_tx_id_receiver,
pending_tx_id_receiver: AsyncRwLock::new(pending_tx_id_receiver),
pending_tx_id_sender,
watch_head_block: watch_consensus_head_sender,
watch_ranked_rpcs: watch_consensus_rpcs_sender,
@ -329,7 +329,7 @@ impl Web3Rpcs {
async fn subscribe(
self: Arc<Self>,
authorization: Arc<Authorization>,
block_receiver: flume::Receiver<BlockAndRpc>,
block_receiver: mpsc::UnboundedReceiver<BlockAndRpc>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> Web3ProxyResult<()> {
let mut futures = vec![];
@ -341,10 +341,11 @@ impl Web3Rpcs {
if let Some(pending_tx_sender) = pending_tx_sender.clone() {
let clone = self.clone();
let authorization = authorization.clone();
let pending_tx_id_receiver = self.pending_tx_id_receiver.clone();
let handle = tokio::task::spawn(async move {
// TODO: set up this future the same as the block funnel
while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv_async().await {
while let Some((pending_tx_id, rpc)) =
clone.pending_tx_id_receiver.write().await.recv().await
{
let f = clone.clone().process_incoming_tx_id(
authorization.clone(),
rpc,
@ -1323,7 +1324,7 @@ impl Serialize for Web3Rpcs {
where
S: Serializer,
{
let mut state = serializer.serialize_struct("Web3Rpcs", 6)?;
let mut state = serializer.serialize_struct("Web3Rpcs", 5)?;
{
let by_name = self.by_name.read();
@ -1352,8 +1353,6 @@ impl Serialize for Web3Rpcs {
),
)?;
state.serialize_field("block_sender_len", &self.block_sender.len())?;
state.serialize_field(
"watch_consensus_rpcs_receivers",
&self.watch_ranked_rpcs.receiver_count(),
@ -1535,8 +1534,8 @@ mod tests {
let head_rpc = Arc::new(head_rpc);
let lagged_rpc = Arc::new(lagged_rpc);
let (block_sender, _block_receiver) = flume::unbounded();
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, _block_receiver) = mpsc::unbounded_channel();
let (pending_tx_id_sender, pending_tx_id_receiver) = mpsc::unbounded_channel();
let (watch_ranked_rpcs, _watch_consensus_rpcs_receiver) = watch::channel(None);
let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None);
@ -1557,7 +1556,7 @@ mod tests {
pending_transaction_cache: CacheBuilder::new(100)
.time_to_live(Duration::from_secs(60))
.build(),
pending_tx_id_receiver,
pending_tx_id_receiver: AsyncRwLock::new(pending_tx_id_receiver),
pending_tx_id_sender,
blocks_by_hash: CacheBuilder::new(100)
.time_to_live(Duration::from_secs(60))
@ -1805,8 +1804,8 @@ mod tests {
let pruned_rpc = Arc::new(pruned_rpc);
let archive_rpc = Arc::new(archive_rpc);
let (block_sender, _) = flume::unbounded();
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, _) = mpsc::unbounded_channel();
let (pending_tx_id_sender, pending_tx_id_receiver) = mpsc::unbounded_channel();
let (watch_ranked_rpcs, _) = watch::channel(None);
let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None);
@ -1826,7 +1825,7 @@ mod tests {
pending_transaction_cache: CacheBuilder::new(100)
.time_to_live(Duration::from_secs(120))
.build(),
pending_tx_id_receiver,
pending_tx_id_receiver: AsyncRwLock::new(pending_tx_id_receiver),
pending_tx_id_sender,
blocks_by_hash: CacheBuilder::new(100)
.time_to_live(Duration::from_secs(120))
@ -1989,8 +1988,8 @@ mod tests {
let mock_geth = Arc::new(mock_geth);
let mock_erigon_archive = Arc::new(mock_erigon_archive);
let (block_sender, _) = flume::unbounded();
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, _) = mpsc::unbounded_channel();
let (pending_tx_id_sender, pending_tx_id_receiver) = mpsc::unbounded_channel();
let (watch_ranked_rpcs, _) = watch::channel(None);
let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None);
@ -2012,7 +2011,7 @@ mod tests {
watch_head_block: Some(watch_consensus_head_sender),
watch_ranked_rpcs,
pending_transaction_cache: Cache::new(10_000),
pending_tx_id_receiver,
pending_tx_id_receiver: AsyncRwLock::new(pending_tx_id_receiver),
pending_tx_id_sender,
blocks_by_hash: Cache::new(10_000),
blocks_by_number: Cache::new(10_000),

@ -26,7 +26,7 @@ use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicU32, AtomicU64, AtomicUsize};
use std::{cmp::Ordering, sync::Arc};
use tokio::sync::{watch, RwLock as AsyncRwLock};
use tokio::sync::{mpsc, watch, RwLock as AsyncRwLock};
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
use tracing::{debug, error, info, trace, warn, Level};
use url::Url;
@ -98,9 +98,9 @@ impl Web3Rpc {
redis_pool: Option<RedisPool>,
block_interval: Duration,
block_map: BlocksByHashCache,
block_and_rpc_sender: Option<flume::Sender<BlockAndRpc>>,
block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
max_head_block_age: Duration,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
tx_id_sender: Option<mpsc::UnboundedSender<(TxHash, Arc<Self>)>>,
) -> anyhow::Result<(Arc<Web3Rpc>, Web3ProxyJoinHandle<()>)> {
let created_at = Instant::now();
@ -465,7 +465,7 @@ impl Web3Rpc {
pub(crate) async fn send_head_block_result(
self: &Arc<Self>,
new_head_block: Web3ProxyResult<Option<ArcBlock>>,
block_and_rpc_sender: &flume::Sender<BlockAndRpc>,
block_and_rpc_sender: &mpsc::UnboundedSender<BlockAndRpc>,
block_map: &BlocksByHashCache,
) -> Web3ProxyResult<()> {
let head_block_sender = self.head_block.as_ref().unwrap();
@ -530,8 +530,7 @@ impl Web3Rpc {
// tell web3rpcs about this rpc having this block
block_and_rpc_sender
.send_async((new_head_block, self.clone()))
.await
.send((new_head_block, self.clone()))
.context("block_and_rpc_sender failed sending")?;
Ok(())
@ -601,9 +600,9 @@ impl Web3Rpc {
async fn subscribe_with_reconnect(
self: Arc<Self>,
block_map: BlocksByHashCache,
block_and_rpc_sender: Option<flume::Sender<BlockAndRpc>>,
block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
chain_id: u64,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
tx_id_sender: Option<mpsc::UnboundedSender<(TxHash, Arc<Self>)>>,
) -> Web3ProxyResult<()> {
loop {
if let Err(err) = self
@ -644,9 +643,9 @@ impl Web3Rpc {
async fn subscribe(
self: Arc<Self>,
block_map: BlocksByHashCache,
block_and_rpc_sender: Option<flume::Sender<BlockAndRpc>>,
block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
chain_id: u64,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
tx_id_sender: Option<mpsc::UnboundedSender<(TxHash, Arc<Self>)>>,
) -> Web3ProxyResult<()> {
let error_handler = if self.backup {
Some(RequestErrorHandler::DebugLevel)
@ -803,7 +802,7 @@ impl Web3Rpc {
/// Subscribe to new blocks.
async fn subscribe_new_heads(
self: &Arc<Self>,
block_sender: flume::Sender<BlockAndRpc>,
block_sender: mpsc::UnboundedSender<BlockAndRpc>,
block_map: BlocksByHashCache,
subscribe_stop_rx: watch::Receiver<bool>,
) -> Web3ProxyResult<()> {
@ -898,7 +897,7 @@ impl Web3Rpc {
/// Turn on the firehose of pending transactions
async fn subscribe_pending_transactions(
self: Arc<Self>,
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
tx_id_sender: mpsc::UnboundedSender<(TxHash, Arc<Self>)>,
mut subscribe_stop_rx: watch::Receiver<bool>,
) -> Web3ProxyResult<()> {
// TODO: check that it actually changed to true

@ -10,7 +10,7 @@ use influxdb2::api::write::TimestampPrecision;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::DatabaseConnection;
use std::time::Duration;
use tokio::sync::{broadcast, oneshot};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::time::{interval, sleep};
use tracing::{error, info, trace};
@ -33,7 +33,7 @@ pub struct BufferedRpcQueryStats {
#[derive(From)]
pub struct SpawnedStatBuffer {
pub stat_sender: flume::Sender<AppStat>,
pub stat_sender: mpsc::UnboundedSender<AppStat>,
/// these handles are important and must be allowed to finish
pub background_handle: Web3ProxyJoinHandle<()>,
}
@ -53,7 +53,7 @@ pub struct StatBuffer {
tsdb_save_interval_seconds: u32,
user_balance_cache: UserBalanceCache,
_flush_sender: flume::Sender<oneshot::Sender<(usize, usize)>>,
_flush_sender: mpsc::Sender<oneshot::Sender<(usize, usize)>>,
}
impl StatBuffer {
@ -69,8 +69,8 @@ impl StatBuffer {
user_balance_cache: Option<UserBalanceCache>,
shutdown_receiver: broadcast::Receiver<()>,
tsdb_save_interval_seconds: u32,
flush_sender: flume::Sender<oneshot::Sender<(usize, usize)>>,
flush_receiver: flume::Receiver<oneshot::Sender<(usize, usize)>>,
flush_sender: mpsc::Sender<oneshot::Sender<(usize, usize)>>,
flush_receiver: mpsc::Receiver<oneshot::Sender<(usize, usize)>>,
) -> anyhow::Result<Option<SpawnedStatBuffer>> {
if influxdb_bucket.is_none() {
influxdb_client = None;
@ -80,7 +80,7 @@ impl StatBuffer {
return Ok(None);
}
let (stat_sender, stat_receiver) = flume::unbounded();
let (stat_sender, stat_receiver) = mpsc::unbounded_channel();
let timestamp_precision = TimestampPrecision::Seconds;
@ -113,9 +113,9 @@ impl StatBuffer {
async fn aggregate_and_save_loop(
&mut self,
stat_receiver: flume::Receiver<AppStat>,
mut stat_receiver: mpsc::UnboundedReceiver<AppStat>,
mut shutdown_receiver: broadcast::Receiver<()>,
flush_receiver: flume::Receiver<oneshot::Sender<(usize, usize)>>,
mut flush_receiver: mpsc::Receiver<oneshot::Sender<(usize, usize)>>,
) -> Web3ProxyResult<()> {
let mut tsdb_save_interval =
interval(Duration::from_secs(self.tsdb_save_interval_seconds as u64));
@ -124,11 +124,11 @@ impl StatBuffer {
loop {
tokio::select! {
stat = stat_receiver.recv_async() => {
stat = stat_receiver.recv() => {
// trace!("Received stat");
// save the stat to a buffer
match stat {
Ok(AppStat::RpcQuery(stat)) => {
Some(AppStat::RpcQuery(stat)) => {
if self.influxdb_client.is_some() {
// TODO: round the timestamp at all?
@ -145,8 +145,8 @@ impl StatBuffer {
self.accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat).await;
}
}
Err(err) => {
info!("error receiving stat: {}", err);
None => {
info!("error receiving stat");
break;
}
}
@ -165,9 +165,9 @@ impl StatBuffer {
trace!("Saved {} stats to the tsdb", count);
}
}
x = flush_receiver.recv_async() => {
x = flush_receiver.recv() => {
match x {
Ok(x) => {
Some(x) => {
trace!("flush");
let tsdb_count = self.save_tsdb_stats().await;
@ -184,8 +184,9 @@ impl StatBuffer {
error!(%tsdb_count, %relational_count, ?err, "unable to notify about flushed stats");
}
}
Err(err) => {
error!(?err, "unable to flush stat buffer!");
None => {
error!("unable to flush stat buffer!");
break;
}
}
}

@ -16,7 +16,7 @@ use migration::{Expr, Value};
use parking_lot::Mutex;
use std::num::NonZeroU64;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::{broadcast, mpsc};
use tokio::time::Instant;
use tracing::{error, info};
use ulid::Ulid;
@ -72,7 +72,7 @@ impl MigrateStatsToV2SubCommand {
None => None,
};
let (flush_sender, flush_receiver) = flume::bounded(1);
let (flush_sender, flush_receiver) = mpsc::channel(1);
// Spawn the stat-sender
let emitter_spawn = StatBuffer::try_spawn(

@ -12,9 +12,9 @@ use std::sync::Arc;
use std::time::Duration;
use std::{fs, thread};
use tokio::select;
use tokio::sync::{broadcast, oneshot};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::time::{sleep_until, Instant};
use tracing::{debug, error, info, trace, warn};
use tracing::{error, info, trace, warn};
/// start the main proxy daemon
#[derive(FromArgs, PartialEq, Debug, Eq)]
@ -42,7 +42,7 @@ impl ProxydSubCommand {
let frontend_port = Arc::new(self.port.into());
let prometheus_port = Arc::new(self.prometheus_port.into());
let (flush_stat_buffer_sender, flush_stat_buffer_receiver) = flume::bounded(8);
let (flush_stat_buffer_sender, flush_stat_buffer_receiver) = mpsc::channel(8);
Self::_main(
top_config,
@ -66,8 +66,8 @@ impl ProxydSubCommand {
prometheus_port: Arc<AtomicU16>,
num_workers: usize,
frontend_shutdown_sender: broadcast::Sender<()>,
flush_stat_buffer_sender: flume::Sender<oneshot::Sender<(usize, usize)>>,
flush_stat_buffer_receiver: flume::Receiver<oneshot::Sender<(usize, usize)>>,
flush_stat_buffer_sender: mpsc::Sender<oneshot::Sender<(usize, usize)>>,
flush_stat_buffer_receiver: mpsc::Receiver<oneshot::Sender<(usize, usize)>>,
) -> anyhow::Result<()> {
// tokio has code for catching ctrl+c so we use that to shut down in most cases
// frontend_shutdown_sender is currently only used in tests, but we might make a /shutdown endpoint or something

@ -23,7 +23,7 @@ use tokio::{
process::Command as AsyncCommand,
sync::{
broadcast::{self, error::SendError},
oneshot,
mpsc, oneshot,
},
task::JoinHandle,
time::{sleep, Instant},
@ -59,7 +59,7 @@ pub struct TestApp {
pub proxy_provider: Provider<Http>,
/// tell the app to flush stats to the database
flush_stat_buffer_sender: flume::Sender<oneshot::Sender<(usize, usize)>>,
flush_stat_buffer_sender: mpsc::Sender<oneshot::Sender<(usize, usize)>>,
/// tell the app to shut down (use `self.stop()`).
shutdown_sender: broadcast::Sender<()>,
@ -278,7 +278,7 @@ impl TestApp {
let frontend_port_arc = Arc::new(AtomicU16::new(0));
let prometheus_port_arc = Arc::new(AtomicU16::new(0));
let (flush_stat_buffer_sender, flush_stat_buffer_receiver) = flume::bounded(1);
let (flush_stat_buffer_sender, flush_stat_buffer_receiver) = mpsc::channel(1);
// spawn the app
// TODO: spawn in a thread so we can run from non-async tests and so the Drop impl can wait for it to stop
@ -331,7 +331,7 @@ impl TestApp {
pub async fn flush_stats(&self) -> anyhow::Result<(usize, usize)> {
let (tx, rx) = oneshot::channel();
self.flush_stat_buffer_sender.send(tx)?;
self.flush_stat_buffer_sender.send(tx).await?;
let x = rx.await?;