add name to web3rpcs

This commit is contained in:
Bryan Stitt 2023-05-16 16:04:17 -07:00
parent 59e864e70f
commit 5d64524aa6
4 changed files with 35 additions and 13 deletions

@ -54,6 +54,9 @@ Check that the proxy is working:
```
curl -X POST -H "Content-Type: application/json" --data '{"jsonrpc":"2.0","method":"web3_clientVersion","id":1}' 127.0.0.1:8544
```
```
curl -X POST -H "Content-Type: application/json" --data '{"jsonrpc":"2.0","method":"eth_getBalance", "params": ["0x0000000000000000000000000000000000000000", "latest"],"id":1}' 127.0.0.1:8544
```
Check that the websocket is working:

@ -2,7 +2,7 @@ mod rtt_estimate;
use std::sync::Arc;
use log::{error, info};
use log::{error, trace};
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio::task::JoinHandle;

@ -648,6 +648,7 @@ impl Web3ProxyApp {
top_config.app.max_block_lag,
top_config.app.min_synced_rpcs,
top_config.app.min_sum_soft_limit,
"balanced".to_string(),
pending_transactions.clone(),
Some(pending_tx_sender.clone()),
Some(watch_consensus_head_sender),
@ -675,6 +676,7 @@ impl Web3ProxyApp {
None,
0,
0,
"protected".to_string(),
pending_transactions.clone(),
// TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits, but they should have
None,
@ -708,6 +710,7 @@ impl Web3ProxyApp {
None,
0,
0,
"eip4337".to_string(),
pending_transactions.clone(),
None,
None,

@ -33,7 +33,7 @@ use serde_json::value::RawValue;
use std::borrow::Cow;
use std::cmp::{min_by_key, Reverse};
use std::collections::BTreeMap;
use std::fmt;
use std::fmt::{self, Display};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use thread_fast_rng::rand::seq::SliceRandom;
@ -43,6 +43,7 @@ use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBeh
/// A collection of web3 connections. Sends requests either the current best server or all servers.
#[derive(From)]
pub struct Web3Rpcs {
pub(crate) name: String,
/// if watch_consensus_head_sender is some, Web3Rpc inside self will send blocks here when they get them
pub(crate) block_sender: flume::Sender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
/// any requests will be forwarded to one (or more) of these connections
@ -87,6 +88,7 @@ impl Web3Rpcs {
max_block_lag: Option<U64>,
min_head_rpcs: usize,
min_sum_soft_limit: u32,
name: String,
pending_transaction_cache: Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
watch_consensus_head_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
@ -184,19 +186,20 @@ impl Web3Rpcs {
let connections = Arc::new(Self {
block_sender,
by_name,
http_interval_sender,
watch_consensus_rpcs_sender,
watch_consensus_head_sender,
pending_transaction_cache,
pending_tx_id_sender,
pending_tx_id_receiver,
blocks_by_hash,
blocks_by_number,
min_sum_soft_limit,
min_head_rpcs,
by_name,
http_interval_sender,
max_block_age,
max_block_lag,
min_head_rpcs,
min_sum_soft_limit,
name,
pending_transaction_cache,
pending_tx_id_receiver,
pending_tx_id_sender,
watch_consensus_head_sender,
watch_consensus_rpcs_sender,
});
let authorization = Arc::new(Authorization::internal(db_conn)?);
@ -629,7 +632,10 @@ impl Web3Rpcs {
match earliest_retry_at {
None => {
// none of the servers gave us a time to retry at
debug!("no servers on {:?} gave a retry time. {:?}", self, skip);
debug!(
"{:?} - no servers on {:?} gave a retry time! Skipped {:?}",
request_ulid, self, skip
);
// TODO: bring this back? need to think about how to do this with `allow_backups`
// we could return an error here, but maybe waiting a second will fix the problem
@ -647,9 +653,10 @@ impl Web3Rpcs {
// TODO: log the server that retry_at came from
// TODO: `self` doesn't log well. get a pretty name for this group of servers
warn!(
"{:?} - no servers on {:?}! retry in {:?}s",
"{:?} - no servers in {} ready! Skipped {:?}. Retry in {:?}s",
request_ulid,
self,
skip,
earliest_retry_at
.duration_since(Instant::now())
.as_secs_f32()
@ -1156,6 +1163,12 @@ impl Web3Rpcs {
}
}
impl Display for Web3Rpcs {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.name)
}
}
impl fmt::Debug for Web3Rpcs {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
@ -1413,6 +1426,7 @@ mod tests {
block_sender: block_sender.clone(),
by_name: ArcSwap::from_pointee(rpcs_by_name),
http_interval_sender: None,
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
pending_transaction_cache: Cache::builder()
@ -1664,6 +1678,7 @@ mod tests {
block_sender,
by_name: ArcSwap::from_pointee(rpcs_by_name),
http_interval_sender: None,
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
pending_transaction_cache: Cache::builder()
@ -1828,6 +1843,7 @@ mod tests {
block_sender,
by_name: ArcSwap::from_pointee(rpcs_by_name),
http_interval_sender: None,
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
pending_transaction_cache: Cache::builder()