use http and ws providers in a single config

This commit is contained in:
Bryan Stitt 2023-02-12 01:22:53 -08:00
parent d2f7ad5e51
commit c008f50943
17 changed files with 564 additions and 639 deletions

2
Cargo.lock generated
View File

@ -5761,7 +5761,7 @@ dependencies = [
[[package]]
name = "web3_proxy"
version = "0.13.0"
version = "0.13.1"
dependencies = [
"anyhow",
"argh",

View File

@ -52,50 +52,50 @@ response_cache_max_bytes = 10_000_000_000
[balanced_rpcs.ankr]
display_name = "Ankr"
url = "https://rpc.ankr.com/eth"
http_url = "https://rpc.ankr.com/eth"
soft_limit = 1_000
tier = 0
[balanced_rpcs.cloudflare]
display_name = "Cloudflare"
url = "https://cloudflare-eth.com"
http_url = "https://cloudflare-eth.com"
soft_limit = 1_000
tier = 1
[balanced_rpcs.blastapi]
display_name = "Blast"
url = "https://eth-mainnet.public.blastapi.io"
http_url = "https://eth-mainnet.public.blastapi.io"
soft_limit = 1_000
tier = 1
[balanced_rpcs.mycryptoapi]
display_name = "MyCrypto"
disabled = true
url = "https://api.mycryptoapi.com/eth"
http_url = "https://api.mycryptoapi.com/eth"
soft_limit = 1_000
tier = 2
[balanced_rpcs.pokt-v1]
display_name = "Pokt #1"
url = "https://eth-mainnet.gateway.pokt.network/v1/5f3453978e354ab992c4da79"
http_url = "https://eth-mainnet.gateway.pokt.network/v1/5f3453978e354ab992c4da79"
soft_limit = 500
tier = 2
[balanced_rpcs.pokt]
display_name = "Pokt #2"
url = "https://eth-rpc.gateway.pokt.network"
http_url = "https://eth-rpc.gateway.pokt.network"
soft_limit = 500
tier = 3
[balanced_rpcs.linkpool]
display_name = "Linkpool"
url = "https://main-rpc.linkpool.io"
http_url = "https://main-rpc.linkpool.io"
soft_limit = 500
tier = 4
[balanced_rpcs.runonflux]
display_name = "Run on Flux (light)"
url = "https://ethereumnodelight.app.runonflux.io"
http_url = "https://ethereumnodelight.app.runonflux.io"
soft_limit = 1_000
tier = 5
@ -103,7 +103,7 @@ response_cache_max_bytes = 10_000_000_000
[balanced_rpcs.linkpool-light]
display_name = "Linkpool (light)"
disabled = true
url = "https://main-light.eth.linkpool.io"
http_url = "https://main-light.eth.linkpool.io"
soft_limit = 100
tier = 5
@ -114,34 +114,34 @@ response_cache_max_bytes = 10_000_000_000
[private_rpcs.eden]
disabled = true
display_name = "Eden network"
url = "https://api.edennetwork.io/v1/"
http_url = "https://api.edennetwork.io/v1/"
soft_limit = 1_805
tier = 0
[private_rpcs.eden_beta]
disabled = true
display_name = "Eden network beta"
url = "https://api.edennetwork.io/v1/beta"
http_url = "https://api.edennetwork.io/v1/beta"
soft_limit = 5_861
tier = 0
[private_rpcs.ethermine]
disabled = true
display_name = "Ethermine"
url = "https://rpc.ethermine.org"
http_url = "https://rpc.ethermine.org"
soft_limit = 5_861
tier = 0
[private_rpcs.flashbots]
disabled = true
display_name = "Flashbots Fast"
url = "https://rpc.flashbots.net/fast"
http_url = "https://rpc.flashbots.net/fast"
soft_limit = 7_074
tier = 0
[private_rpcs.securerpc]
disabled = true
display_name = "SecureRPC"
url = "https://gibson.securerpc.com/v1"
http_url = "https://gibson.securerpc.com/v1"
soft_limit = 4_560
tier = 0

View File

@ -16,17 +16,26 @@ response_cache_max_bytes = 1_000_000_000
[balanced_rpcs]
[balanced_rpcs.llama_public_wss]
[balanced_rpcs.llama_public_both]
# TODO: what should we do if all rpcs are disabled? warn and wait for a config change?
disabled = false
display_name = "LlamaNodes WSS"
url = "wss://eth.llamarpc.com/"
display_name = "LlamaNodes Both"
ws_url = "wss://eth.llamarpc.com/"
http_url = "https://eth.llamarpc.com/"
soft_limit = 1_000
tier = 0
[balanced_rpcs.llama_public_https]
disabled = false
display_name = "LlamaNodes HTTPS"
url = "https://eth.llamarpc.com/"
http_url = "https://eth.llamarpc.com/"
soft_limit = 1_000
tier = 0
[balanced_rpcs.llama_public_wss]
# TODO: what should we do if all rpcs are disabled? warn and wait for a config change?
disabled = false
display_name = "LlamaNodes WSS"
ws_url = "wss://eth.llamarpc.com/"
soft_limit = 1_000
tier = 0

View File

@ -1,6 +1,6 @@
[package]
name = "web3_proxy"
version = "0.13.0"
version = "0.13.1"
edition = "2021"
default-run = "web3_proxy_cli"

View File

@ -482,7 +482,7 @@ impl Web3ProxyApp {
let http_client = Some(
reqwest::ClientBuilder::new()
.connect_timeout(Duration::from_secs(5))
.timeout(Duration::from_secs(60))
.timeout(Duration::from_secs(5 * 60))
.user_agent(APP_USER_AGENT)
.build()?,
);
@ -573,17 +573,17 @@ impl Web3ProxyApp {
// connect to the load balanced rpcs
let (balanced_rpcs, balanced_handle) = Web3Rpcs::spawn(
block_map.clone(),
top_config.app.chain_id,
db_conn.clone(),
balanced_rpcs,
http_client.clone(),
vredis_pool.clone(),
block_map.clone(),
Some(watch_consensus_head_sender),
top_config.app.min_sum_soft_limit,
top_config.app.min_synced_rpcs,
Some(pending_tx_sender.clone()),
top_config.app.min_sum_soft_limit,
pending_transactions.clone(),
Some(pending_tx_sender.clone()),
vredis_pool.clone(),
balanced_rpcs,
Some(watch_consensus_head_sender),
)
.await
.context("spawning balanced rpcs")?;
@ -599,21 +599,22 @@ impl Web3ProxyApp {
None
} else {
let (private_rpcs, private_handle) = Web3Rpcs::spawn(
block_map,
top_config.app.chain_id,
db_conn.clone(),
private_rpcs,
http_client.clone(),
0,
0,
pending_transactions.clone(),
// TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits, but they should have
None,
vredis_pool.clone(),
block_map,
private_rpcs,
// subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs
// they also often have low rate limits
// however, they are well connected to miners/validators. so maybe using them as a safety check would be good
// TODO: but maybe we could include privates in the "backup" tier
None,
0,
0,
// TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits
None,
pending_transactions.clone(),
)
.await
.context("spawning private_rpcs")?;
@ -1035,9 +1036,17 @@ impl Web3ProxyApp {
| "db_getString"
| "db_putHex"
| "db_putString"
| "debug_accountRange"
| "debug_backtraceAt"
| "debug_blockProfile"
| "debug_chaindbCompact"
| "debug_chaindbProperty"
| "debug_cpuProfile"
| "debug_freeOSMemory"
| "debug_freezeClient"
| "debug_gcStats"
| "debug_goTrace"
| "debug_memStats"
| "debug_mutexProfile"
| "debug_setBlockProfileRate"
| "debug_setGCPercent"

View File

@ -205,31 +205,27 @@ mod tests {
(
"anvil".to_string(),
Web3RpcConfig {
disabled: false,
display_name: None,
url: anvil.endpoint(),
backup: Some(false),
block_data_limit: None,
http_url: Some(anvil.endpoint()),
soft_limit: 100,
hard_limit: None,
tier: 0,
subscribe_txs: Some(false),
extra: Default::default(),
..Default::default()
},
),
(
"anvil_ws".to_string(),
Web3RpcConfig {
disabled: false,
display_name: None,
url: anvil.ws_endpoint(),
backup: Some(false),
block_data_limit: None,
ws_url: Some(anvil.ws_endpoint()),
soft_limit: 100,
hard_limit: None,
tier: 0,
subscribe_txs: Some(false),
extra: Default::default(),
..Default::default()
},
),
(
"anvil_both".to_string(),
Web3RpcConfig {
http_url: Some(anvil.endpoint()),
ws_url: Some(anvil.ws_endpoint()),
..Default::default()
},
),
]),

View File

@ -132,6 +132,12 @@ pub async fn block_needed(
head_block_num: U64,
rpcs: &Web3Rpcs,
) -> anyhow::Result<BlockNeeded> {
// some requests have potentially very large responses
// TODO: only skip caching if the response actually is large
if method.starts_with("trace_") || method == "debug_traceTransaction" {
return Ok(BlockNeeded::CacheNever);
}
let params = if let Some(params) = params {
// grab the params so we can inspect and potentially modify them
params

View File

@ -197,15 +197,19 @@ fn default_response_cache_max_bytes() -> u64 {
}
/// Configuration for a backend web3 RPC server
#[derive(Clone, Debug, Deserialize)]
#[derive(Clone, Debug, Default, Deserialize)]
pub struct Web3RpcConfig {
/// simple way to disable a connection without deleting the row
#[serde(default)]
pub disabled: bool,
/// a name used in /status and other user facing messages
pub display_name: Option<String>,
/// websocket (or http if no websocket)
pub url: String,
/// (deprecated) rpc url
pub url: Option<String>,
/// while not absolutely required, a ws:// or wss:// connection will be able to subscribe to head blocks
pub ws_url: Option<String>,
/// while not absolutely required, a http:// or https:// connection will allow erigon to stream JSON
pub http_url: Option<String>,
/// block data limit. If None, will be queried
pub block_data_limit: Option<u64>,
/// the requests per second at which the server starts slowing down
@ -213,14 +217,15 @@ pub struct Web3RpcConfig {
/// the requests per second at which the server throws errors (rate limit or otherwise)
pub hard_limit: Option<u64>,
/// only use this rpc if everything else is lagging too far. this allows us to ignore fast but very low limit rpcs
pub backup: Option<bool>,
#[serde(default)]
pub backup: bool,
/// All else equal, a server with a lower tier receives all requests
#[serde(default = "default_tier")]
pub tier: u64,
/// Subscribe to the firehose of pending transactions
/// Don't do this with free rpcs
#[serde(default)]
pub subscribe_txs: Option<bool>,
pub subscribe_txs: bool,
/// unknown config options get put here
#[serde(flatten, default = "HashMap::default")]
pub extra: HashMap<String, serde_json::Value>,
@ -245,47 +250,24 @@ impl Web3RpcConfig {
block_map: BlockHashesCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<TxHashAndRpc>>,
reconnect: bool,
) -> anyhow::Result<(Arc<Web3Rpc>, AnyhowJoinHandle<()>)> {
if !self.extra.is_empty() {
warn!("unknown Web3RpcConfig fields!: {:?}", self.extra.keys());
}
let hard_limit = match (self.hard_limit, redis_pool) {
(None, None) => None,
(Some(hard_limit), Some(redis_client_pool)) => Some((hard_limit, redis_client_pool)),
(None, Some(_)) => None,
(Some(_hard_limit), None) => {
return Err(anyhow::anyhow!(
"no redis client pool! needed for hard limit"
))
}
};
let tx_id_sender = if self.subscribe_txs.unwrap_or(false) {
tx_id_sender
} else {
None
};
let backup = self.backup.unwrap_or(false);
Web3Rpc::spawn(
self,
name,
self.display_name,
chain_id,
db_conn,
self.url,
http_client,
http_interval_sender,
hard_limit,
self.soft_limit,
backup,
self.block_data_limit,
redis_pool,
block_map,
block_sender,
tx_id_sender,
true,
self.tier,
reconnect,
)
.await
}

View File

@ -149,12 +149,13 @@ impl Web3Rpcs {
// TODO: if error, retry?
let block: ArcBlock = match rpc {
Some(rpc) => rpc
.wait_for_request_handle(authorization, Some(Duration::from_secs(30)), false)
.wait_for_request_handle(authorization, Some(Duration::from_secs(30)), None)
.await?
.request::<_, Option<_>>(
"eth_getBlockByHash",
&json!(get_block_params),
Level::Error.into(),
None,
)
.await?
.context("no block!")?,

View File

View File

View File

@ -56,17 +56,17 @@ impl Web3Rpcs {
/// Spawn durable connections to multiple Web3 providers.
#[allow(clippy::too_many_arguments)]
pub async fn spawn(
block_map: BlockHashesCache,
chain_id: u64,
db_conn: Option<DatabaseConnection>,
server_configs: HashMap<String, Web3RpcConfig>,
http_client: Option<reqwest::Client>,
redis_pool: Option<redis_rate_limiter::RedisPool>,
block_map: BlockHashesCache,
watch_consensus_head_sender: Option<watch::Sender<ArcBlock>>,
min_sum_soft_limit: u32,
min_head_rpcs: usize,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
min_sum_soft_limit: u32,
pending_transactions: Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
redis_pool: Option<redis_rate_limiter::RedisPool>,
server_configs: HashMap<String, Web3RpcConfig>,
watch_consensus_head_sender: Option<watch::Sender<ArcBlock>>,
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, block_receiver) = flume::unbounded::<BlockAndRpc>();
@ -160,6 +160,7 @@ impl Web3Rpcs {
block_map,
block_sender,
pending_tx_id_sender,
true,
)
.await
});
@ -343,7 +344,7 @@ impl Web3Rpcs {
.into_iter()
.map(|active_request_handle| async move {
let result: Result<Box<RawValue>, _> = active_request_handle
.request(method, &json!(&params), error_level.into())
.request(method, &json!(&params), error_level.into(), None)
.await;
result
})
@ -473,12 +474,20 @@ impl Web3Rpcs {
for x in self
.conns
.values()
.filter(|x| if allow_backups { true } else { !x.backup })
.filter(|x| !skip.contains(x))
.filter(|x| x.has_block_data(min_block_needed))
.filter(|x| {
if let Some(max_block_needed) = max_block_needed {
x.has_block_data(max_block_needed)
if !allow_backups && x.backup {
false
} else if skip.contains(x) {
false
} else if !x.has_block_data(min_block_needed) {
false
} else if max_block_needed
.and_then(|max_block_needed| {
Some(!x.has_block_data(max_block_needed))
})
.unwrap_or(false)
{
false
} else {
true
}
@ -521,58 +530,22 @@ impl Web3Rpcs {
let mut earliest_retry_at = None;
for usable_rpcs in usable_rpcs_by_head_num_and_weight.into_values().rev() {
// under heavy load, it is possible for even our best server to be negative
let mut minimum = f64::MAX;
let mut maximum = f64::MIN;
// we sort on a combination of values. cache them here so that we don't do this math multiple times.
let mut available_request_map: HashMap<_, f64> = usable_rpcs
// TODO: is this necessary if we use sort_by_cached_key?
let available_request_map: HashMap<_, f64> = usable_rpcs
.iter()
.map(|rpc| {
// TODO: are active requests what we want? do we want a counter for requests in the last second + any actives longer than that?
// TODO: get active requests out of redis (that's definitely too slow)
// TODO: do something with hard limit instead? (but that is hitting redis too much)
let active_requests = rpc.active_requests() as f64;
let soft_limit = rpc.soft_limit as f64;
let available_requests = soft_limit - active_requests;
// trace!("available requests on {}: {}", rpc, available_requests);
minimum = minimum.min(available_requests);
maximum = maximum.max(available_requests);
(rpc, available_requests)
// TODO: weighted sort by remaining hard limit?
// TODO: weighted sort by soft_limit - ewma_active_requests? that assumes soft limits are any good
(rpc, 1.0)
})
.collect();
// trace!("minimum available requests: {}", minimum);
// trace!("maximum available requests: {}", maximum);
if maximum < 0.0 {
// TODO: if maximum < 0 and there are other tiers on the same block, we should include them now
warn!("soft limits overloaded: {} to {}", minimum, maximum)
}
// choose_multiple_weighted can't have negative numbers. shift up if any are negative
// TODO: is this a correct way to shift?
if minimum < 0.0 {
available_request_map = available_request_map
.into_iter()
.map(|(rpc, available_requests)| {
// TODO: is simple addition the right way to shift everyone?
// TODO: probably want something non-linear
// minimum is negative, so we subtract to make available requests bigger
let x = available_requests - minimum;
(rpc, x)
})
.collect()
}
debug!("todo: better sort here");
let sorted_rpcs = {
if usable_rpcs.len() == 1 {
// TODO: return now instead? we shouldn't need another alloc
// TODO: try the next tier
vec![usable_rpcs.get(0).expect("there should be 1")]
} else {
let mut rng = thread_fast_rng::thread_fast_rng();
@ -589,12 +562,10 @@ impl Web3Rpcs {
};
// now that the rpcs are sorted, try to get an active request handle for one of them
// TODO: pick two randomly and choose the one with the lower rpc.latency.ewma
for best_rpc in sorted_rpcs.into_iter() {
// increment our connection counter
match best_rpc
.try_request_handle(authorization, min_block_needed.is_none())
.await
{
match best_rpc.try_request_handle(authorization, None).await {
Ok(OpenRequestResult::Handle(handle)) => {
// trace!("opened handle: {}", best_rpc);
return Ok(OpenRequestResult::Handle(handle));
@ -741,10 +712,7 @@ impl Web3Rpcs {
}
// check rate limits and increment our connection counter
match connection
.try_request_handle(authorization, min_block_needed.is_none())
.await
{
match connection.try_request_handle(authorization, None).await {
Ok(OpenRequestResult::RetryAt(retry_at)) => {
// this rpc is not available. skip it
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
@ -827,6 +795,7 @@ impl Web3Rpcs {
&request.method,
&json!(request.params),
RequestRevertHandler::Save,
None,
)
.await;
@ -1214,7 +1183,6 @@ mod tests {
use super::*;
use crate::rpcs::{
blockchain::{ConsensusFinder, SavedBlock},
one::ProviderState,
provider::Web3Provider,
};
use ethers::types::{Block, U256};
@ -1338,9 +1306,6 @@ mod tests {
let head_rpc = Web3Rpc {
name: "synced".to_string(),
provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new(
Web3Provider::Mock,
))),
soft_limit: 1_000,
automatic_block_limit: false,
backup: false,
@ -1352,9 +1317,6 @@ mod tests {
let lagged_rpc = Web3Rpc {
name: "lagged".to_string(),
provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new(
Web3Provider::Mock,
))),
soft_limit: 1_000,
automatic_block_limit: false,
backup: false,
@ -1553,9 +1515,6 @@ mod tests {
let pruned_rpc = Web3Rpc {
name: "pruned".to_string(),
provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new(
Web3Provider::Mock,
))),
soft_limit: 3_000,
automatic_block_limit: false,
backup: false,
@ -1567,9 +1526,6 @@ mod tests {
let archive_rpc = Web3Rpc {
name: "archive".to_string(),
provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new(
Web3Provider::Mock,
))),
soft_limit: 1_000,
automatic_block_limit: false,
backup: false,

View File

@ -3,9 +3,9 @@ use super::blockchain::{ArcBlock, BlockHashesCache, SavedBlock};
use super::provider::Web3Provider;
use super::request::{OpenRequestHandle, OpenRequestResult};
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::BlockAndRpc;
use crate::config::{BlockAndRpc, Web3RpcConfig};
use crate::frontend::authorization::Authorization;
use anyhow::Context;
use anyhow::{anyhow, Context};
use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
use ethers::types::U256;
use futures::future::try_join_all;
@ -21,50 +21,13 @@ use serde_json::json;
use std::cmp::min;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicU32, AtomicU64};
use std::sync::atomic::{self, AtomicU64};
use std::{cmp::Ordering, sync::Arc};
use thread_fast_rng::rand::Rng;
use thread_fast_rng::thread_fast_rng;
use tokio::sync::{broadcast, oneshot, watch, RwLock as AsyncRwLock};
use tokio::time::{sleep, sleep_until, timeout, Duration, Instant};
// TODO: maybe provider state should have the block data limit in it. but it is inside an async lock and we can't Serialize then
#[derive(Clone, Debug)]
pub enum ProviderState {
None,
Connecting(Arc<Web3Provider>),
Connected(Arc<Web3Provider>),
}
impl Default for ProviderState {
fn default() -> Self {
Self::None
}
}
impl ProviderState {
pub async fn provider(&self, allow_not_ready: bool) -> Option<&Arc<Web3Provider>> {
match self {
ProviderState::None => None,
ProviderState::Connecting(x) => {
if allow_not_ready {
Some(x)
} else {
// TODO: do a ready check here?
None
}
}
ProviderState::Connected(x) => {
if x.ready() {
Some(x)
} else {
None
}
}
}
}
}
pub struct Web3RpcLatencies {
/// Traack how far behind the fastest node we are
pub new_head: Histogram<u64>,
@ -93,19 +56,15 @@ pub struct Web3Rpc {
pub name: String,
pub display_name: Option<String>,
pub db_conn: Option<DatabaseConnection>,
/// TODO: can we get this from the provider? do we even need it?
pub(super) url: String,
pub(super) ws_url: Option<String>,
pub(super) http_url: Option<String>,
/// Some connections use an http_client. we keep a clone for reconnecting
pub(super) http_client: Option<reqwest::Client>,
/// keep track of currently open requests. We sort on this
pub(super) active_requests: AtomicU32,
/// keep track of total requests from the frontend
pub(super) frontend_requests: AtomicU64,
/// keep track of total requests from web3-proxy itself
pub(super) internal_requests: AtomicU64,
/// provider is in a RwLock so that we can replace it if re-connecting
/// it is an async lock because we hold it open across awaits
pub(super) provider_state: AsyncRwLock<ProviderState>,
/// this provider is only used for new heads subscriptions
/// TODO: put the provider inside an arc?
pub(super) new_head_client: AsyncRwLock<Option<Arc<Web3Provider>>>,
/// keep track of hard limits
pub(super) hard_limit_until: Option<watch::Sender<Instant>>,
/// rate limits are stored in a central redis so that multiple proxies can share their rate limits
@ -121,7 +80,7 @@ pub struct Web3Rpc {
pub(super) block_data_limit: AtomicU64,
/// Lower tiers are higher priority when sending requests
pub(super) tier: u64,
/// TODO: change this to a watch channel so that http providers can subscribe and take action on change
/// TODO: change this to a watch channel so that http providers can subscribe and take action on change.
pub(super) head_block: RwLock<Option<SavedBlock>>,
/// Track how fast this RPC is
pub(super) latency: Web3RpcLatencies,
@ -132,39 +91,54 @@ impl Web3Rpc {
// TODO: have this take a builder (which will have channels attached). or maybe just take the config and give the config public fields
#[allow(clippy::too_many_arguments)]
pub async fn spawn(
mut config: Web3RpcConfig,
name: String,
display_name: Option<String>,
chain_id: u64,
db_conn: Option<DatabaseConnection>,
url_str: String,
// optional because this is only used for http providers. websocket providers don't use it
http_client: Option<reqwest::Client>,
// TODO: rename to http_new_head_interval_sender?
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
// TODO: have a builder struct for this.
hard_limit: Option<(u64, RedisPool)>,
// TODO: think more about this type
soft_limit: u32,
backup: bool,
block_data_limit: Option<u64>,
redis_pool: Option<RedisPool>,
// TODO: think more about soft limit. watching ewma of requests is probably better. but what should the random sort be on? maybe group on tier is enough
// soft_limit: u32,
block_map: BlockHashesCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
reconnect: bool,
tier: u64,
) -> anyhow::Result<(Arc<Web3Rpc>, AnyhowJoinHandle<()>)> {
let hard_limit = hard_limit.map(|(hard_rate_limit, redis_pool)| {
// TODO: is cache size 1 okay? i think we need
RedisRateLimiter::new(
"web3_proxy",
&format!("{}:{}", chain_id, name),
hard_rate_limit,
60.0,
redis_pool,
)
});
let hard_limit = match (config.hard_limit, redis_pool) {
(None, None) => None,
(Some(hard_limit), Some(redis_pool)) => {
// TODO: in process rate limiter instead? or is deffered good enough?
let rrl = RedisRateLimiter::new(
"web3_proxy",
&format!("{}:{}", chain_id, name),
hard_limit,
60.0,
redis_pool,
);
// TODO: should we do this even if block_sender is None? then we would know limits on private relays
let block_data_limit: AtomicU64 = block_data_limit.unwrap_or_default().into();
Some(rrl)
}
(None, Some(_)) => None,
(Some(_hard_limit), None) => {
return Err(anyhow::anyhow!(
"no redis client pool! needed for hard limit"
))
}
};
let tx_id_sender = if config.subscribe_txs {
// TODO: warn if tx_id_sender is None?
tx_id_sender
} else {
None
};
let backup = config.backup;
let block_data_limit: AtomicU64 = config.block_data_limit.unwrap_or_default().into();
let automatic_block_limit =
(block_data_limit.load(atomic::Ordering::Acquire) == 0) && block_sender.is_some();
@ -178,19 +152,36 @@ impl Web3Rpc {
None
};
if config.ws_url.is_none() && config.http_url.is_none() {
if let Some(url) = config.url {
if url.starts_with("ws") {
config.ws_url = Some(url);
} else if url.starts_with("http") {
config.http_url = Some(url);
} else {
return Err(anyhow!("only ws or http urls are supported"));
}
} else {
return Err(anyhow!(
"either ws_url or http_url are required. it is best to set both"
));
}
}
let new_connection = Self {
name,
db_conn: db_conn.clone(),
display_name,
display_name: config.display_name,
http_client,
url: url_str,
ws_url: config.ws_url,
http_url: config.http_url,
hard_limit,
hard_limit_until,
soft_limit,
soft_limit: config.soft_limit,
automatic_block_limit,
backup,
block_data_limit,
tier,
tier: config.tier,
..Default::default()
};
@ -224,6 +215,7 @@ impl Web3Rpc {
async fn check_block_data_limit(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
unlocked_provider: Option<Arc<Web3Provider>>,
) -> anyhow::Result<Option<u64>> {
if !self.automatic_block_limit {
// TODO: is this a good thing to return?
@ -238,7 +230,7 @@ impl Web3Rpc {
// TODO: start at 0 or 1?
for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] {
let handle = self
.wait_for_request_handle(authorization, None, true)
.wait_for_request_handle(authorization, None, unlocked_provider.clone())
.await?;
let head_block_num_future = handle.request::<Option<()>, U256>(
@ -246,6 +238,7 @@ impl Web3Rpc {
&None,
// error here are expected, so keep the level low
Level::Debug.into(),
unlocked_provider.clone(),
);
let head_block_num = timeout(Duration::from_secs(5), head_block_num_future)
@ -264,7 +257,7 @@ impl Web3Rpc {
// TODO: wait for the handle BEFORE we check the current block number. it might be delayed too!
// TODO: what should the request be?
let handle = self
.wait_for_request_handle(authorization, None, true)
.wait_for_request_handle(authorization, None, unlocked_provider.clone())
.await?;
let archive_result: Result<Bytes, _> = handle
@ -276,6 +269,7 @@ impl Web3Rpc {
)),
// error here are expected, so keep the level low
Level::Trace.into(),
unlocked_provider.clone(),
)
.await;
@ -403,119 +397,111 @@ impl Web3Rpc {
chain_id: u64,
db_conn: Option<&DatabaseConnection>,
) -> anyhow::Result<()> {
// trace!("provider_state {} locking...", self);
let mut provider_state = self
.provider_state
.try_write()
.context("locking provider for write")?;
// trace!("provider_state {} locked: {:?}", self, provider_state);
match &*provider_state {
ProviderState::None => {
info!("connecting to {}", self);
if let Ok(mut unlocked_provider) = self.new_head_client.try_write() {
#[cfg(test)]
if let Some(Web3Provider::Mock) = unlocked_provider.as_deref() {
return Ok(());
}
ProviderState::Connecting(provider) | ProviderState::Connected(provider) => {
// disconnect the current provider
if let Web3Provider::Mock = provider.as_ref() {
return Ok(());
*unlocked_provider = if let Some(ws_url) = self.ws_url.as_ref() {
// set up ws client
match &*unlocked_provider {
None => {
info!("connecting to {}", self);
}
Some(_) => {
debug!("reconnecting to {}", self);
// tell the block subscriber that this rpc doesn't have any blocks
if let Some(block_sender) = block_sender {
block_sender
.send_async((None, self.clone()))
.await
.context("block_sender during connect")?;
}
// reset sync status
let mut head_block = self.head_block.write();
*head_block = None;
// disconnect the current provider
// TODO: what until the block_sender's receiver finishes updating this item?
*unlocked_provider = None;
}
}
debug!("reconnecting to {}", self);
let p = Web3Provider::from_str(ws_url.as_str(), None)
.await
.context(format!("failed connecting to {}", ws_url))?;
// disconnect the current provider
*provider_state = ProviderState::None;
assert!(p.ws().is_some());
// reset sync status
// trace!("locking head block on {}", self);
{
let mut head_block = self.head_block.write();
*head_block = None;
}
// trace!("done with head block on {}", self);
// tell the block subscriber that we don't have any blocks
if let Some(block_sender) = block_sender {
block_sender
.send_async((None, self.clone()))
Some(Arc::new(p))
} else {
// http client
if let Some(url) = &self.http_url {
let p = Web3Provider::from_str(url, self.http_client.clone())
.await
.context("block_sender during connect")?;
.context(format!("failed connecting to {}", url))?;
assert!(p.http().is_some());
Some(Arc::new(p))
} else {
None
}
};
let authorization = Arc::new(Authorization::internal(db_conn.cloned())?);
// check the server's chain_id here
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
// TODO: what should the timeout be? should there be a request timeout?
// trace!("waiting on chain id for {}", self);
let found_chain_id: Result<U64, _> = self
.wait_for_request_handle(&authorization, None, unlocked_provider.clone())
.await?
.request(
"eth_chainId",
&json!(Option::None::<()>),
Level::Trace.into(),
unlocked_provider.clone(),
)
.await;
// trace!("found_chain_id: {:?}", found_chain_id);
match found_chain_id {
Ok(found_chain_id) => {
// TODO: there has to be a cleaner way to do this
if chain_id != found_chain_id.as_u64() {
return Err(anyhow::anyhow!(
"incorrect chain id! Config has {}, but RPC has {}",
chain_id,
found_chain_id
)
.context(format!("failed @ {}", self)));
}
}
Err(e) => {
return Err(anyhow::Error::from(e));
}
}
}
// trace!("Creating new Web3Provider on {}", self);
// TODO: if this fails, keep retrying! otherwise it crashes and doesn't try again!
let new_provider = Web3Provider::from_str(&self.url, self.http_client.clone()).await?;
self.check_block_data_limit(&authorization, unlocked_provider.clone())
.await?;
// trace!("saving provider state as NotReady on {}", self);
*provider_state = ProviderState::Connecting(Arc::new(new_provider));
drop(unlocked_provider);
// drop the lock so that we can get a request handle
// trace!("provider_state {} unlocked", self);
drop(provider_state);
let authorization = Arc::new(Authorization::internal(db_conn.cloned())?);
// check the server's chain_id here
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
// TODO: what should the timeout be? should there be a request timeout?
// trace!("waiting on chain id for {}", self);
let found_chain_id: Result<U64, _> = self
.wait_for_request_handle(&authorization, None, true)
.await?
.request(
"eth_chainId",
&json!(Option::None::<()>),
Level::Trace.into(),
)
.await;
// trace!("found_chain_id: {:?}", found_chain_id);
match found_chain_id {
Ok(found_chain_id) => {
// TODO: there has to be a cleaner way to do this
if chain_id != found_chain_id.as_u64() {
return Err(anyhow::anyhow!(
"incorrect chain id! Config has {}, but RPC has {}",
chain_id,
found_chain_id
)
.context(format!("failed @ {}", self)));
}
info!("successfully connected to {}", self);
} else {
if self.new_head_client.read().await.is_none() {
return Err(anyhow!("failed waiting for client"));
}
Err(e) => {
return Err(anyhow::Error::from(e));
}
}
self.check_block_data_limit(&authorization).await?;
{
// trace!("locking for ready...");
let mut provider_state = self.provider_state.write().await;
// trace!("locked for ready...");
// TODO: do this without a clone
let ready_provider = provider_state
.provider(true)
.await
.context("provider missing")?
.clone();
*provider_state = ProviderState::Connected(ready_provider);
// trace!("unlocked for ready...");
}
info!("successfully connected to {}", self);
};
Ok(())
}
#[inline]
pub fn active_requests(&self) -> u32 {
self.active_requests.load(atomic::Ordering::Acquire)
}
async fn send_head_block_result(
self: &Arc<Self>,
new_head_block: Result<Option<ArcBlock>, ProviderError>,
@ -558,7 +544,7 @@ impl Web3Rpc {
if self.block_data_limit() == U64::zero() {
let authorization = Arc::new(Authorization::internal(self.db_conn.clone())?);
if let Err(err) = self.check_block_data_limit(&authorization).await {
if let Err(err) = self.check_block_data_limit(&authorization, None).await {
warn!(
"failed checking block limit after {} finished syncing. {:?}",
self, err
@ -629,29 +615,21 @@ impl Web3Rpc {
// provider is ready
ready_tx.send(()).unwrap();
// wait before doing the initial health check
// TODO: how often?
// TODO: subscribe to self.head_block
// TODO: reset this timeout every time a new block is seen
let health_sleep_seconds = 10;
// wait before doing the initial health check
sleep(Duration::from_secs(health_sleep_seconds)).await;
loop {
// TODO: what if we just happened to have this check line up with another restart?
// TODO: think more about this
// trace!("health check on {}. locking...", conn);
if conn
.provider_state
.read()
.await
.provider(false)
.await
.is_none()
{
if let Some(client) = &*conn.new_head_client.read().await {
// trace!("health check unlocked with error on {}", conn);
// returning error will trigger a reconnect
return Err(anyhow::anyhow!("{} is not ready", conn));
// TODO: do a query of some kind
}
// trace!("health check on {}. unlocked", conn);
sleep(Duration::from_secs(health_sleep_seconds)).await;
}
@ -712,7 +690,7 @@ impl Web3Rpc {
Ok(())
}
/// Subscribe to new blocks. If `reconnect` is true, this runs forever.
/// Subscribe to new blocks.
async fn subscribe_new_heads(
self: Arc<Self>,
authorization: Arc<Authorization>,
@ -722,233 +700,218 @@ impl Web3Rpc {
) -> anyhow::Result<()> {
trace!("watching new heads on {}", self);
// trace!("locking on new heads");
let provider_state = self
.provider_state
.try_read()
.context("subscribe_new_heads")?
.clone();
// trace!("unlocked on new heads");
let unlocked_provider = self.new_head_client.read().await;
// TODO: need a timeout
if let ProviderState::Connected(provider) = provider_state {
match provider.as_ref() {
Web3Provider::Mock => unimplemented!(),
Web3Provider::Http(_provider) => {
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: try watch_blocks and fall back to this?
match unlocked_provider.as_deref() {
Some(Web3Provider::Http(_client)) => {
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: try watch_blocks and fall back to this?
let mut http_interval_receiver = http_interval_receiver.unwrap();
let mut http_interval_receiver = http_interval_receiver.unwrap();
let mut last_hash = H256::zero();
let mut last_hash = H256::zero();
loop {
// TODO: what should the max_wait be?
match self
.wait_for_request_handle(&authorization, None, false)
.await
{
Ok(active_request_handle) => {
let block: Result<Option<ArcBlock>, _> = active_request_handle
.request(
"eth_getBlockByNumber",
&json!(("latest", false)),
Level::Warn.into(),
loop {
// TODO: what should the max_wait be?
match self
.wait_for_request_handle(&authorization, None, unlocked_provider.clone())
.await
{
Ok(active_request_handle) => {
let block: Result<Option<ArcBlock>, _> = active_request_handle
.request(
"eth_getBlockByNumber",
&json!(("latest", false)),
Level::Warn.into(),
None,
)
.await;
match block {
Ok(None) => {
warn!("no head block on {}", self);
self.send_head_block_result(
Ok(None),
&block_sender,
block_map.clone(),
)
.await;
.await?;
}
Ok(Some(block)) => {
// don't send repeat blocks
let new_hash =
block.hash.expect("blocks here should always have hashes");
match block {
Ok(None) => {
warn!("no head block on {}", self);
if new_hash != last_hash {
// new hash!
last_hash = new_hash;
self.send_head_block_result(
Ok(None),
&block_sender,
block_map.clone(),
)
.await?;
}
Ok(Some(block)) => {
// don't send repeat blocks
let new_hash = block
.hash
.expect("blocks here should always have hashes");
if new_hash != last_hash {
// new hash!
last_hash = new_hash;
self.send_head_block_result(
Ok(Some(block)),
&block_sender,
block_map.clone(),
)
.await?;
}
}
Err(err) => {
// we did not get a block back. something is up with the server. take it out of rotation
self.send_head_block_result(
Err(err),
Ok(Some(block)),
&block_sender,
block_map.clone(),
)
.await?;
}
}
Err(err) => {
// we did not get a block back. something is up with the server. take it out of rotation
self.send_head_block_result(
Err(err),
&block_sender,
block_map.clone(),
)
.await?;
}
}
Err(err) => {
warn!("Internal error on latest block from {}. {:?}", self, err);
}
Err(err) => {
warn!("Internal error on latest block from {}. {:?}", self, err);
self.send_head_block_result(
Ok(None),
&block_sender,
block_map.clone(),
)
self.send_head_block_result(Ok(None), &block_sender, block_map.clone())
.await?;
// TODO: what should we do? sleep? extra time?
}
// TODO: what should we do? sleep? extra time?
}
}
// wait for the next interval
// TODO: if error or rate limit, increase interval?
while let Err(err) = http_interval_receiver.recv().await {
match err {
broadcast::error::RecvError::Closed => {
// channel is closed! that's not good. bubble the error up
return Err(err.into());
}
broadcast::error::RecvError::Lagged(lagged) => {
// querying the block was delayed
// this can happen if tokio is very busy or waiting for requests limits took too long
warn!("http interval on {} lagging by {}!", self, lagged);
}
// wait for the next interval
// TODO: if error or rate limit, increase interval?
while let Err(err) = http_interval_receiver.recv().await {
match err {
broadcast::error::RecvError::Closed => {
// channel is closed! that's not good. bubble the error up
return Err(err.into());
}
broadcast::error::RecvError::Lagged(lagged) => {
// querying the block was delayed
// this can happen if tokio is very busy or waiting for requests limits took too long
warn!("http interval on {} lagging by {}!", self, lagged);
}
}
}
}
Web3Provider::Ws(provider) => {
// todo: move subscribe_blocks onto the request handle?
let active_request_handle = self
.wait_for_request_handle(&authorization, None, false)
.await;
let mut stream = provider.subscribe_blocks().await?;
drop(active_request_handle);
// query the block once since the subscription doesn't send the current block
// there is a very small race condition here where the stream could send us a new block right now
// all it does is print "new block" for the same block as current block
// TODO: how does this get wrapped in an arc? does ethers handle that?
let block: Result<Option<ArcBlock>, _> = self
.wait_for_request_handle(&authorization, None, false)
.await?
.request(
"eth_getBlockByNumber",
&json!(("latest", false)),
Level::Warn.into(),
)
.await;
let mut last_hash = match &block {
Ok(Some(new_block)) => new_block
.hash
.expect("blocks should always have a hash here"),
_ => H256::zero(),
};
self.send_head_block_result(block, &block_sender, block_map.clone())
.await?;
while let Some(new_block) = stream.next().await {
// TODO: check the new block's hash to be sure we don't send dupes
let new_hash = new_block
.hash
.expect("blocks should always have a hash here");
if new_hash == last_hash {
// some rpcs like to give us duplicates. don't waste our time on them
continue;
} else {
last_hash = new_hash;
}
self.send_head_block_result(
Ok(Some(Arc::new(new_block))),
&block_sender,
block_map.clone(),
)
.await?;
}
// clear the head block. this might not be needed, but it won't hurt
self.send_head_block_result(Ok(None), &block_sender, block_map)
.await?;
// TODO: is this always an error?
// TODO: we probably don't want a warn and to return error
warn!("new_heads subscription to {} ended", self);
Err(anyhow::anyhow!("new_heads subscription ended"))
}
}
} else {
Err(anyhow::anyhow!(
"Provider not ready! Unable to subscribe to heads"
))
Some(Web3Provider::Both(_, client)) | Some(Web3Provider::Ws(client)) => {
// todo: move subscribe_blocks onto the request handle?
let active_request_handle = self
.wait_for_request_handle(&authorization, None, unlocked_provider.clone())
.await;
let mut stream = client.subscribe_blocks().await?;
drop(active_request_handle);
// query the block once since the subscription doesn't send the current block
// there is a very small race condition here where the stream could send us a new block right now
// but all that does is print "new block" for the same block as current block
// TODO: how does this get wrapped in an arc? does ethers handle that?
// TODO: do this part over http?
let block: Result<Option<ArcBlock>, _> = self
.wait_for_request_handle(&authorization, None, unlocked_provider.clone())
.await?
.request(
"eth_getBlockByNumber",
&json!(("latest", false)),
Level::Warn.into(),
unlocked_provider.clone(),
)
.await;
let mut last_hash = match &block {
Ok(Some(new_block)) => new_block
.hash
.expect("blocks should always have a hash here"),
_ => H256::zero(),
};
self.send_head_block_result(block, &block_sender, block_map.clone())
.await?;
while let Some(new_block) = stream.next().await {
// TODO: check the new block's hash to be sure we don't send dupes
let new_hash = new_block
.hash
.expect("blocks should always have a hash here");
if new_hash == last_hash {
// some rpcs like to give us duplicates. don't waste our time on them
continue;
} else {
last_hash = new_hash;
}
self.send_head_block_result(
Ok(Some(Arc::new(new_block))),
&block_sender,
block_map.clone(),
)
.await?;
}
// clear the head block. this might not be needed, but it won't hurt
self.send_head_block_result(Ok(None), &block_sender, block_map)
.await?;
// TODO: is this always an error?
// TODO: we probably don't want a warn and to return error
warn!("new_heads subscription to {} ended", self);
Err(anyhow::anyhow!("new_heads subscription ended"))
}
None => todo!("what should happen now? wait for a connection?"),
#[cfg(test)]
Some(Web3Provider::Mock) => unimplemented!(),
}
}
/// Turn on the firehose of pending transactions
async fn subscribe_pending_transactions(
self: Arc<Self>,
authorization: Arc<Authorization>,
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
) -> anyhow::Result<()> {
if let ProviderState::Connected(provider) = self
.provider_state
.try_read()
.context("subscribe_pending_transactions")?
.clone()
{
trace!("watching pending transactions on {}", self);
// TODO: does this keep the lock open for too long?
match provider.as_ref() {
Web3Provider::Mock => unimplemented!(),
Web3Provider::Http(provider) => {
// there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: maybe subscribe to self.head_block?
// TODO: this keeps a read lock guard open on provider_state forever. is that okay for an http client?
futures::future::pending::<()>().await;
}
Web3Provider::Ws(provider) => {
// TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
let active_request_handle = self
.wait_for_request_handle(&authorization, None, false)
.await?;
// TODO: give this a separate client. don't use new_head_client for everything. especially a firehose this big
// TODO: timeout
let provider = self.new_head_client.read().await;
let mut stream = provider.subscribe_pending_txs().await?;
drop(active_request_handle);
while let Some(pending_tx_id) = stream.next().await {
tx_id_sender
.send_async((pending_tx_id, self.clone()))
.await
.context("tx_id_sender")?;
// TODO: periodically check for listeners. if no one is subscribed, unsubscribe and wait for a subscription
}
// TODO: is this always an error?
// TODO: we probably don't want a warn and to return error
warn!("pending_transactions subscription ended on {}", self);
return Err(anyhow::anyhow!("pending_transactions subscription ended"));
}
trace!("watching pending transactions on {}", self);
// TODO: does this keep the lock open for too long?
match provider.as_deref() {
None => {
// TODO: wait for a provider
return Err(anyhow!("no provider"));
}
} else {
warn!(
"Provider not ready! Unable to watch pending transactions on {}",
self
);
Some(Web3Provider::Http(provider)) => {
// there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: maybe subscribe to self.head_block?
// TODO: this keeps a read lock guard open on provider_state forever. is that okay for an http client?
futures::future::pending::<()>().await;
}
Some(Web3Provider::Both(_, client)) | Some(Web3Provider::Ws(client)) => {
// TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
let active_request_handle = self
.wait_for_request_handle(&authorization, None, provider.clone())
.await?;
let mut stream = client.subscribe_pending_txs().await?;
drop(active_request_handle);
while let Some(pending_tx_id) = stream.next().await {
tx_id_sender
.send_async((pending_tx_id, self.clone()))
.await
.context("tx_id_sender")?;
// TODO: periodically check for listeners. if no one is subscribed, unsubscribe and wait for a subscription
}
// TODO: is this always an error?
// TODO: we probably don't want a warn and to return error
warn!("pending_transactions subscription ended on {}", self);
return Err(anyhow::anyhow!("pending_transactions subscription ended"));
}
#[cfg(test)]
Some(Web3Provider::Mock) => futures::future::pending::<()>().await,
}
Ok(())
@ -957,17 +920,17 @@ impl Web3Rpc {
/// be careful with this; it might wait forever!
/// `allow_not_ready` is only for use by health checks while starting the provider
/// TODO: don't use anyhow. use specific error type
pub async fn wait_for_request_handle(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
pub async fn wait_for_request_handle<'a>(
self: &'a Arc<Self>,
authorization: &'a Arc<Authorization>,
max_wait: Option<Duration>,
allow_not_ready: bool,
unlocked_provider: Option<Arc<Web3Provider>>,
) -> anyhow::Result<OpenRequestHandle> {
let max_wait = max_wait.map(|x| Instant::now() + x);
loop {
match self
.try_request_handle(authorization, allow_not_ready)
.try_request_handle(authorization, unlocked_provider.clone())
.await
{
Ok(OpenRequestResult::Handle(handle)) => return Ok(handle),
@ -1015,20 +978,14 @@ impl Web3Rpc {
pub async fn try_request_handle(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
// TODO? ready_provider: Option<&Arc<Web3Provider>>,
allow_not_ready: bool,
// TODO: borrow on this instead of needing to clone the Arc?
unlocked_provider: Option<Arc<Web3Provider>>,
) -> anyhow::Result<OpenRequestResult> {
// TODO: think more about this read block
if !allow_not_ready
&& self
.provider_state
.read()
.await
.provider(allow_not_ready)
.await
.is_none()
{
trace!("{} is not ready", self);
// TODO: this should *not* be new_head_client. this should be a separate object
if unlocked_provider.is_some() || self.new_head_client.read().await.is_some() {
// we already have an unlocked provider. no need to lock
} else {
return Ok(OpenRequestResult::NotReady(self.backup));
}
@ -1144,15 +1101,11 @@ impl Serialize for Web3Rpc {
state.serialize_field("soft_limit", &self.soft_limit)?;
state.serialize_field(
"active_requests",
&self.active_requests.load(atomic::Ordering::Relaxed),
)?;
state.serialize_field(
"total_requests",
&self.frontend_requests.load(atomic::Ordering::Relaxed),
)?;
// TODO: keep this for the "popularity_contest" command? or maybe better to just use graphana?
// state.serialize_field(
// "frontend_requests",
// &self.frontend_requests.load(atomic::Ordering::Relaxed),
// )?;
{
// TODO: maybe this is too much data. serialize less?
@ -1216,7 +1169,7 @@ mod tests {
let x = Web3Rpc {
name: "name".to_string(),
url: "ws://example.com".to_string(),
ws_url: Some("ws://example.com".to_string()),
soft_limit: 1_000,
automatic_block_limit: false,
backup: false,

View File

@ -2,22 +2,45 @@ use anyhow::Context;
use derive_more::From;
use std::time::Duration;
// TODO: our own structs for these that handle streaming large responses
type EthersHttpProvider = ethers::providers::Provider<ethers::providers::Http>;
type EthersWsProvider = ethers::providers::Provider<ethers::providers::Ws>;
/// Use HTTP and WS providers.
// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit <https://github.com/gakonst/ethers-rs/issues/592>
// TODO: custom types that let us stream JSON responses
#[derive(From)]
pub enum Web3Provider {
Http(ethers::providers::Provider<ethers::providers::Http>),
Ws(ethers::providers::Provider<ethers::providers::Ws>),
// TODO: only include this for tests.
Both(EthersHttpProvider, EthersWsProvider),
Http(EthersHttpProvider),
// TODO: deadpool? custom tokio-tungstenite
Ws(EthersWsProvider),
#[cfg(test)]
Mock,
}
impl Web3Provider {
pub fn ready(&self) -> bool {
match self {
Self::Mock => true,
Self::Both(_, ws) => ws.as_ref().ready(),
Self::Http(_) => true,
Self::Ws(provider) => provider.as_ref().ready(),
Self::Ws(ws) => ws.as_ref().ready(),
#[cfg(test)]
Self::Mock => true,
}
}
pub fn http(&self) -> Option<&EthersHttpProvider> {
match self {
Self::Http(x) => Some(x),
_ => None,
}
}
pub fn ws(&self) -> Option<&EthersWsProvider> {
match self {
Self::Both(_, x) | Self::Ws(x) => Some(x),
_ => None,
}
}

View File

@ -1,6 +1,6 @@
use super::one::Web3Rpc;
use super::provider::Web3Provider;
use crate::frontend::authorization::{Authorization, AuthorizationType};
use crate::frontend::authorization::Authorization;
use anyhow::Context;
use chrono::Utc;
use entities::revert_log;
@ -11,7 +11,6 @@ use log::{debug, error, trace, warn, Level};
use migration::sea_orm::{self, ActiveEnum, ActiveModelTrait};
use serde_json::json;
use std::fmt;
use std::sync::atomic;
use std::sync::Arc;
use thread_fast_rng::rand::Rng;
use tokio::time::{sleep, Duration, Instant};
@ -27,11 +26,11 @@ pub enum OpenRequestResult {
}
/// Make RPC requests through this handle and drop it when you are done.
/// Opening this handle checks rate limits. Developers, try to keep opening a handle and using it as close together as possible
#[derive(Debug)]
pub struct OpenRequestHandle {
authorization: Arc<Authorization>,
conn: Arc<Web3Rpc>,
provider: Arc<Web3Provider>,
}
/// Depending on the context, RPC errors can require different handling.
@ -123,60 +122,9 @@ impl Authorization {
impl OpenRequestHandle {
pub async fn new(authorization: Arc<Authorization>, conn: Arc<Web3Rpc>) -> Self {
// TODO: take request_id as an argument?
// TODO: attach a unique id to this? customer requests have one, but not internal queries
// TODO: what ordering?!
conn.active_requests.fetch_add(1, atomic::Ordering::Relaxed);
let mut provider = None;
let mut logged = false;
while provider.is_none() {
// trace!("waiting on provider: locking...");
let ready_provider = conn
.provider_state
.read()
.await
// TODO: hard code true, or take a bool in the `new` function?
.provider(true)
.await
.cloned();
// trace!("waiting on provider: unlocked!");
match ready_provider {
None => {
if !logged {
logged = true;
warn!("no provider for {}!", conn);
}
// TODO: how should this work? a reconnect should be in progress. but maybe force one now?
// TODO: sleep how long? subscribe to something instead? maybe use a watch handle?
// TODO: this is going to be way too verbose!
sleep(Duration::from_millis(100)).await
}
Some(x) => provider = Some(x),
}
}
let provider = provider.expect("provider was checked already");
// TODO: handle overflows?
// TODO: what ordering?
match authorization.as_ref().authorization_type {
AuthorizationType::Frontend => {
conn.frontend_requests
.fetch_add(1, atomic::Ordering::Relaxed);
}
AuthorizationType::Internal => {
conn.internal_requests
.fetch_add(1, atomic::Ordering::Relaxed);
}
}
Self {
authorization,
conn,
provider,
}
}
@ -196,6 +144,7 @@ impl OpenRequestHandle {
method: &str,
params: &P,
revert_handler: RequestRevertHandler,
unlocked_provider: Option<Arc<Web3Provider>>,
) -> Result<R, ProviderError>
where
// TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it
@ -205,12 +154,45 @@ impl OpenRequestHandle {
// TODO: use tracing spans
// TODO: including params in this log is way too verbose
// trace!(rpc=%self.conn, %method, "request");
trace!("requesting from {}", self.conn);
let mut provider: Option<Arc<Web3Provider>> = None;
let mut logged = false;
while provider.is_none() {
// trace!("waiting on provider: locking...");
// TODO: this should *not* be new_head_client. that is dedicated to only new heads
if let Some(unlocked_provider) = unlocked_provider {
provider = Some(unlocked_provider);
break;
}
let unlocked_provider = self.conn.new_head_client.read().await;
if let Some(unlocked_provider) = unlocked_provider.clone() {
provider = Some(unlocked_provider);
break;
}
if !logged {
debug!("no provider for open handle on {}", self.conn);
logged = true;
}
sleep(Duration::from_millis(100)).await;
}
let provider = provider.expect("provider was checked already");
// TODO: replace ethers-rs providers with our own that supports streaming the responses
let response = match &*self.provider {
let response = match provider.as_ref() {
#[cfg(test)]
Web3Provider::Mock => unimplemented!(),
Web3Provider::Http(provider) => provider.request(method, params).await,
Web3Provider::Ws(provider) => provider.request(method, params).await,
Web3Provider::Ws(p) => p.request(method, params).await,
Web3Provider::Http(p) | Web3Provider::Both(p, _) => {
// TODO: i keep hearing that http is faster. but ws has always been better for me. investigate more with actual benchmarks
p.request(method, params).await
}
};
// // TODO: i think ethers already has trace logging (and does it much more fancy)
@ -266,8 +248,22 @@ impl OpenRequestHandle {
// check for "execution reverted" here
let response_type = if let ProviderError::JsonRpcClientError(err) = err {
// Http and Ws errors are very similar, but different types
let msg = match &*self.provider {
let msg = match &*provider {
#[cfg(test)]
Web3Provider::Mock => unimplemented!(),
Web3Provider::Both(_, _) => {
if let Some(HttpClientError::JsonRpcError(err)) =
err.downcast_ref::<HttpClientError>()
{
Some(&err.message)
} else if let Some(WsClientError::JsonRpcError(err)) =
err.downcast_ref::<WsClientError>()
{
Some(&err.message)
} else {
None
}
}
Web3Provider::Http(_) => {
if let Some(HttpClientError::JsonRpcError(err)) =
err.downcast_ref::<HttpClientError>()
@ -377,11 +373,3 @@ impl OpenRequestHandle {
response
}
}
impl Drop for OpenRequestHandle {
fn drop(&mut self) {
self.conn
.active_requests
.fetch_sub(1, atomic::Ordering::AcqRel);
}
}

View File

@ -28,13 +28,15 @@ impl Web3Rpcs {
// TODO: might not be a race. might be a nonce thats higher than the current account nonce. geth discards chains
// TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
// TODO: if one rpc fails, try another?
let tx: Transaction = match rpc.try_request_handle(authorization, false).await {
// TODO: try_request_handle, or wait_for_request_handle? I think we want wait here
let tx: Transaction = match rpc.try_request_handle(authorization, None).await {
Ok(OpenRequestResult::Handle(handle)) => {
handle
.request(
"eth_getTransactionByHash",
&(pending_tx_id,),
Level::Error.into(),
None,
)
.await?
}

View File