one less clone

This commit is contained in:
Bryan Stitt 2022-05-12 19:44:31 +00:00
parent 6069c4ae4a
commit f82f5ea544
4 changed files with 12 additions and 33 deletions

View File

@ -9,9 +9,9 @@ chain_id = 1
url = "ws://127.0.0.1:8546" url = "ws://127.0.0.1:8546"
soft_limit = 200_000 soft_limit = 200_000
[balanced_rpc_tiers.0.ankr] #[balanced_rpc_tiers.0.ankr]
url = "https://rpc.ankr.com/eth" #url = "https://rpc.ankr.com/eth"
soft_limit = 3_000 #soft_limit = 3_000
[private_rpcs] [private_rpcs]

View File

@ -278,12 +278,8 @@ impl Web3ProxyApp {
// TODO: what allowed lag? // TODO: what allowed lag?
match balanced_rpcs.next_upstream_server().await { match balanced_rpcs.next_upstream_server().await {
Ok(active_request_handle) => { Ok(active_request_handle) => {
let response = balanced_rpcs let response = active_request_handle
.try_send_request( .request(&request.method, &request.params)
active_request_handle,
&request.method,
&request.params,
)
.await; .await;
let response = match response { let response = match response {

View File

@ -13,7 +13,7 @@ use std::sync::atomic::{self, AtomicU32, AtomicU64};
use std::time::Duration; use std::time::Duration;
use std::{cmp::Ordering, sync::Arc}; use std::{cmp::Ordering, sync::Arc};
use tokio::time::{interval, sleep, MissedTickBehavior}; use tokio::time::{interval, sleep, MissedTickBehavior};
use tracing::{info, trace, warn}; use tracing::{debug, info, warn};
use crate::connections::Web3Connections; use crate::connections::Web3Connections;
@ -141,8 +141,7 @@ impl Web3Connection {
)); ));
} }
// TODO: use anyhow info!("Successful connection: {}", connection);
assert_eq!(chain_id, found_chain_id);
Ok(connection) Ok(connection)
} }
@ -326,7 +325,7 @@ impl ActiveRequestHandle {
{ {
// TODO: this should probably be trace level and use a span // TODO: this should probably be trace level and use a span
// TODO: it would be nice to have the request id on this // TODO: it would be nice to have the request id on this
trace!("Sending {}({:?}) to {}", method, params, self.0); debug!("Sending {}({:?}) to {}", method, params, self.0);
let response = match &self.0.provider { let response = match &self.0.provider {
Web3Provider::Http(provider) => provider.request(method, params).await, Web3Provider::Http(provider) => provider.request(method, params).await,
@ -335,7 +334,7 @@ impl ActiveRequestHandle {
// TODO: i think ethers already has trace logging (and does it much more fancy) // TODO: i think ethers already has trace logging (and does it much more fancy)
// TODO: at least instrument this with more useful information // TODO: at least instrument this with more useful information
trace!("Response from {}: {:?}", self.0, response); debug!("Response from {}: {:?}", self.0, response);
response response
} }

View File

@ -114,19 +114,6 @@ impl Web3Connections {
self.best_head_block_number.load(atomic::Ordering::Acquire) self.best_head_block_number.load(atomic::Ordering::Acquire)
} }
pub async fn try_send_request(
&self,
active_request_handle: ActiveRequestHandle,
method: &str,
params: &Option<Box<RawValue>>,
) -> Result<Box<RawValue>, ethers::prelude::ProviderError> {
let response = active_request_handle.request(method, params).await;
// TODO: if "no block with that header" or some other jsonrpc errors, skip this response?
response
}
/// Send the same request to all the handles. Returning the fastest successful result. /// Send the same request to all the handles. Returning the fastest successful result.
pub async fn try_send_parallel_requests( pub async fn try_send_parallel_requests(
self: Arc<Self>, self: Arc<Self>,
@ -138,18 +125,15 @@ impl Web3Connections {
// TODO: if only 1 active_request_handles, do self.try_send_request // TODO: if only 1 active_request_handles, do self.try_send_request
let mut unordered_futures = FuturesUnordered::new(); let mut unordered_futures = FuturesUnordered::new();
for connection in active_request_handles { for active_request_handle in active_request_handles {
// clone things so we can pass them to a future // clone things so we can pass them to a future
let connections = self.clone();
let method = method.clone(); let method = method.clone();
let params = params.clone(); let params = params.clone();
let response_sender = response_sender.clone(); let response_sender = response_sender.clone();
let handle = tokio::spawn(async move { let handle = tokio::spawn(async move {
// get the client for this rpc server let response: Box<RawValue> =
let response = connections active_request_handle.request(&method, &params).await?;
.try_send_request(connection, &method, &params)
.await?;
// send the first good response to a one shot channel. that way we respond quickly // send the first good response to a one shot channel. that way we respond quickly
// drop the result because errors are expected after the first send // drop the result because errors are expected after the first send