use http and ws providers in a single config
This commit is contained in:
parent
d4b1817505
commit
4daf984b4b
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -5761,7 +5761,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "web3_proxy"
|
name = "web3_proxy"
|
||||||
version = "0.13.0"
|
version = "0.13.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"argh",
|
"argh",
|
||||||
|
@ -52,50 +52,50 @@ response_cache_max_bytes = 10_000_000_000
|
|||||||
|
|
||||||
[balanced_rpcs.ankr]
|
[balanced_rpcs.ankr]
|
||||||
display_name = "Ankr"
|
display_name = "Ankr"
|
||||||
url = "https://rpc.ankr.com/eth"
|
http_url = "https://rpc.ankr.com/eth"
|
||||||
soft_limit = 1_000
|
soft_limit = 1_000
|
||||||
tier = 0
|
tier = 0
|
||||||
|
|
||||||
[balanced_rpcs.cloudflare]
|
[balanced_rpcs.cloudflare]
|
||||||
display_name = "Cloudflare"
|
display_name = "Cloudflare"
|
||||||
url = "https://cloudflare-eth.com"
|
http_url = "https://cloudflare-eth.com"
|
||||||
soft_limit = 1_000
|
soft_limit = 1_000
|
||||||
tier = 1
|
tier = 1
|
||||||
|
|
||||||
[balanced_rpcs.blastapi]
|
[balanced_rpcs.blastapi]
|
||||||
display_name = "Blast"
|
display_name = "Blast"
|
||||||
url = "https://eth-mainnet.public.blastapi.io"
|
http_url = "https://eth-mainnet.public.blastapi.io"
|
||||||
soft_limit = 1_000
|
soft_limit = 1_000
|
||||||
tier = 1
|
tier = 1
|
||||||
|
|
||||||
[balanced_rpcs.mycryptoapi]
|
[balanced_rpcs.mycryptoapi]
|
||||||
display_name = "MyCrypto"
|
display_name = "MyCrypto"
|
||||||
disabled = true
|
disabled = true
|
||||||
url = "https://api.mycryptoapi.com/eth"
|
http_url = "https://api.mycryptoapi.com/eth"
|
||||||
soft_limit = 1_000
|
soft_limit = 1_000
|
||||||
tier = 2
|
tier = 2
|
||||||
|
|
||||||
[balanced_rpcs.pokt-v1]
|
[balanced_rpcs.pokt-v1]
|
||||||
display_name = "Pokt #1"
|
display_name = "Pokt #1"
|
||||||
url = "https://eth-mainnet.gateway.pokt.network/v1/5f3453978e354ab992c4da79"
|
http_url = "https://eth-mainnet.gateway.pokt.network/v1/5f3453978e354ab992c4da79"
|
||||||
soft_limit = 500
|
soft_limit = 500
|
||||||
tier = 2
|
tier = 2
|
||||||
|
|
||||||
[balanced_rpcs.pokt]
|
[balanced_rpcs.pokt]
|
||||||
display_name = "Pokt #2"
|
display_name = "Pokt #2"
|
||||||
url = "https://eth-rpc.gateway.pokt.network"
|
http_url = "https://eth-rpc.gateway.pokt.network"
|
||||||
soft_limit = 500
|
soft_limit = 500
|
||||||
tier = 3
|
tier = 3
|
||||||
|
|
||||||
[balanced_rpcs.linkpool]
|
[balanced_rpcs.linkpool]
|
||||||
display_name = "Linkpool"
|
display_name = "Linkpool"
|
||||||
url = "https://main-rpc.linkpool.io"
|
http_url = "https://main-rpc.linkpool.io"
|
||||||
soft_limit = 500
|
soft_limit = 500
|
||||||
tier = 4
|
tier = 4
|
||||||
|
|
||||||
[balanced_rpcs.runonflux]
|
[balanced_rpcs.runonflux]
|
||||||
display_name = "Run on Flux (light)"
|
display_name = "Run on Flux (light)"
|
||||||
url = "https://ethereumnodelight.app.runonflux.io"
|
http_url = "https://ethereumnodelight.app.runonflux.io"
|
||||||
soft_limit = 1_000
|
soft_limit = 1_000
|
||||||
tier = 5
|
tier = 5
|
||||||
|
|
||||||
@ -103,7 +103,7 @@ response_cache_max_bytes = 10_000_000_000
|
|||||||
[balanced_rpcs.linkpool-light]
|
[balanced_rpcs.linkpool-light]
|
||||||
display_name = "Linkpool (light)"
|
display_name = "Linkpool (light)"
|
||||||
disabled = true
|
disabled = true
|
||||||
url = "https://main-light.eth.linkpool.io"
|
http_url = "https://main-light.eth.linkpool.io"
|
||||||
soft_limit = 100
|
soft_limit = 100
|
||||||
tier = 5
|
tier = 5
|
||||||
|
|
||||||
@ -114,34 +114,34 @@ response_cache_max_bytes = 10_000_000_000
|
|||||||
[private_rpcs.eden]
|
[private_rpcs.eden]
|
||||||
disabled = true
|
disabled = true
|
||||||
display_name = "Eden network"
|
display_name = "Eden network"
|
||||||
url = "https://api.edennetwork.io/v1/"
|
http_url = "https://api.edennetwork.io/v1/"
|
||||||
soft_limit = 1_805
|
soft_limit = 1_805
|
||||||
tier = 0
|
tier = 0
|
||||||
|
|
||||||
[private_rpcs.eden_beta]
|
[private_rpcs.eden_beta]
|
||||||
disabled = true
|
disabled = true
|
||||||
display_name = "Eden network beta"
|
display_name = "Eden network beta"
|
||||||
url = "https://api.edennetwork.io/v1/beta"
|
http_url = "https://api.edennetwork.io/v1/beta"
|
||||||
soft_limit = 5_861
|
soft_limit = 5_861
|
||||||
tier = 0
|
tier = 0
|
||||||
|
|
||||||
[private_rpcs.ethermine]
|
[private_rpcs.ethermine]
|
||||||
disabled = true
|
disabled = true
|
||||||
display_name = "Ethermine"
|
display_name = "Ethermine"
|
||||||
url = "https://rpc.ethermine.org"
|
http_url = "https://rpc.ethermine.org"
|
||||||
soft_limit = 5_861
|
soft_limit = 5_861
|
||||||
tier = 0
|
tier = 0
|
||||||
|
|
||||||
[private_rpcs.flashbots]
|
[private_rpcs.flashbots]
|
||||||
disabled = true
|
disabled = true
|
||||||
display_name = "Flashbots Fast"
|
display_name = "Flashbots Fast"
|
||||||
url = "https://rpc.flashbots.net/fast"
|
http_url = "https://rpc.flashbots.net/fast"
|
||||||
soft_limit = 7_074
|
soft_limit = 7_074
|
||||||
tier = 0
|
tier = 0
|
||||||
|
|
||||||
[private_rpcs.securerpc]
|
[private_rpcs.securerpc]
|
||||||
disabled = true
|
disabled = true
|
||||||
display_name = "SecureRPC"
|
display_name = "SecureRPC"
|
||||||
url = "https://gibson.securerpc.com/v1"
|
http_url = "https://gibson.securerpc.com/v1"
|
||||||
soft_limit = 4_560
|
soft_limit = 4_560
|
||||||
tier = 0
|
tier = 0
|
||||||
|
@ -16,17 +16,26 @@ response_cache_max_bytes = 1_000_000_000
|
|||||||
|
|
||||||
[balanced_rpcs]
|
[balanced_rpcs]
|
||||||
|
|
||||||
[balanced_rpcs.llama_public_wss]
|
[balanced_rpcs.llama_public_both]
|
||||||
# TODO: what should we do if all rpcs are disabled? warn and wait for a config change?
|
# TODO: what should we do if all rpcs are disabled? warn and wait for a config change?
|
||||||
disabled = false
|
disabled = false
|
||||||
display_name = "LlamaNodes WSS"
|
display_name = "LlamaNodes Both"
|
||||||
url = "wss://eth.llamarpc.com/"
|
ws_url = "wss://eth.llamarpc.com/"
|
||||||
|
http_url = "https://eth.llamarpc.com/"
|
||||||
soft_limit = 1_000
|
soft_limit = 1_000
|
||||||
tier = 0
|
tier = 0
|
||||||
|
|
||||||
[balanced_rpcs.llama_public_https]
|
[balanced_rpcs.llama_public_https]
|
||||||
disabled = false
|
disabled = false
|
||||||
display_name = "LlamaNodes HTTPS"
|
display_name = "LlamaNodes HTTPS"
|
||||||
url = "https://eth.llamarpc.com/"
|
http_url = "https://eth.llamarpc.com/"
|
||||||
|
soft_limit = 1_000
|
||||||
|
tier = 0
|
||||||
|
|
||||||
|
[balanced_rpcs.llama_public_wss]
|
||||||
|
# TODO: what should we do if all rpcs are disabled? warn and wait for a config change?
|
||||||
|
disabled = false
|
||||||
|
display_name = "LlamaNodes WSS"
|
||||||
|
ws_url = "wss://eth.llamarpc.com/"
|
||||||
soft_limit = 1_000
|
soft_limit = 1_000
|
||||||
tier = 0
|
tier = 0
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "web3_proxy"
|
name = "web3_proxy"
|
||||||
version = "0.13.0"
|
version = "0.13.1"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
default-run = "web3_proxy_cli"
|
default-run = "web3_proxy_cli"
|
||||||
|
|
||||||
|
@ -482,7 +482,7 @@ impl Web3ProxyApp {
|
|||||||
let http_client = Some(
|
let http_client = Some(
|
||||||
reqwest::ClientBuilder::new()
|
reqwest::ClientBuilder::new()
|
||||||
.connect_timeout(Duration::from_secs(5))
|
.connect_timeout(Duration::from_secs(5))
|
||||||
.timeout(Duration::from_secs(60))
|
.timeout(Duration::from_secs(5 * 60))
|
||||||
.user_agent(APP_USER_AGENT)
|
.user_agent(APP_USER_AGENT)
|
||||||
.build()?,
|
.build()?,
|
||||||
);
|
);
|
||||||
@ -573,17 +573,17 @@ impl Web3ProxyApp {
|
|||||||
|
|
||||||
// connect to the load balanced rpcs
|
// connect to the load balanced rpcs
|
||||||
let (balanced_rpcs, balanced_handle) = Web3Rpcs::spawn(
|
let (balanced_rpcs, balanced_handle) = Web3Rpcs::spawn(
|
||||||
|
block_map.clone(),
|
||||||
top_config.app.chain_id,
|
top_config.app.chain_id,
|
||||||
db_conn.clone(),
|
db_conn.clone(),
|
||||||
balanced_rpcs,
|
|
||||||
http_client.clone(),
|
http_client.clone(),
|
||||||
vredis_pool.clone(),
|
|
||||||
block_map.clone(),
|
|
||||||
Some(watch_consensus_head_sender),
|
|
||||||
top_config.app.min_sum_soft_limit,
|
|
||||||
top_config.app.min_synced_rpcs,
|
top_config.app.min_synced_rpcs,
|
||||||
Some(pending_tx_sender.clone()),
|
top_config.app.min_sum_soft_limit,
|
||||||
pending_transactions.clone(),
|
pending_transactions.clone(),
|
||||||
|
Some(pending_tx_sender.clone()),
|
||||||
|
vredis_pool.clone(),
|
||||||
|
balanced_rpcs,
|
||||||
|
Some(watch_consensus_head_sender),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.context("spawning balanced rpcs")?;
|
.context("spawning balanced rpcs")?;
|
||||||
@ -599,21 +599,22 @@ impl Web3ProxyApp {
|
|||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
let (private_rpcs, private_handle) = Web3Rpcs::spawn(
|
let (private_rpcs, private_handle) = Web3Rpcs::spawn(
|
||||||
|
block_map,
|
||||||
top_config.app.chain_id,
|
top_config.app.chain_id,
|
||||||
db_conn.clone(),
|
db_conn.clone(),
|
||||||
private_rpcs,
|
|
||||||
http_client.clone(),
|
http_client.clone(),
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
pending_transactions.clone(),
|
||||||
|
// TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits, but they should have
|
||||||
|
None,
|
||||||
vredis_pool.clone(),
|
vredis_pool.clone(),
|
||||||
block_map,
|
private_rpcs,
|
||||||
// subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs
|
// subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs
|
||||||
// they also often have low rate limits
|
// they also often have low rate limits
|
||||||
// however, they are well connected to miners/validators. so maybe using them as a safety check would be good
|
// however, they are well connected to miners/validators. so maybe using them as a safety check would be good
|
||||||
|
// TODO: but maybe we could include privates in the "backup" tier
|
||||||
None,
|
None,
|
||||||
0,
|
|
||||||
0,
|
|
||||||
// TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits
|
|
||||||
None,
|
|
||||||
pending_transactions.clone(),
|
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.context("spawning private_rpcs")?;
|
.context("spawning private_rpcs")?;
|
||||||
@ -1035,9 +1036,17 @@ impl Web3ProxyApp {
|
|||||||
| "db_getString"
|
| "db_getString"
|
||||||
| "db_putHex"
|
| "db_putHex"
|
||||||
| "db_putString"
|
| "db_putString"
|
||||||
|
| "debug_accountRange"
|
||||||
|
| "debug_backtraceAt"
|
||||||
|
| "debug_blockProfile"
|
||||||
| "debug_chaindbCompact"
|
| "debug_chaindbCompact"
|
||||||
|
| "debug_chaindbProperty"
|
||||||
|
| "debug_cpuProfile"
|
||||||
|
| "debug_freeOSMemory"
|
||||||
| "debug_freezeClient"
|
| "debug_freezeClient"
|
||||||
|
| "debug_gcStats"
|
||||||
| "debug_goTrace"
|
| "debug_goTrace"
|
||||||
|
| "debug_memStats"
|
||||||
| "debug_mutexProfile"
|
| "debug_mutexProfile"
|
||||||
| "debug_setBlockProfileRate"
|
| "debug_setBlockProfileRate"
|
||||||
| "debug_setGCPercent"
|
| "debug_setGCPercent"
|
||||||
|
@ -205,31 +205,27 @@ mod tests {
|
|||||||
(
|
(
|
||||||
"anvil".to_string(),
|
"anvil".to_string(),
|
||||||
Web3RpcConfig {
|
Web3RpcConfig {
|
||||||
disabled: false,
|
http_url: Some(anvil.endpoint()),
|
||||||
display_name: None,
|
|
||||||
url: anvil.endpoint(),
|
|
||||||
backup: Some(false),
|
|
||||||
block_data_limit: None,
|
|
||||||
soft_limit: 100,
|
soft_limit: 100,
|
||||||
hard_limit: None,
|
|
||||||
tier: 0,
|
tier: 0,
|
||||||
subscribe_txs: Some(false),
|
..Default::default()
|
||||||
extra: Default::default(),
|
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"anvil_ws".to_string(),
|
"anvil_ws".to_string(),
|
||||||
Web3RpcConfig {
|
Web3RpcConfig {
|
||||||
disabled: false,
|
ws_url: Some(anvil.ws_endpoint()),
|
||||||
display_name: None,
|
|
||||||
url: anvil.ws_endpoint(),
|
|
||||||
backup: Some(false),
|
|
||||||
block_data_limit: None,
|
|
||||||
soft_limit: 100,
|
soft_limit: 100,
|
||||||
hard_limit: None,
|
|
||||||
tier: 0,
|
tier: 0,
|
||||||
subscribe_txs: Some(false),
|
..Default::default()
|
||||||
extra: Default::default(),
|
},
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"anvil_both".to_string(),
|
||||||
|
Web3RpcConfig {
|
||||||
|
http_url: Some(anvil.endpoint()),
|
||||||
|
ws_url: Some(anvil.ws_endpoint()),
|
||||||
|
..Default::default()
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
]),
|
]),
|
||||||
|
@ -132,6 +132,12 @@ pub async fn block_needed(
|
|||||||
head_block_num: U64,
|
head_block_num: U64,
|
||||||
rpcs: &Web3Rpcs,
|
rpcs: &Web3Rpcs,
|
||||||
) -> anyhow::Result<BlockNeeded> {
|
) -> anyhow::Result<BlockNeeded> {
|
||||||
|
// some requests have potentially very large responses
|
||||||
|
// TODO: only skip caching if the response actually is large
|
||||||
|
if method.starts_with("trace_") || method == "debug_traceTransaction" {
|
||||||
|
return Ok(BlockNeeded::CacheNever);
|
||||||
|
}
|
||||||
|
|
||||||
let params = if let Some(params) = params {
|
let params = if let Some(params) = params {
|
||||||
// grab the params so we can inspect and potentially modify them
|
// grab the params so we can inspect and potentially modify them
|
||||||
params
|
params
|
||||||
|
@ -197,15 +197,19 @@ fn default_response_cache_max_bytes() -> u64 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Configuration for a backend web3 RPC server
|
/// Configuration for a backend web3 RPC server
|
||||||
#[derive(Clone, Debug, Deserialize)]
|
#[derive(Clone, Debug, Default, Deserialize)]
|
||||||
pub struct Web3RpcConfig {
|
pub struct Web3RpcConfig {
|
||||||
/// simple way to disable a connection without deleting the row
|
/// simple way to disable a connection without deleting the row
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub disabled: bool,
|
pub disabled: bool,
|
||||||
/// a name used in /status and other user facing messages
|
/// a name used in /status and other user facing messages
|
||||||
pub display_name: Option<String>,
|
pub display_name: Option<String>,
|
||||||
/// websocket (or http if no websocket)
|
/// (deprecated) rpc url
|
||||||
pub url: String,
|
pub url: Option<String>,
|
||||||
|
/// while not absolutely required, a ws:// or wss:// connection will be able to subscribe to head blocks
|
||||||
|
pub ws_url: Option<String>,
|
||||||
|
/// while not absolutely required, a http:// or https:// connection will allow erigon to stream JSON
|
||||||
|
pub http_url: Option<String>,
|
||||||
/// block data limit. If None, will be queried
|
/// block data limit. If None, will be queried
|
||||||
pub block_data_limit: Option<u64>,
|
pub block_data_limit: Option<u64>,
|
||||||
/// the requests per second at which the server starts slowing down
|
/// the requests per second at which the server starts slowing down
|
||||||
@ -213,14 +217,15 @@ pub struct Web3RpcConfig {
|
|||||||
/// the requests per second at which the server throws errors (rate limit or otherwise)
|
/// the requests per second at which the server throws errors (rate limit or otherwise)
|
||||||
pub hard_limit: Option<u64>,
|
pub hard_limit: Option<u64>,
|
||||||
/// only use this rpc if everything else is lagging too far. this allows us to ignore fast but very low limit rpcs
|
/// only use this rpc if everything else is lagging too far. this allows us to ignore fast but very low limit rpcs
|
||||||
pub backup: Option<bool>,
|
#[serde(default)]
|
||||||
|
pub backup: bool,
|
||||||
/// All else equal, a server with a lower tier receives all requests
|
/// All else equal, a server with a lower tier receives all requests
|
||||||
#[serde(default = "default_tier")]
|
#[serde(default = "default_tier")]
|
||||||
pub tier: u64,
|
pub tier: u64,
|
||||||
/// Subscribe to the firehose of pending transactions
|
/// Subscribe to the firehose of pending transactions
|
||||||
/// Don't do this with free rpcs
|
/// Don't do this with free rpcs
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub subscribe_txs: Option<bool>,
|
pub subscribe_txs: bool,
|
||||||
/// unknown config options get put here
|
/// unknown config options get put here
|
||||||
#[serde(flatten, default = "HashMap::default")]
|
#[serde(flatten, default = "HashMap::default")]
|
||||||
pub extra: HashMap<String, serde_json::Value>,
|
pub extra: HashMap<String, serde_json::Value>,
|
||||||
@ -245,47 +250,24 @@ impl Web3RpcConfig {
|
|||||||
block_map: BlockHashesCache,
|
block_map: BlockHashesCache,
|
||||||
block_sender: Option<flume::Sender<BlockAndRpc>>,
|
block_sender: Option<flume::Sender<BlockAndRpc>>,
|
||||||
tx_id_sender: Option<flume::Sender<TxHashAndRpc>>,
|
tx_id_sender: Option<flume::Sender<TxHashAndRpc>>,
|
||||||
|
reconnect: bool,
|
||||||
) -> anyhow::Result<(Arc<Web3Rpc>, AnyhowJoinHandle<()>)> {
|
) -> anyhow::Result<(Arc<Web3Rpc>, AnyhowJoinHandle<()>)> {
|
||||||
if !self.extra.is_empty() {
|
if !self.extra.is_empty() {
|
||||||
warn!("unknown Web3RpcConfig fields!: {:?}", self.extra.keys());
|
warn!("unknown Web3RpcConfig fields!: {:?}", self.extra.keys());
|
||||||
}
|
}
|
||||||
|
|
||||||
let hard_limit = match (self.hard_limit, redis_pool) {
|
|
||||||
(None, None) => None,
|
|
||||||
(Some(hard_limit), Some(redis_client_pool)) => Some((hard_limit, redis_client_pool)),
|
|
||||||
(None, Some(_)) => None,
|
|
||||||
(Some(_hard_limit), None) => {
|
|
||||||
return Err(anyhow::anyhow!(
|
|
||||||
"no redis client pool! needed for hard limit"
|
|
||||||
))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let tx_id_sender = if self.subscribe_txs.unwrap_or(false) {
|
|
||||||
tx_id_sender
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
let backup = self.backup.unwrap_or(false);
|
|
||||||
|
|
||||||
Web3Rpc::spawn(
|
Web3Rpc::spawn(
|
||||||
|
self,
|
||||||
name,
|
name,
|
||||||
self.display_name,
|
|
||||||
chain_id,
|
chain_id,
|
||||||
db_conn,
|
db_conn,
|
||||||
self.url,
|
|
||||||
http_client,
|
http_client,
|
||||||
http_interval_sender,
|
http_interval_sender,
|
||||||
hard_limit,
|
redis_pool,
|
||||||
self.soft_limit,
|
|
||||||
backup,
|
|
||||||
self.block_data_limit,
|
|
||||||
block_map,
|
block_map,
|
||||||
block_sender,
|
block_sender,
|
||||||
tx_id_sender,
|
tx_id_sender,
|
||||||
true,
|
reconnect,
|
||||||
self.tier,
|
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -149,12 +149,13 @@ impl Web3Rpcs {
|
|||||||
// TODO: if error, retry?
|
// TODO: if error, retry?
|
||||||
let block: ArcBlock = match rpc {
|
let block: ArcBlock = match rpc {
|
||||||
Some(rpc) => rpc
|
Some(rpc) => rpc
|
||||||
.wait_for_request_handle(authorization, Some(Duration::from_secs(30)), false)
|
.wait_for_request_handle(authorization, Some(Duration::from_secs(30)), None)
|
||||||
.await?
|
.await?
|
||||||
.request::<_, Option<_>>(
|
.request::<_, Option<_>>(
|
||||||
"eth_getBlockByHash",
|
"eth_getBlockByHash",
|
||||||
&json!(get_block_params),
|
&json!(get_block_params),
|
||||||
Level::Error.into(),
|
Level::Error.into(),
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.await?
|
.await?
|
||||||
.context("no block!")?,
|
.context("no block!")?,
|
||||||
|
0
web3_proxy/src/rpcs/grpc_erigon.rs
Normal file
0
web3_proxy/src/rpcs/grpc_erigon.rs
Normal file
0
web3_proxy/src/rpcs/http.rs
Normal file
0
web3_proxy/src/rpcs/http.rs
Normal file
@ -56,17 +56,17 @@ impl Web3Rpcs {
|
|||||||
/// Spawn durable connections to multiple Web3 providers.
|
/// Spawn durable connections to multiple Web3 providers.
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub async fn spawn(
|
pub async fn spawn(
|
||||||
|
block_map: BlockHashesCache,
|
||||||
chain_id: u64,
|
chain_id: u64,
|
||||||
db_conn: Option<DatabaseConnection>,
|
db_conn: Option<DatabaseConnection>,
|
||||||
server_configs: HashMap<String, Web3RpcConfig>,
|
|
||||||
http_client: Option<reqwest::Client>,
|
http_client: Option<reqwest::Client>,
|
||||||
redis_pool: Option<redis_rate_limiter::RedisPool>,
|
|
||||||
block_map: BlockHashesCache,
|
|
||||||
watch_consensus_head_sender: Option<watch::Sender<ArcBlock>>,
|
|
||||||
min_sum_soft_limit: u32,
|
|
||||||
min_head_rpcs: usize,
|
min_head_rpcs: usize,
|
||||||
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
|
min_sum_soft_limit: u32,
|
||||||
pending_transactions: Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
|
pending_transactions: Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
|
||||||
|
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
|
||||||
|
redis_pool: Option<redis_rate_limiter::RedisPool>,
|
||||||
|
server_configs: HashMap<String, Web3RpcConfig>,
|
||||||
|
watch_consensus_head_sender: Option<watch::Sender<ArcBlock>>,
|
||||||
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
|
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
|
||||||
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
|
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
|
||||||
let (block_sender, block_receiver) = flume::unbounded::<BlockAndRpc>();
|
let (block_sender, block_receiver) = flume::unbounded::<BlockAndRpc>();
|
||||||
@ -160,6 +160,7 @@ impl Web3Rpcs {
|
|||||||
block_map,
|
block_map,
|
||||||
block_sender,
|
block_sender,
|
||||||
pending_tx_id_sender,
|
pending_tx_id_sender,
|
||||||
|
true,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
});
|
});
|
||||||
@ -343,7 +344,7 @@ impl Web3Rpcs {
|
|||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|active_request_handle| async move {
|
.map(|active_request_handle| async move {
|
||||||
let result: Result<Box<RawValue>, _> = active_request_handle
|
let result: Result<Box<RawValue>, _> = active_request_handle
|
||||||
.request(method, &json!(¶ms), error_level.into())
|
.request(method, &json!(¶ms), error_level.into(), None)
|
||||||
.await;
|
.await;
|
||||||
result
|
result
|
||||||
})
|
})
|
||||||
@ -473,12 +474,20 @@ impl Web3Rpcs {
|
|||||||
for x in self
|
for x in self
|
||||||
.conns
|
.conns
|
||||||
.values()
|
.values()
|
||||||
.filter(|x| if allow_backups { true } else { !x.backup })
|
|
||||||
.filter(|x| !skip.contains(x))
|
|
||||||
.filter(|x| x.has_block_data(min_block_needed))
|
|
||||||
.filter(|x| {
|
.filter(|x| {
|
||||||
if let Some(max_block_needed) = max_block_needed {
|
if !allow_backups && x.backup {
|
||||||
x.has_block_data(max_block_needed)
|
false
|
||||||
|
} else if skip.contains(x) {
|
||||||
|
false
|
||||||
|
} else if !x.has_block_data(min_block_needed) {
|
||||||
|
false
|
||||||
|
} else if max_block_needed
|
||||||
|
.and_then(|max_block_needed| {
|
||||||
|
Some(!x.has_block_data(max_block_needed))
|
||||||
|
})
|
||||||
|
.unwrap_or(false)
|
||||||
|
{
|
||||||
|
false
|
||||||
} else {
|
} else {
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
@ -521,58 +530,22 @@ impl Web3Rpcs {
|
|||||||
let mut earliest_retry_at = None;
|
let mut earliest_retry_at = None;
|
||||||
|
|
||||||
for usable_rpcs in usable_rpcs_by_head_num_and_weight.into_values().rev() {
|
for usable_rpcs in usable_rpcs_by_head_num_and_weight.into_values().rev() {
|
||||||
// under heavy load, it is possible for even our best server to be negative
|
|
||||||
let mut minimum = f64::MAX;
|
|
||||||
let mut maximum = f64::MIN;
|
|
||||||
|
|
||||||
// we sort on a combination of values. cache them here so that we don't do this math multiple times.
|
// we sort on a combination of values. cache them here so that we don't do this math multiple times.
|
||||||
let mut available_request_map: HashMap<_, f64> = usable_rpcs
|
// TODO: is this necessary if we use sort_by_cached_key?
|
||||||
|
let available_request_map: HashMap<_, f64> = usable_rpcs
|
||||||
.iter()
|
.iter()
|
||||||
.map(|rpc| {
|
.map(|rpc| {
|
||||||
// TODO: are active requests what we want? do we want a counter for requests in the last second + any actives longer than that?
|
// TODO: weighted sort by remaining hard limit?
|
||||||
// TODO: get active requests out of redis (that's definitely too slow)
|
// TODO: weighted sort by soft_limit - ewma_active_requests? that assumes soft limits are any good
|
||||||
// TODO: do something with hard limit instead? (but that is hitting redis too much)
|
(rpc, 1.0)
|
||||||
let active_requests = rpc.active_requests() as f64;
|
|
||||||
let soft_limit = rpc.soft_limit as f64;
|
|
||||||
|
|
||||||
let available_requests = soft_limit - active_requests;
|
|
||||||
|
|
||||||
// trace!("available requests on {}: {}", rpc, available_requests);
|
|
||||||
|
|
||||||
minimum = minimum.min(available_requests);
|
|
||||||
maximum = maximum.max(available_requests);
|
|
||||||
|
|
||||||
(rpc, available_requests)
|
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
// trace!("minimum available requests: {}", minimum);
|
debug!("todo: better sort here");
|
||||||
// trace!("maximum available requests: {}", maximum);
|
|
||||||
|
|
||||||
if maximum < 0.0 {
|
|
||||||
// TODO: if maximum < 0 and there are other tiers on the same block, we should include them now
|
|
||||||
warn!("soft limits overloaded: {} to {}", minimum, maximum)
|
|
||||||
}
|
|
||||||
|
|
||||||
// choose_multiple_weighted can't have negative numbers. shift up if any are negative
|
|
||||||
// TODO: is this a correct way to shift?
|
|
||||||
if minimum < 0.0 {
|
|
||||||
available_request_map = available_request_map
|
|
||||||
.into_iter()
|
|
||||||
.map(|(rpc, available_requests)| {
|
|
||||||
// TODO: is simple addition the right way to shift everyone?
|
|
||||||
// TODO: probably want something non-linear
|
|
||||||
// minimum is negative, so we subtract to make available requests bigger
|
|
||||||
let x = available_requests - minimum;
|
|
||||||
|
|
||||||
(rpc, x)
|
|
||||||
})
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
let sorted_rpcs = {
|
let sorted_rpcs = {
|
||||||
if usable_rpcs.len() == 1 {
|
if usable_rpcs.len() == 1 {
|
||||||
// TODO: return now instead? we shouldn't need another alloc
|
// TODO: try the next tier
|
||||||
vec![usable_rpcs.get(0).expect("there should be 1")]
|
vec![usable_rpcs.get(0).expect("there should be 1")]
|
||||||
} else {
|
} else {
|
||||||
let mut rng = thread_fast_rng::thread_fast_rng();
|
let mut rng = thread_fast_rng::thread_fast_rng();
|
||||||
@ -589,12 +562,10 @@ impl Web3Rpcs {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// now that the rpcs are sorted, try to get an active request handle for one of them
|
// now that the rpcs are sorted, try to get an active request handle for one of them
|
||||||
|
// TODO: pick two randomly and choose the one with the lower rpc.latency.ewma
|
||||||
for best_rpc in sorted_rpcs.into_iter() {
|
for best_rpc in sorted_rpcs.into_iter() {
|
||||||
// increment our connection counter
|
// increment our connection counter
|
||||||
match best_rpc
|
match best_rpc.try_request_handle(authorization, None).await {
|
||||||
.try_request_handle(authorization, min_block_needed.is_none())
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(OpenRequestResult::Handle(handle)) => {
|
Ok(OpenRequestResult::Handle(handle)) => {
|
||||||
// trace!("opened handle: {}", best_rpc);
|
// trace!("opened handle: {}", best_rpc);
|
||||||
return Ok(OpenRequestResult::Handle(handle));
|
return Ok(OpenRequestResult::Handle(handle));
|
||||||
@ -741,10 +712,7 @@ impl Web3Rpcs {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// check rate limits and increment our connection counter
|
// check rate limits and increment our connection counter
|
||||||
match connection
|
match connection.try_request_handle(authorization, None).await {
|
||||||
.try_request_handle(authorization, min_block_needed.is_none())
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(OpenRequestResult::RetryAt(retry_at)) => {
|
Ok(OpenRequestResult::RetryAt(retry_at)) => {
|
||||||
// this rpc is not available. skip it
|
// this rpc is not available. skip it
|
||||||
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
|
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
|
||||||
@ -827,6 +795,7 @@ impl Web3Rpcs {
|
|||||||
&request.method,
|
&request.method,
|
||||||
&json!(request.params),
|
&json!(request.params),
|
||||||
RequestRevertHandler::Save,
|
RequestRevertHandler::Save,
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@ -1214,7 +1183,6 @@ mod tests {
|
|||||||
use super::*;
|
use super::*;
|
||||||
use crate::rpcs::{
|
use crate::rpcs::{
|
||||||
blockchain::{ConsensusFinder, SavedBlock},
|
blockchain::{ConsensusFinder, SavedBlock},
|
||||||
one::ProviderState,
|
|
||||||
provider::Web3Provider,
|
provider::Web3Provider,
|
||||||
};
|
};
|
||||||
use ethers::types::{Block, U256};
|
use ethers::types::{Block, U256};
|
||||||
@ -1338,9 +1306,6 @@ mod tests {
|
|||||||
|
|
||||||
let head_rpc = Web3Rpc {
|
let head_rpc = Web3Rpc {
|
||||||
name: "synced".to_string(),
|
name: "synced".to_string(),
|
||||||
provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new(
|
|
||||||
Web3Provider::Mock,
|
|
||||||
))),
|
|
||||||
soft_limit: 1_000,
|
soft_limit: 1_000,
|
||||||
automatic_block_limit: false,
|
automatic_block_limit: false,
|
||||||
backup: false,
|
backup: false,
|
||||||
@ -1352,9 +1317,6 @@ mod tests {
|
|||||||
|
|
||||||
let lagged_rpc = Web3Rpc {
|
let lagged_rpc = Web3Rpc {
|
||||||
name: "lagged".to_string(),
|
name: "lagged".to_string(),
|
||||||
provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new(
|
|
||||||
Web3Provider::Mock,
|
|
||||||
))),
|
|
||||||
soft_limit: 1_000,
|
soft_limit: 1_000,
|
||||||
automatic_block_limit: false,
|
automatic_block_limit: false,
|
||||||
backup: false,
|
backup: false,
|
||||||
@ -1553,9 +1515,6 @@ mod tests {
|
|||||||
|
|
||||||
let pruned_rpc = Web3Rpc {
|
let pruned_rpc = Web3Rpc {
|
||||||
name: "pruned".to_string(),
|
name: "pruned".to_string(),
|
||||||
provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new(
|
|
||||||
Web3Provider::Mock,
|
|
||||||
))),
|
|
||||||
soft_limit: 3_000,
|
soft_limit: 3_000,
|
||||||
automatic_block_limit: false,
|
automatic_block_limit: false,
|
||||||
backup: false,
|
backup: false,
|
||||||
@ -1567,9 +1526,6 @@ mod tests {
|
|||||||
|
|
||||||
let archive_rpc = Web3Rpc {
|
let archive_rpc = Web3Rpc {
|
||||||
name: "archive".to_string(),
|
name: "archive".to_string(),
|
||||||
provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new(
|
|
||||||
Web3Provider::Mock,
|
|
||||||
))),
|
|
||||||
soft_limit: 1_000,
|
soft_limit: 1_000,
|
||||||
automatic_block_limit: false,
|
automatic_block_limit: false,
|
||||||
backup: false,
|
backup: false,
|
||||||
|
@ -3,9 +3,9 @@ use super::blockchain::{ArcBlock, BlockHashesCache, SavedBlock};
|
|||||||
use super::provider::Web3Provider;
|
use super::provider::Web3Provider;
|
||||||
use super::request::{OpenRequestHandle, OpenRequestResult};
|
use super::request::{OpenRequestHandle, OpenRequestResult};
|
||||||
use crate::app::{flatten_handle, AnyhowJoinHandle};
|
use crate::app::{flatten_handle, AnyhowJoinHandle};
|
||||||
use crate::config::BlockAndRpc;
|
use crate::config::{BlockAndRpc, Web3RpcConfig};
|
||||||
use crate::frontend::authorization::Authorization;
|
use crate::frontend::authorization::Authorization;
|
||||||
use anyhow::Context;
|
use anyhow::{anyhow, Context};
|
||||||
use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
|
use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
|
||||||
use ethers::types::U256;
|
use ethers::types::U256;
|
||||||
use futures::future::try_join_all;
|
use futures::future::try_join_all;
|
||||||
@ -21,50 +21,13 @@ use serde_json::json;
|
|||||||
use std::cmp::min;
|
use std::cmp::min;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::hash::{Hash, Hasher};
|
use std::hash::{Hash, Hasher};
|
||||||
use std::sync::atomic::{self, AtomicU32, AtomicU64};
|
use std::sync::atomic::{self, AtomicU64};
|
||||||
use std::{cmp::Ordering, sync::Arc};
|
use std::{cmp::Ordering, sync::Arc};
|
||||||
use thread_fast_rng::rand::Rng;
|
use thread_fast_rng::rand::Rng;
|
||||||
use thread_fast_rng::thread_fast_rng;
|
use thread_fast_rng::thread_fast_rng;
|
||||||
use tokio::sync::{broadcast, oneshot, watch, RwLock as AsyncRwLock};
|
use tokio::sync::{broadcast, oneshot, watch, RwLock as AsyncRwLock};
|
||||||
use tokio::time::{sleep, sleep_until, timeout, Duration, Instant};
|
use tokio::time::{sleep, sleep_until, timeout, Duration, Instant};
|
||||||
|
|
||||||
// TODO: maybe provider state should have the block data limit in it. but it is inside an async lock and we can't Serialize then
|
|
||||||
#[derive(Clone, Debug)]
|
|
||||||
pub enum ProviderState {
|
|
||||||
None,
|
|
||||||
Connecting(Arc<Web3Provider>),
|
|
||||||
Connected(Arc<Web3Provider>),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for ProviderState {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self::None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ProviderState {
|
|
||||||
pub async fn provider(&self, allow_not_ready: bool) -> Option<&Arc<Web3Provider>> {
|
|
||||||
match self {
|
|
||||||
ProviderState::None => None,
|
|
||||||
ProviderState::Connecting(x) => {
|
|
||||||
if allow_not_ready {
|
|
||||||
Some(x)
|
|
||||||
} else {
|
|
||||||
// TODO: do a ready check here?
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ProviderState::Connected(x) => {
|
|
||||||
if x.ready() {
|
|
||||||
Some(x)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Web3RpcLatencies {
|
pub struct Web3RpcLatencies {
|
||||||
/// Traack how far behind the fastest node we are
|
/// Traack how far behind the fastest node we are
|
||||||
pub new_head: Histogram<u64>,
|
pub new_head: Histogram<u64>,
|
||||||
@ -93,19 +56,15 @@ pub struct Web3Rpc {
|
|||||||
pub name: String,
|
pub name: String,
|
||||||
pub display_name: Option<String>,
|
pub display_name: Option<String>,
|
||||||
pub db_conn: Option<DatabaseConnection>,
|
pub db_conn: Option<DatabaseConnection>,
|
||||||
/// TODO: can we get this from the provider? do we even need it?
|
pub(super) ws_url: Option<String>,
|
||||||
pub(super) url: String,
|
pub(super) http_url: Option<String>,
|
||||||
/// Some connections use an http_client. we keep a clone for reconnecting
|
/// Some connections use an http_client. we keep a clone for reconnecting
|
||||||
pub(super) http_client: Option<reqwest::Client>,
|
pub(super) http_client: Option<reqwest::Client>,
|
||||||
/// keep track of currently open requests. We sort on this
|
|
||||||
pub(super) active_requests: AtomicU32,
|
|
||||||
/// keep track of total requests from the frontend
|
|
||||||
pub(super) frontend_requests: AtomicU64,
|
|
||||||
/// keep track of total requests from web3-proxy itself
|
|
||||||
pub(super) internal_requests: AtomicU64,
|
|
||||||
/// provider is in a RwLock so that we can replace it if re-connecting
|
/// provider is in a RwLock so that we can replace it if re-connecting
|
||||||
/// it is an async lock because we hold it open across awaits
|
/// it is an async lock because we hold it open across awaits
|
||||||
pub(super) provider_state: AsyncRwLock<ProviderState>,
|
/// this provider is only used for new heads subscriptions
|
||||||
|
/// TODO: put the provider inside an arc?
|
||||||
|
pub(super) new_head_client: AsyncRwLock<Option<Arc<Web3Provider>>>,
|
||||||
/// keep track of hard limits
|
/// keep track of hard limits
|
||||||
pub(super) hard_limit_until: Option<watch::Sender<Instant>>,
|
pub(super) hard_limit_until: Option<watch::Sender<Instant>>,
|
||||||
/// rate limits are stored in a central redis so that multiple proxies can share their rate limits
|
/// rate limits are stored in a central redis so that multiple proxies can share their rate limits
|
||||||
@ -121,7 +80,7 @@ pub struct Web3Rpc {
|
|||||||
pub(super) block_data_limit: AtomicU64,
|
pub(super) block_data_limit: AtomicU64,
|
||||||
/// Lower tiers are higher priority when sending requests
|
/// Lower tiers are higher priority when sending requests
|
||||||
pub(super) tier: u64,
|
pub(super) tier: u64,
|
||||||
/// TODO: change this to a watch channel so that http providers can subscribe and take action on change
|
/// TODO: change this to a watch channel so that http providers can subscribe and take action on change.
|
||||||
pub(super) head_block: RwLock<Option<SavedBlock>>,
|
pub(super) head_block: RwLock<Option<SavedBlock>>,
|
||||||
/// Track how fast this RPC is
|
/// Track how fast this RPC is
|
||||||
pub(super) latency: Web3RpcLatencies,
|
pub(super) latency: Web3RpcLatencies,
|
||||||
@ -132,39 +91,54 @@ impl Web3Rpc {
|
|||||||
// TODO: have this take a builder (which will have channels attached). or maybe just take the config and give the config public fields
|
// TODO: have this take a builder (which will have channels attached). or maybe just take the config and give the config public fields
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub async fn spawn(
|
pub async fn spawn(
|
||||||
|
mut config: Web3RpcConfig,
|
||||||
name: String,
|
name: String,
|
||||||
display_name: Option<String>,
|
|
||||||
chain_id: u64,
|
chain_id: u64,
|
||||||
db_conn: Option<DatabaseConnection>,
|
db_conn: Option<DatabaseConnection>,
|
||||||
url_str: String,
|
|
||||||
// optional because this is only used for http providers. websocket providers don't use it
|
// optional because this is only used for http providers. websocket providers don't use it
|
||||||
http_client: Option<reqwest::Client>,
|
http_client: Option<reqwest::Client>,
|
||||||
|
// TODO: rename to http_new_head_interval_sender?
|
||||||
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
|
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
|
||||||
// TODO: have a builder struct for this.
|
redis_pool: Option<RedisPool>,
|
||||||
hard_limit: Option<(u64, RedisPool)>,
|
// TODO: think more about soft limit. watching ewma of requests is probably better. but what should the random sort be on? maybe group on tier is enough
|
||||||
// TODO: think more about this type
|
// soft_limit: u32,
|
||||||
soft_limit: u32,
|
|
||||||
backup: bool,
|
|
||||||
block_data_limit: Option<u64>,
|
|
||||||
block_map: BlockHashesCache,
|
block_map: BlockHashesCache,
|
||||||
block_sender: Option<flume::Sender<BlockAndRpc>>,
|
block_sender: Option<flume::Sender<BlockAndRpc>>,
|
||||||
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
|
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
|
||||||
reconnect: bool,
|
reconnect: bool,
|
||||||
tier: u64,
|
|
||||||
) -> anyhow::Result<(Arc<Web3Rpc>, AnyhowJoinHandle<()>)> {
|
) -> anyhow::Result<(Arc<Web3Rpc>, AnyhowJoinHandle<()>)> {
|
||||||
let hard_limit = hard_limit.map(|(hard_rate_limit, redis_pool)| {
|
let hard_limit = match (config.hard_limit, redis_pool) {
|
||||||
// TODO: is cache size 1 okay? i think we need
|
(None, None) => None,
|
||||||
RedisRateLimiter::new(
|
(Some(hard_limit), Some(redis_pool)) => {
|
||||||
|
// TODO: in process rate limiter instead? or is deffered good enough?
|
||||||
|
let rrl = RedisRateLimiter::new(
|
||||||
"web3_proxy",
|
"web3_proxy",
|
||||||
&format!("{}:{}", chain_id, name),
|
&format!("{}:{}", chain_id, name),
|
||||||
hard_rate_limit,
|
hard_limit,
|
||||||
60.0,
|
60.0,
|
||||||
redis_pool,
|
redis_pool,
|
||||||
)
|
);
|
||||||
});
|
|
||||||
|
|
||||||
// TODO: should we do this even if block_sender is None? then we would know limits on private relays
|
Some(rrl)
|
||||||
let block_data_limit: AtomicU64 = block_data_limit.unwrap_or_default().into();
|
}
|
||||||
|
(None, Some(_)) => None,
|
||||||
|
(Some(_hard_limit), None) => {
|
||||||
|
return Err(anyhow::anyhow!(
|
||||||
|
"no redis client pool! needed for hard limit"
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let tx_id_sender = if config.subscribe_txs {
|
||||||
|
// TODO: warn if tx_id_sender is None?
|
||||||
|
tx_id_sender
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
let backup = config.backup;
|
||||||
|
|
||||||
|
let block_data_limit: AtomicU64 = config.block_data_limit.unwrap_or_default().into();
|
||||||
let automatic_block_limit =
|
let automatic_block_limit =
|
||||||
(block_data_limit.load(atomic::Ordering::Acquire) == 0) && block_sender.is_some();
|
(block_data_limit.load(atomic::Ordering::Acquire) == 0) && block_sender.is_some();
|
||||||
|
|
||||||
@ -178,19 +152,36 @@ impl Web3Rpc {
|
|||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if config.ws_url.is_none() && config.http_url.is_none() {
|
||||||
|
if let Some(url) = config.url {
|
||||||
|
if url.starts_with("ws") {
|
||||||
|
config.ws_url = Some(url);
|
||||||
|
} else if url.starts_with("http") {
|
||||||
|
config.http_url = Some(url);
|
||||||
|
} else {
|
||||||
|
return Err(anyhow!("only ws or http urls are supported"));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return Err(anyhow!(
|
||||||
|
"either ws_url or http_url are required. it is best to set both"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let new_connection = Self {
|
let new_connection = Self {
|
||||||
name,
|
name,
|
||||||
db_conn: db_conn.clone(),
|
db_conn: db_conn.clone(),
|
||||||
display_name,
|
display_name: config.display_name,
|
||||||
http_client,
|
http_client,
|
||||||
url: url_str,
|
ws_url: config.ws_url,
|
||||||
|
http_url: config.http_url,
|
||||||
hard_limit,
|
hard_limit,
|
||||||
hard_limit_until,
|
hard_limit_until,
|
||||||
soft_limit,
|
soft_limit: config.soft_limit,
|
||||||
automatic_block_limit,
|
automatic_block_limit,
|
||||||
backup,
|
backup,
|
||||||
block_data_limit,
|
block_data_limit,
|
||||||
tier,
|
tier: config.tier,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -224,6 +215,7 @@ impl Web3Rpc {
|
|||||||
async fn check_block_data_limit(
|
async fn check_block_data_limit(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
authorization: &Arc<Authorization>,
|
authorization: &Arc<Authorization>,
|
||||||
|
unlocked_provider: Option<Arc<Web3Provider>>,
|
||||||
) -> anyhow::Result<Option<u64>> {
|
) -> anyhow::Result<Option<u64>> {
|
||||||
if !self.automatic_block_limit {
|
if !self.automatic_block_limit {
|
||||||
// TODO: is this a good thing to return?
|
// TODO: is this a good thing to return?
|
||||||
@ -238,7 +230,7 @@ impl Web3Rpc {
|
|||||||
// TODO: start at 0 or 1?
|
// TODO: start at 0 or 1?
|
||||||
for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] {
|
for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] {
|
||||||
let handle = self
|
let handle = self
|
||||||
.wait_for_request_handle(authorization, None, true)
|
.wait_for_request_handle(authorization, None, unlocked_provider.clone())
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let head_block_num_future = handle.request::<Option<()>, U256>(
|
let head_block_num_future = handle.request::<Option<()>, U256>(
|
||||||
@ -246,6 +238,7 @@ impl Web3Rpc {
|
|||||||
&None,
|
&None,
|
||||||
// error here are expected, so keep the level low
|
// error here are expected, so keep the level low
|
||||||
Level::Debug.into(),
|
Level::Debug.into(),
|
||||||
|
unlocked_provider.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let head_block_num = timeout(Duration::from_secs(5), head_block_num_future)
|
let head_block_num = timeout(Duration::from_secs(5), head_block_num_future)
|
||||||
@ -264,7 +257,7 @@ impl Web3Rpc {
|
|||||||
// TODO: wait for the handle BEFORE we check the current block number. it might be delayed too!
|
// TODO: wait for the handle BEFORE we check the current block number. it might be delayed too!
|
||||||
// TODO: what should the request be?
|
// TODO: what should the request be?
|
||||||
let handle = self
|
let handle = self
|
||||||
.wait_for_request_handle(authorization, None, true)
|
.wait_for_request_handle(authorization, None, unlocked_provider.clone())
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let archive_result: Result<Bytes, _> = handle
|
let archive_result: Result<Bytes, _> = handle
|
||||||
@ -276,6 +269,7 @@ impl Web3Rpc {
|
|||||||
)),
|
)),
|
||||||
// error here are expected, so keep the level low
|
// error here are expected, so keep the level low
|
||||||
Level::Trace.into(),
|
Level::Trace.into(),
|
||||||
|
unlocked_provider.clone(),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@ -403,56 +397,60 @@ impl Web3Rpc {
|
|||||||
chain_id: u64,
|
chain_id: u64,
|
||||||
db_conn: Option<&DatabaseConnection>,
|
db_conn: Option<&DatabaseConnection>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
// trace!("provider_state {} locking...", self);
|
if let Ok(mut unlocked_provider) = self.new_head_client.try_write() {
|
||||||
let mut provider_state = self
|
#[cfg(test)]
|
||||||
.provider_state
|
if let Some(Web3Provider::Mock) = unlocked_provider.as_deref() {
|
||||||
.try_write()
|
|
||||||
.context("locking provider for write")?;
|
|
||||||
// trace!("provider_state {} locked: {:?}", self, provider_state);
|
|
||||||
|
|
||||||
match &*provider_state {
|
|
||||||
ProviderState::None => {
|
|
||||||
info!("connecting to {}", self);
|
|
||||||
}
|
|
||||||
ProviderState::Connecting(provider) | ProviderState::Connected(provider) => {
|
|
||||||
// disconnect the current provider
|
|
||||||
if let Web3Provider::Mock = provider.as_ref() {
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
*unlocked_provider = if let Some(ws_url) = self.ws_url.as_ref() {
|
||||||
|
// set up ws client
|
||||||
|
match &*unlocked_provider {
|
||||||
|
None => {
|
||||||
|
info!("connecting to {}", self);
|
||||||
|
}
|
||||||
|
Some(_) => {
|
||||||
debug!("reconnecting to {}", self);
|
debug!("reconnecting to {}", self);
|
||||||
|
|
||||||
// disconnect the current provider
|
// tell the block subscriber that this rpc doesn't have any blocks
|
||||||
*provider_state = ProviderState::None;
|
|
||||||
|
|
||||||
// reset sync status
|
|
||||||
// trace!("locking head block on {}", self);
|
|
||||||
{
|
|
||||||
let mut head_block = self.head_block.write();
|
|
||||||
*head_block = None;
|
|
||||||
}
|
|
||||||
// trace!("done with head block on {}", self);
|
|
||||||
|
|
||||||
// tell the block subscriber that we don't have any blocks
|
|
||||||
if let Some(block_sender) = block_sender {
|
if let Some(block_sender) = block_sender {
|
||||||
block_sender
|
block_sender
|
||||||
.send_async((None, self.clone()))
|
.send_async((None, self.clone()))
|
||||||
.await
|
.await
|
||||||
.context("block_sender during connect")?;
|
.context("block_sender during connect")?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// reset sync status
|
||||||
|
let mut head_block = self.head_block.write();
|
||||||
|
*head_block = None;
|
||||||
|
|
||||||
|
// disconnect the current provider
|
||||||
|
// TODO: what until the block_sender's receiver finishes updating this item?
|
||||||
|
*unlocked_provider = None;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// trace!("Creating new Web3Provider on {}", self);
|
let p = Web3Provider::from_str(ws_url.as_str(), None)
|
||||||
// TODO: if this fails, keep retrying! otherwise it crashes and doesn't try again!
|
.await
|
||||||
let new_provider = Web3Provider::from_str(&self.url, self.http_client.clone()).await?;
|
.context(format!("failed connecting to {}", ws_url))?;
|
||||||
|
|
||||||
// trace!("saving provider state as NotReady on {}", self);
|
assert!(p.ws().is_some());
|
||||||
*provider_state = ProviderState::Connecting(Arc::new(new_provider));
|
|
||||||
|
|
||||||
// drop the lock so that we can get a request handle
|
Some(Arc::new(p))
|
||||||
// trace!("provider_state {} unlocked", self);
|
} else {
|
||||||
drop(provider_state);
|
// http client
|
||||||
|
if let Some(url) = &self.http_url {
|
||||||
|
let p = Web3Provider::from_str(url, self.http_client.clone())
|
||||||
|
.await
|
||||||
|
.context(format!("failed connecting to {}", url))?;
|
||||||
|
|
||||||
|
assert!(p.http().is_some());
|
||||||
|
|
||||||
|
Some(Arc::new(p))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let authorization = Arc::new(Authorization::internal(db_conn.cloned())?);
|
let authorization = Arc::new(Authorization::internal(db_conn.cloned())?);
|
||||||
|
|
||||||
@ -461,12 +459,13 @@ impl Web3Rpc {
|
|||||||
// TODO: what should the timeout be? should there be a request timeout?
|
// TODO: what should the timeout be? should there be a request timeout?
|
||||||
// trace!("waiting on chain id for {}", self);
|
// trace!("waiting on chain id for {}", self);
|
||||||
let found_chain_id: Result<U64, _> = self
|
let found_chain_id: Result<U64, _> = self
|
||||||
.wait_for_request_handle(&authorization, None, true)
|
.wait_for_request_handle(&authorization, None, unlocked_provider.clone())
|
||||||
.await?
|
.await?
|
||||||
.request(
|
.request(
|
||||||
"eth_chainId",
|
"eth_chainId",
|
||||||
&json!(Option::None::<()>),
|
&json!(Option::None::<()>),
|
||||||
Level::Trace.into(),
|
Level::Trace.into(),
|
||||||
|
unlocked_provider.clone(),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
// trace!("found_chain_id: {:?}", found_chain_id);
|
// trace!("found_chain_id: {:?}", found_chain_id);
|
||||||
@ -488,34 +487,21 @@ impl Web3Rpc {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.check_block_data_limit(&authorization).await?;
|
self.check_block_data_limit(&authorization, unlocked_provider.clone())
|
||||||
|
.await?;
|
||||||
|
|
||||||
{
|
drop(unlocked_provider);
|
||||||
// trace!("locking for ready...");
|
|
||||||
let mut provider_state = self.provider_state.write().await;
|
|
||||||
// trace!("locked for ready...");
|
|
||||||
|
|
||||||
// TODO: do this without a clone
|
|
||||||
let ready_provider = provider_state
|
|
||||||
.provider(true)
|
|
||||||
.await
|
|
||||||
.context("provider missing")?
|
|
||||||
.clone();
|
|
||||||
|
|
||||||
*provider_state = ProviderState::Connected(ready_provider);
|
|
||||||
// trace!("unlocked for ready...");
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("successfully connected to {}", self);
|
info!("successfully connected to {}", self);
|
||||||
|
} else {
|
||||||
|
if self.new_head_client.read().await.is_none() {
|
||||||
|
return Err(anyhow!("failed waiting for client"));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
|
||||||
pub fn active_requests(&self) -> u32 {
|
|
||||||
self.active_requests.load(atomic::Ordering::Acquire)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn send_head_block_result(
|
async fn send_head_block_result(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
new_head_block: Result<Option<ArcBlock>, ProviderError>,
|
new_head_block: Result<Option<ArcBlock>, ProviderError>,
|
||||||
@ -558,7 +544,7 @@ impl Web3Rpc {
|
|||||||
|
|
||||||
if self.block_data_limit() == U64::zero() {
|
if self.block_data_limit() == U64::zero() {
|
||||||
let authorization = Arc::new(Authorization::internal(self.db_conn.clone())?);
|
let authorization = Arc::new(Authorization::internal(self.db_conn.clone())?);
|
||||||
if let Err(err) = self.check_block_data_limit(&authorization).await {
|
if let Err(err) = self.check_block_data_limit(&authorization, None).await {
|
||||||
warn!(
|
warn!(
|
||||||
"failed checking block limit after {} finished syncing. {:?}",
|
"failed checking block limit after {} finished syncing. {:?}",
|
||||||
self, err
|
self, err
|
||||||
@ -629,29 +615,21 @@ impl Web3Rpc {
|
|||||||
// provider is ready
|
// provider is ready
|
||||||
ready_tx.send(()).unwrap();
|
ready_tx.send(()).unwrap();
|
||||||
|
|
||||||
// wait before doing the initial health check
|
|
||||||
// TODO: how often?
|
// TODO: how often?
|
||||||
// TODO: subscribe to self.head_block
|
// TODO: reset this timeout every time a new block is seen
|
||||||
let health_sleep_seconds = 10;
|
let health_sleep_seconds = 10;
|
||||||
|
|
||||||
|
// wait before doing the initial health check
|
||||||
sleep(Duration::from_secs(health_sleep_seconds)).await;
|
sleep(Duration::from_secs(health_sleep_seconds)).await;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
// TODO: what if we just happened to have this check line up with another restart?
|
// TODO: what if we just happened to have this check line up with another restart?
|
||||||
// TODO: think more about this
|
// TODO: think more about this
|
||||||
// trace!("health check on {}. locking...", conn);
|
if let Some(client) = &*conn.new_head_client.read().await {
|
||||||
if conn
|
|
||||||
.provider_state
|
|
||||||
.read()
|
|
||||||
.await
|
|
||||||
.provider(false)
|
|
||||||
.await
|
|
||||||
.is_none()
|
|
||||||
{
|
|
||||||
// trace!("health check unlocked with error on {}", conn);
|
// trace!("health check unlocked with error on {}", conn);
|
||||||
// returning error will trigger a reconnect
|
// returning error will trigger a reconnect
|
||||||
return Err(anyhow::anyhow!("{} is not ready", conn));
|
// TODO: do a query of some kind
|
||||||
}
|
}
|
||||||
// trace!("health check on {}. unlocked", conn);
|
|
||||||
|
|
||||||
sleep(Duration::from_secs(health_sleep_seconds)).await;
|
sleep(Duration::from_secs(health_sleep_seconds)).await;
|
||||||
}
|
}
|
||||||
@ -712,7 +690,7 @@ impl Web3Rpc {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Subscribe to new blocks. If `reconnect` is true, this runs forever.
|
/// Subscribe to new blocks.
|
||||||
async fn subscribe_new_heads(
|
async fn subscribe_new_heads(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
authorization: Arc<Authorization>,
|
authorization: Arc<Authorization>,
|
||||||
@ -722,19 +700,10 @@ impl Web3Rpc {
|
|||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
trace!("watching new heads on {}", self);
|
trace!("watching new heads on {}", self);
|
||||||
|
|
||||||
// trace!("locking on new heads");
|
let unlocked_provider = self.new_head_client.read().await;
|
||||||
let provider_state = self
|
|
||||||
.provider_state
|
|
||||||
.try_read()
|
|
||||||
.context("subscribe_new_heads")?
|
|
||||||
.clone();
|
|
||||||
// trace!("unlocked on new heads");
|
|
||||||
|
|
||||||
// TODO: need a timeout
|
match unlocked_provider.as_deref() {
|
||||||
if let ProviderState::Connected(provider) = provider_state {
|
Some(Web3Provider::Http(_client)) => {
|
||||||
match provider.as_ref() {
|
|
||||||
Web3Provider::Mock => unimplemented!(),
|
|
||||||
Web3Provider::Http(_provider) => {
|
|
||||||
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
|
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
|
||||||
// TODO: try watch_blocks and fall back to this?
|
// TODO: try watch_blocks and fall back to this?
|
||||||
|
|
||||||
@ -745,7 +714,7 @@ impl Web3Rpc {
|
|||||||
loop {
|
loop {
|
||||||
// TODO: what should the max_wait be?
|
// TODO: what should the max_wait be?
|
||||||
match self
|
match self
|
||||||
.wait_for_request_handle(&authorization, None, false)
|
.wait_for_request_handle(&authorization, None, unlocked_provider.clone())
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(active_request_handle) => {
|
Ok(active_request_handle) => {
|
||||||
@ -754,6 +723,7 @@ impl Web3Rpc {
|
|||||||
"eth_getBlockByNumber",
|
"eth_getBlockByNumber",
|
||||||
&json!(("latest", false)),
|
&json!(("latest", false)),
|
||||||
Level::Warn.into(),
|
Level::Warn.into(),
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@ -770,9 +740,8 @@ impl Web3Rpc {
|
|||||||
}
|
}
|
||||||
Ok(Some(block)) => {
|
Ok(Some(block)) => {
|
||||||
// don't send repeat blocks
|
// don't send repeat blocks
|
||||||
let new_hash = block
|
let new_hash =
|
||||||
.hash
|
block.hash.expect("blocks here should always have hashes");
|
||||||
.expect("blocks here should always have hashes");
|
|
||||||
|
|
||||||
if new_hash != last_hash {
|
if new_hash != last_hash {
|
||||||
// new hash!
|
// new hash!
|
||||||
@ -800,11 +769,7 @@ impl Web3Rpc {
|
|||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!("Internal error on latest block from {}. {:?}", self, err);
|
warn!("Internal error on latest block from {}. {:?}", self, err);
|
||||||
|
|
||||||
self.send_head_block_result(
|
self.send_head_block_result(Ok(None), &block_sender, block_map.clone())
|
||||||
Ok(None),
|
|
||||||
&block_sender,
|
|
||||||
block_map.clone(),
|
|
||||||
)
|
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// TODO: what should we do? sleep? extra time?
|
// TODO: what should we do? sleep? extra time?
|
||||||
@ -828,25 +793,27 @@ impl Web3Rpc {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Web3Provider::Ws(provider) => {
|
Some(Web3Provider::Both(_, client)) | Some(Web3Provider::Ws(client)) => {
|
||||||
// todo: move subscribe_blocks onto the request handle?
|
// todo: move subscribe_blocks onto the request handle?
|
||||||
let active_request_handle = self
|
let active_request_handle = self
|
||||||
.wait_for_request_handle(&authorization, None, false)
|
.wait_for_request_handle(&authorization, None, unlocked_provider.clone())
|
||||||
.await;
|
.await;
|
||||||
let mut stream = provider.subscribe_blocks().await?;
|
let mut stream = client.subscribe_blocks().await?;
|
||||||
drop(active_request_handle);
|
drop(active_request_handle);
|
||||||
|
|
||||||
// query the block once since the subscription doesn't send the current block
|
// query the block once since the subscription doesn't send the current block
|
||||||
// there is a very small race condition here where the stream could send us a new block right now
|
// there is a very small race condition here where the stream could send us a new block right now
|
||||||
// all it does is print "new block" for the same block as current block
|
// but all that does is print "new block" for the same block as current block
|
||||||
// TODO: how does this get wrapped in an arc? does ethers handle that?
|
// TODO: how does this get wrapped in an arc? does ethers handle that?
|
||||||
|
// TODO: do this part over http?
|
||||||
let block: Result<Option<ArcBlock>, _> = self
|
let block: Result<Option<ArcBlock>, _> = self
|
||||||
.wait_for_request_handle(&authorization, None, false)
|
.wait_for_request_handle(&authorization, None, unlocked_provider.clone())
|
||||||
.await?
|
.await?
|
||||||
.request(
|
.request(
|
||||||
"eth_getBlockByNumber",
|
"eth_getBlockByNumber",
|
||||||
&json!(("latest", false)),
|
&json!(("latest", false)),
|
||||||
Level::Warn.into(),
|
Level::Warn.into(),
|
||||||
|
unlocked_provider.clone(),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@ -890,42 +857,42 @@ impl Web3Rpc {
|
|||||||
warn!("new_heads subscription to {} ended", self);
|
warn!("new_heads subscription to {} ended", self);
|
||||||
Err(anyhow::anyhow!("new_heads subscription ended"))
|
Err(anyhow::anyhow!("new_heads subscription ended"))
|
||||||
}
|
}
|
||||||
}
|
None => todo!("what should happen now? wait for a connection?"),
|
||||||
} else {
|
#[cfg(test)]
|
||||||
Err(anyhow::anyhow!(
|
Some(Web3Provider::Mock) => unimplemented!(),
|
||||||
"Provider not ready! Unable to subscribe to heads"
|
|
||||||
))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Turn on the firehose of pending transactions
|
||||||
async fn subscribe_pending_transactions(
|
async fn subscribe_pending_transactions(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
authorization: Arc<Authorization>,
|
authorization: Arc<Authorization>,
|
||||||
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
|
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
if let ProviderState::Connected(provider) = self
|
// TODO: give this a separate client. don't use new_head_client for everything. especially a firehose this big
|
||||||
.provider_state
|
// TODO: timeout
|
||||||
.try_read()
|
let provider = self.new_head_client.read().await;
|
||||||
.context("subscribe_pending_transactions")?
|
|
||||||
.clone()
|
|
||||||
{
|
|
||||||
trace!("watching pending transactions on {}", self);
|
trace!("watching pending transactions on {}", self);
|
||||||
// TODO: does this keep the lock open for too long?
|
// TODO: does this keep the lock open for too long?
|
||||||
match provider.as_ref() {
|
match provider.as_deref() {
|
||||||
Web3Provider::Mock => unimplemented!(),
|
None => {
|
||||||
Web3Provider::Http(provider) => {
|
// TODO: wait for a provider
|
||||||
|
return Err(anyhow!("no provider"));
|
||||||
|
}
|
||||||
|
Some(Web3Provider::Http(provider)) => {
|
||||||
// there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints
|
// there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints
|
||||||
// TODO: maybe subscribe to self.head_block?
|
// TODO: maybe subscribe to self.head_block?
|
||||||
// TODO: this keeps a read lock guard open on provider_state forever. is that okay for an http client?
|
// TODO: this keeps a read lock guard open on provider_state forever. is that okay for an http client?
|
||||||
futures::future::pending::<()>().await;
|
futures::future::pending::<()>().await;
|
||||||
}
|
}
|
||||||
Web3Provider::Ws(provider) => {
|
Some(Web3Provider::Both(_, client)) | Some(Web3Provider::Ws(client)) => {
|
||||||
// TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
|
// TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
|
||||||
let active_request_handle = self
|
let active_request_handle = self
|
||||||
.wait_for_request_handle(&authorization, None, false)
|
.wait_for_request_handle(&authorization, None, provider.clone())
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let mut stream = provider.subscribe_pending_txs().await?;
|
let mut stream = client.subscribe_pending_txs().await?;
|
||||||
|
|
||||||
drop(active_request_handle);
|
drop(active_request_handle);
|
||||||
|
|
||||||
@ -943,12 +910,8 @@ impl Web3Rpc {
|
|||||||
warn!("pending_transactions subscription ended on {}", self);
|
warn!("pending_transactions subscription ended on {}", self);
|
||||||
return Err(anyhow::anyhow!("pending_transactions subscription ended"));
|
return Err(anyhow::anyhow!("pending_transactions subscription ended"));
|
||||||
}
|
}
|
||||||
}
|
#[cfg(test)]
|
||||||
} else {
|
Some(Web3Provider::Mock) => futures::future::pending::<()>().await,
|
||||||
warn!(
|
|
||||||
"Provider not ready! Unable to watch pending transactions on {}",
|
|
||||||
self
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -957,17 +920,17 @@ impl Web3Rpc {
|
|||||||
/// be careful with this; it might wait forever!
|
/// be careful with this; it might wait forever!
|
||||||
/// `allow_not_ready` is only for use by health checks while starting the provider
|
/// `allow_not_ready` is only for use by health checks while starting the provider
|
||||||
/// TODO: don't use anyhow. use specific error type
|
/// TODO: don't use anyhow. use specific error type
|
||||||
pub async fn wait_for_request_handle(
|
pub async fn wait_for_request_handle<'a>(
|
||||||
self: &Arc<Self>,
|
self: &'a Arc<Self>,
|
||||||
authorization: &Arc<Authorization>,
|
authorization: &'a Arc<Authorization>,
|
||||||
max_wait: Option<Duration>,
|
max_wait: Option<Duration>,
|
||||||
allow_not_ready: bool,
|
unlocked_provider: Option<Arc<Web3Provider>>,
|
||||||
) -> anyhow::Result<OpenRequestHandle> {
|
) -> anyhow::Result<OpenRequestHandle> {
|
||||||
let max_wait = max_wait.map(|x| Instant::now() + x);
|
let max_wait = max_wait.map(|x| Instant::now() + x);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match self
|
match self
|
||||||
.try_request_handle(authorization, allow_not_ready)
|
.try_request_handle(authorization, unlocked_provider.clone())
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(OpenRequestResult::Handle(handle)) => return Ok(handle),
|
Ok(OpenRequestResult::Handle(handle)) => return Ok(handle),
|
||||||
@ -1015,20 +978,14 @@ impl Web3Rpc {
|
|||||||
pub async fn try_request_handle(
|
pub async fn try_request_handle(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
authorization: &Arc<Authorization>,
|
authorization: &Arc<Authorization>,
|
||||||
// TODO? ready_provider: Option<&Arc<Web3Provider>>,
|
// TODO: borrow on this instead of needing to clone the Arc?
|
||||||
allow_not_ready: bool,
|
unlocked_provider: Option<Arc<Web3Provider>>,
|
||||||
) -> anyhow::Result<OpenRequestResult> {
|
) -> anyhow::Result<OpenRequestResult> {
|
||||||
// TODO: think more about this read block
|
// TODO: think more about this read block
|
||||||
if !allow_not_ready
|
// TODO: this should *not* be new_head_client. this should be a separate object
|
||||||
&& self
|
if unlocked_provider.is_some() || self.new_head_client.read().await.is_some() {
|
||||||
.provider_state
|
// we already have an unlocked provider. no need to lock
|
||||||
.read()
|
} else {
|
||||||
.await
|
|
||||||
.provider(allow_not_ready)
|
|
||||||
.await
|
|
||||||
.is_none()
|
|
||||||
{
|
|
||||||
trace!("{} is not ready", self);
|
|
||||||
return Ok(OpenRequestResult::NotReady(self.backup));
|
return Ok(OpenRequestResult::NotReady(self.backup));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1144,15 +1101,11 @@ impl Serialize for Web3Rpc {
|
|||||||
|
|
||||||
state.serialize_field("soft_limit", &self.soft_limit)?;
|
state.serialize_field("soft_limit", &self.soft_limit)?;
|
||||||
|
|
||||||
state.serialize_field(
|
// TODO: keep this for the "popularity_contest" command? or maybe better to just use graphana?
|
||||||
"active_requests",
|
// state.serialize_field(
|
||||||
&self.active_requests.load(atomic::Ordering::Relaxed),
|
// "frontend_requests",
|
||||||
)?;
|
// &self.frontend_requests.load(atomic::Ordering::Relaxed),
|
||||||
|
// )?;
|
||||||
state.serialize_field(
|
|
||||||
"total_requests",
|
|
||||||
&self.frontend_requests.load(atomic::Ordering::Relaxed),
|
|
||||||
)?;
|
|
||||||
|
|
||||||
{
|
{
|
||||||
// TODO: maybe this is too much data. serialize less?
|
// TODO: maybe this is too much data. serialize less?
|
||||||
@ -1216,7 +1169,7 @@ mod tests {
|
|||||||
|
|
||||||
let x = Web3Rpc {
|
let x = Web3Rpc {
|
||||||
name: "name".to_string(),
|
name: "name".to_string(),
|
||||||
url: "ws://example.com".to_string(),
|
ws_url: Some("ws://example.com".to_string()),
|
||||||
soft_limit: 1_000,
|
soft_limit: 1_000,
|
||||||
automatic_block_limit: false,
|
automatic_block_limit: false,
|
||||||
backup: false,
|
backup: false,
|
||||||
|
@ -2,22 +2,45 @@ use anyhow::Context;
|
|||||||
use derive_more::From;
|
use derive_more::From;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
// TODO: our own structs for these that handle streaming large responses
|
||||||
|
type EthersHttpProvider = ethers::providers::Provider<ethers::providers::Http>;
|
||||||
|
type EthersWsProvider = ethers::providers::Provider<ethers::providers::Ws>;
|
||||||
|
|
||||||
/// Use HTTP and WS providers.
|
/// Use HTTP and WS providers.
|
||||||
// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit <https://github.com/gakonst/ethers-rs/issues/592>
|
// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit <https://github.com/gakonst/ethers-rs/issues/592>
|
||||||
|
// TODO: custom types that let us stream JSON responses
|
||||||
#[derive(From)]
|
#[derive(From)]
|
||||||
pub enum Web3Provider {
|
pub enum Web3Provider {
|
||||||
Http(ethers::providers::Provider<ethers::providers::Http>),
|
Both(EthersHttpProvider, EthersWsProvider),
|
||||||
Ws(ethers::providers::Provider<ethers::providers::Ws>),
|
Http(EthersHttpProvider),
|
||||||
// TODO: only include this for tests.
|
// TODO: deadpool? custom tokio-tungstenite
|
||||||
|
Ws(EthersWsProvider),
|
||||||
|
#[cfg(test)]
|
||||||
Mock,
|
Mock,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Web3Provider {
|
impl Web3Provider {
|
||||||
pub fn ready(&self) -> bool {
|
pub fn ready(&self) -> bool {
|
||||||
match self {
|
match self {
|
||||||
Self::Mock => true,
|
Self::Both(_, ws) => ws.as_ref().ready(),
|
||||||
Self::Http(_) => true,
|
Self::Http(_) => true,
|
||||||
Self::Ws(provider) => provider.as_ref().ready(),
|
Self::Ws(ws) => ws.as_ref().ready(),
|
||||||
|
#[cfg(test)]
|
||||||
|
Self::Mock => true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn http(&self) -> Option<&EthersHttpProvider> {
|
||||||
|
match self {
|
||||||
|
Self::Http(x) => Some(x),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn ws(&self) -> Option<&EthersWsProvider> {
|
||||||
|
match self {
|
||||||
|
Self::Both(_, x) | Self::Ws(x) => Some(x),
|
||||||
|
_ => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
use super::one::Web3Rpc;
|
use super::one::Web3Rpc;
|
||||||
use super::provider::Web3Provider;
|
use super::provider::Web3Provider;
|
||||||
use crate::frontend::authorization::{Authorization, AuthorizationType};
|
use crate::frontend::authorization::Authorization;
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use entities::revert_log;
|
use entities::revert_log;
|
||||||
@ -11,7 +11,6 @@ use log::{debug, error, trace, warn, Level};
|
|||||||
use migration::sea_orm::{self, ActiveEnum, ActiveModelTrait};
|
use migration::sea_orm::{self, ActiveEnum, ActiveModelTrait};
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::sync::atomic;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use thread_fast_rng::rand::Rng;
|
use thread_fast_rng::rand::Rng;
|
||||||
use tokio::time::{sleep, Duration, Instant};
|
use tokio::time::{sleep, Duration, Instant};
|
||||||
@ -27,11 +26,11 @@ pub enum OpenRequestResult {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Make RPC requests through this handle and drop it when you are done.
|
/// Make RPC requests through this handle and drop it when you are done.
|
||||||
|
/// Opening this handle checks rate limits. Developers, try to keep opening a handle and using it as close together as possible
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct OpenRequestHandle {
|
pub struct OpenRequestHandle {
|
||||||
authorization: Arc<Authorization>,
|
authorization: Arc<Authorization>,
|
||||||
conn: Arc<Web3Rpc>,
|
conn: Arc<Web3Rpc>,
|
||||||
provider: Arc<Web3Provider>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Depending on the context, RPC errors can require different handling.
|
/// Depending on the context, RPC errors can require different handling.
|
||||||
@ -123,60 +122,9 @@ impl Authorization {
|
|||||||
|
|
||||||
impl OpenRequestHandle {
|
impl OpenRequestHandle {
|
||||||
pub async fn new(authorization: Arc<Authorization>, conn: Arc<Web3Rpc>) -> Self {
|
pub async fn new(authorization: Arc<Authorization>, conn: Arc<Web3Rpc>) -> Self {
|
||||||
// TODO: take request_id as an argument?
|
|
||||||
// TODO: attach a unique id to this? customer requests have one, but not internal queries
|
|
||||||
// TODO: what ordering?!
|
|
||||||
conn.active_requests.fetch_add(1, atomic::Ordering::Relaxed);
|
|
||||||
|
|
||||||
let mut provider = None;
|
|
||||||
let mut logged = false;
|
|
||||||
while provider.is_none() {
|
|
||||||
// trace!("waiting on provider: locking...");
|
|
||||||
|
|
||||||
let ready_provider = conn
|
|
||||||
.provider_state
|
|
||||||
.read()
|
|
||||||
.await
|
|
||||||
// TODO: hard code true, or take a bool in the `new` function?
|
|
||||||
.provider(true)
|
|
||||||
.await
|
|
||||||
.cloned();
|
|
||||||
// trace!("waiting on provider: unlocked!");
|
|
||||||
|
|
||||||
match ready_provider {
|
|
||||||
None => {
|
|
||||||
if !logged {
|
|
||||||
logged = true;
|
|
||||||
warn!("no provider for {}!", conn);
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: how should this work? a reconnect should be in progress. but maybe force one now?
|
|
||||||
// TODO: sleep how long? subscribe to something instead? maybe use a watch handle?
|
|
||||||
// TODO: this is going to be way too verbose!
|
|
||||||
sleep(Duration::from_millis(100)).await
|
|
||||||
}
|
|
||||||
Some(x) => provider = Some(x),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let provider = provider.expect("provider was checked already");
|
|
||||||
|
|
||||||
// TODO: handle overflows?
|
|
||||||
// TODO: what ordering?
|
|
||||||
match authorization.as_ref().authorization_type {
|
|
||||||
AuthorizationType::Frontend => {
|
|
||||||
conn.frontend_requests
|
|
||||||
.fetch_add(1, atomic::Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
AuthorizationType::Internal => {
|
|
||||||
conn.internal_requests
|
|
||||||
.fetch_add(1, atomic::Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
authorization,
|
authorization,
|
||||||
conn,
|
conn,
|
||||||
provider,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -196,6 +144,7 @@ impl OpenRequestHandle {
|
|||||||
method: &str,
|
method: &str,
|
||||||
params: &P,
|
params: &P,
|
||||||
revert_handler: RequestRevertHandler,
|
revert_handler: RequestRevertHandler,
|
||||||
|
unlocked_provider: Option<Arc<Web3Provider>>,
|
||||||
) -> Result<R, ProviderError>
|
) -> Result<R, ProviderError>
|
||||||
where
|
where
|
||||||
// TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it
|
// TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it
|
||||||
@ -205,12 +154,45 @@ impl OpenRequestHandle {
|
|||||||
// TODO: use tracing spans
|
// TODO: use tracing spans
|
||||||
// TODO: including params in this log is way too verbose
|
// TODO: including params in this log is way too verbose
|
||||||
// trace!(rpc=%self.conn, %method, "request");
|
// trace!(rpc=%self.conn, %method, "request");
|
||||||
|
trace!("requesting from {}", self.conn);
|
||||||
|
|
||||||
|
let mut provider: Option<Arc<Web3Provider>> = None;
|
||||||
|
let mut logged = false;
|
||||||
|
while provider.is_none() {
|
||||||
|
// trace!("waiting on provider: locking...");
|
||||||
|
|
||||||
|
// TODO: this should *not* be new_head_client. that is dedicated to only new heads
|
||||||
|
if let Some(unlocked_provider) = unlocked_provider {
|
||||||
|
provider = Some(unlocked_provider);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let unlocked_provider = self.conn.new_head_client.read().await;
|
||||||
|
|
||||||
|
if let Some(unlocked_provider) = unlocked_provider.clone() {
|
||||||
|
provider = Some(unlocked_provider);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if !logged {
|
||||||
|
debug!("no provider for open handle on {}", self.conn);
|
||||||
|
logged = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
sleep(Duration::from_millis(100)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
let provider = provider.expect("provider was checked already");
|
||||||
|
|
||||||
// TODO: replace ethers-rs providers with our own that supports streaming the responses
|
// TODO: replace ethers-rs providers with our own that supports streaming the responses
|
||||||
let response = match &*self.provider {
|
let response = match provider.as_ref() {
|
||||||
|
#[cfg(test)]
|
||||||
Web3Provider::Mock => unimplemented!(),
|
Web3Provider::Mock => unimplemented!(),
|
||||||
Web3Provider::Http(provider) => provider.request(method, params).await,
|
Web3Provider::Ws(p) => p.request(method, params).await,
|
||||||
Web3Provider::Ws(provider) => provider.request(method, params).await,
|
Web3Provider::Http(p) | Web3Provider::Both(p, _) => {
|
||||||
|
// TODO: i keep hearing that http is faster. but ws has always been better for me. investigate more with actual benchmarks
|
||||||
|
p.request(method, params).await
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// // TODO: i think ethers already has trace logging (and does it much more fancy)
|
// // TODO: i think ethers already has trace logging (and does it much more fancy)
|
||||||
@ -266,8 +248,22 @@ impl OpenRequestHandle {
|
|||||||
// check for "execution reverted" here
|
// check for "execution reverted" here
|
||||||
let response_type = if let ProviderError::JsonRpcClientError(err) = err {
|
let response_type = if let ProviderError::JsonRpcClientError(err) = err {
|
||||||
// Http and Ws errors are very similar, but different types
|
// Http and Ws errors are very similar, but different types
|
||||||
let msg = match &*self.provider {
|
let msg = match &*provider {
|
||||||
|
#[cfg(test)]
|
||||||
Web3Provider::Mock => unimplemented!(),
|
Web3Provider::Mock => unimplemented!(),
|
||||||
|
Web3Provider::Both(_, _) => {
|
||||||
|
if let Some(HttpClientError::JsonRpcError(err)) =
|
||||||
|
err.downcast_ref::<HttpClientError>()
|
||||||
|
{
|
||||||
|
Some(&err.message)
|
||||||
|
} else if let Some(WsClientError::JsonRpcError(err)) =
|
||||||
|
err.downcast_ref::<WsClientError>()
|
||||||
|
{
|
||||||
|
Some(&err.message)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
Web3Provider::Http(_) => {
|
Web3Provider::Http(_) => {
|
||||||
if let Some(HttpClientError::JsonRpcError(err)) =
|
if let Some(HttpClientError::JsonRpcError(err)) =
|
||||||
err.downcast_ref::<HttpClientError>()
|
err.downcast_ref::<HttpClientError>()
|
||||||
@ -377,11 +373,3 @@ impl OpenRequestHandle {
|
|||||||
response
|
response
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for OpenRequestHandle {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
self.conn
|
|
||||||
.active_requests
|
|
||||||
.fetch_sub(1, atomic::Ordering::AcqRel);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -28,13 +28,15 @@ impl Web3Rpcs {
|
|||||||
// TODO: might not be a race. might be a nonce thats higher than the current account nonce. geth discards chains
|
// TODO: might not be a race. might be a nonce thats higher than the current account nonce. geth discards chains
|
||||||
// TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
|
// TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
|
||||||
// TODO: if one rpc fails, try another?
|
// TODO: if one rpc fails, try another?
|
||||||
let tx: Transaction = match rpc.try_request_handle(authorization, false).await {
|
// TODO: try_request_handle, or wait_for_request_handle? I think we want wait here
|
||||||
|
let tx: Transaction = match rpc.try_request_handle(authorization, None).await {
|
||||||
Ok(OpenRequestResult::Handle(handle)) => {
|
Ok(OpenRequestResult::Handle(handle)) => {
|
||||||
handle
|
handle
|
||||||
.request(
|
.request(
|
||||||
"eth_getTransactionByHash",
|
"eth_getTransactionByHash",
|
||||||
&(pending_tx_id,),
|
&(pending_tx_id,),
|
||||||
Level::Error.into(),
|
Level::Error.into(),
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.await?
|
.await?
|
||||||
}
|
}
|
||||||
|
0
web3_proxy/src/rpcs/ws.rs
Normal file
0
web3_proxy/src/rpcs/ws.rs
Normal file
Loading…
Reference in New Issue
Block a user