no need for try_join_all here
This commit is contained in:
parent
2a421778ef
commit
d27b91cad9
@ -190,6 +190,10 @@ impl Web3Rpcs {
|
|||||||
&self,
|
&self,
|
||||||
mut block_and_rpc_receiver: mpsc::UnboundedReceiver<BlockAndRpc>,
|
mut block_and_rpc_receiver: mpsc::UnboundedReceiver<BlockAndRpc>,
|
||||||
) -> Web3ProxyResult<()> {
|
) -> Web3ProxyResult<()> {
|
||||||
|
if self.watch_head_block.is_none() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
let mut consensus_finder =
|
let mut consensus_finder =
|
||||||
ConsensusFinder::new(Some(self.max_head_block_age), Some(self.max_head_block_lag));
|
ConsensusFinder::new(Some(self.max_head_block_age), Some(self.max_head_block_lag));
|
||||||
|
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
use super::blockchain::{BlocksByHashCache, BlocksByNumberCache, Web3ProxyBlock};
|
use super::blockchain::{BlocksByHashCache, BlocksByNumberCache, Web3ProxyBlock};
|
||||||
use super::consensus::{RankedRpcs, RpcsForRequest};
|
use super::consensus::{RankedRpcs, RpcsForRequest};
|
||||||
use super::one::Web3Rpc;
|
use super::one::Web3Rpc;
|
||||||
use crate::app::{flatten_handle, App, Web3ProxyJoinHandle};
|
use crate::app::{App, Web3ProxyJoinHandle};
|
||||||
use crate::config::{average_block_interval, BlockAndRpc, Web3RpcConfig};
|
use crate::config::{average_block_interval, BlockAndRpc, Web3RpcConfig};
|
||||||
use crate::errors::{Web3ProxyError, Web3ProxyResult};
|
use crate::errors::{Web3ProxyError, Web3ProxyResult};
|
||||||
use crate::frontend::rpc_proxy_ws::ProxyMode;
|
use crate::frontend::rpc_proxy_ws::ProxyMode;
|
||||||
@ -12,7 +12,6 @@ use crate::jsonrpc::{self, JsonRpcErrorData, JsonRpcParams, JsonRpcResultData};
|
|||||||
use deduped_broadcast::DedupedBroadcaster;
|
use deduped_broadcast::DedupedBroadcaster;
|
||||||
use derive_more::From;
|
use derive_more::From;
|
||||||
use ethers::prelude::{TxHash, U64};
|
use ethers::prelude::{TxHash, U64};
|
||||||
use futures::future::try_join_all;
|
|
||||||
use futures::stream::StreamExt;
|
use futures::stream::StreamExt;
|
||||||
use futures_util::future::join_all;
|
use futures_util::future::join_all;
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
@ -155,7 +154,11 @@ impl Web3Rpcs {
|
|||||||
let handle = {
|
let handle = {
|
||||||
let connections = connections.clone();
|
let connections = connections.clone();
|
||||||
|
|
||||||
tokio::spawn(connections.subscribe(block_and_rpc_receiver))
|
tokio::spawn(async move {
|
||||||
|
connections
|
||||||
|
.process_incoming_blocks(block_and_rpc_receiver)
|
||||||
|
.await
|
||||||
|
})
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok((connections, handle, consensus_connections_watcher))
|
Ok((connections, handle, consensus_connections_watcher))
|
||||||
@ -346,49 +349,6 @@ impl Web3Rpcs {
|
|||||||
self.min_synced_rpcs
|
self.min_synced_rpcs
|
||||||
}
|
}
|
||||||
|
|
||||||
/// subscribe to blocks and transactions from all the backend rpcs.
|
|
||||||
/// blocks are processed by all the `Web3Rpc`s and then sent to the `block_receiver`
|
|
||||||
/// transaction ids from all the `Web3Rpc`s are deduplicated and forwarded to `pending_tx_sender`
|
|
||||||
/// returns True if the subscription is not needed. False if it exited after starting
|
|
||||||
async fn subscribe(
|
|
||||||
self: Arc<Self>,
|
|
||||||
block_and_rpc_receiver: mpsc::UnboundedReceiver<BlockAndRpc>,
|
|
||||||
) -> Web3ProxyResult<()> {
|
|
||||||
let mut futures = vec![];
|
|
||||||
|
|
||||||
// TODO: do we need anything here to set up the transaction funnel
|
|
||||||
|
|
||||||
// setup the block funnel
|
|
||||||
if self.watch_head_block.is_some() {
|
|
||||||
let connections = Arc::clone(&self);
|
|
||||||
|
|
||||||
let handle = tokio::task::Builder::default()
|
|
||||||
.name("process_incoming_blocks")
|
|
||||||
.spawn(async move {
|
|
||||||
connections
|
|
||||||
.process_incoming_blocks(block_and_rpc_receiver)
|
|
||||||
.await
|
|
||||||
})?;
|
|
||||||
|
|
||||||
futures.push(flatten_handle(handle));
|
|
||||||
}
|
|
||||||
|
|
||||||
if futures.is_empty() {
|
|
||||||
// no transaction or block subscriptions.
|
|
||||||
info!(?self, "no subscriptions needed");
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Err(e) = try_join_all(futures).await {
|
|
||||||
error!(?self, "subscriptions over");
|
|
||||||
// TODO: i think this should maybe be a panic
|
|
||||||
return Err(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
info!(?self, "subscriptions over");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// TODO: i think this RpcsForRequest should be stored on the ValidatedRequest when its made. that way any waiting for sync happens early and we don't need waiting anywhere else in the app
|
/// TODO: i think this RpcsForRequest should be stored on the ValidatedRequest when its made. that way any waiting for sync happens early and we don't need waiting anywhere else in the app
|
||||||
pub async fn wait_for_rpcs_for_request(
|
pub async fn wait_for_rpcs_for_request(
|
||||||
&self,
|
&self,
|
||||||
|
Loading…
Reference in New Issue
Block a user