Merge branch 'devel' of github.com:llamanodes/web3-proxy into fwd-request-id

This commit is contained in:
Rory Neithinger 2023-11-09 17:04:20 -08:00
commit d9649d7d9e
13 changed files with 162 additions and 126 deletions

65
Cargo.lock generated
View File

@ -44,7 +44,7 @@ version = "0.7.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a824f2aa7e75a0c98c5a504fceb80649e9c35265d44525b5f94de4771a395cd" checksum = "5a824f2aa7e75a0c98c5a504fceb80649e9c35265d44525b5f94de4771a395cd"
dependencies = [ dependencies = [
"getrandom 0.2.10", "getrandom 0.2.11",
"once_cell", "once_cell",
"version_check", "version_check",
] ]
@ -56,7 +56,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91429305e9f0a25f6205c5b8e0d2db09e0708a7a6df0f42212bb56c32c8ac97a" checksum = "91429305e9f0a25f6205c5b8e0d2db09e0708a7a6df0f42212bb56c32c8ac97a"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"getrandom 0.2.10", "getrandom 0.2.11",
"once_cell", "once_cell",
"version_check", "version_check",
"zerocopy", "zerocopy",
@ -1553,9 +1553,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]] [[package]]
name = "errno" name = "errno"
version = "0.3.5" version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860" checksum = "7c18ee0ed65a5f1f81cac6b1d213b69c35fa47d4252ad41f1486dbd8226fe36e"
dependencies = [ dependencies = [
"libc", "libc",
"windows-sys", "windows-sys",
@ -2231,9 +2231,9 @@ dependencies = [
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.2.10" version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
@ -2988,9 +2988,9 @@ dependencies = [
[[package]] [[package]]
name = "linux-raw-sys" name = "linux-raw-sys"
version = "0.4.10" version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da2479e8c062e40bf0066ffa0bc823de0a9368974af99c9f6df941d2c231e03f" checksum = "969488b55f8ac402214f3f5fd243ebb7206cf82de60d3172994707a4bcc2b829"
[[package]] [[package]]
name = "listenfd" name = "listenfd"
@ -4100,7 +4100,7 @@ version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [ dependencies = [
"getrandom 0.2.10", "getrandom 0.2.11",
"serde", "serde",
] ]
@ -4173,9 +4173,9 @@ dependencies = [
[[package]] [[package]]
name = "rdkafka" name = "rdkafka"
version = "0.34.0" version = "0.36.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "053adfa02fab06e86c01d586cc68aa47ee0ff4489a59469081dc12cbcde578bf" checksum = "d54f02a5a40220f8a2dfa47ddb38ba9064475a5807a69504b6f91711df2eea63"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-util", "futures-util",
@ -4192,9 +4192,9 @@ dependencies = [
[[package]] [[package]]
name = "rdkafka-sys" name = "rdkafka-sys"
version = "4.6.0+2.2.0" version = "4.7.0+2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad63c279fca41a27c231c450a2d2ad18288032e9cbb159ad16c9d96eba35aaaf" checksum = "55e0d2f9ba6253f6ec72385e453294f8618e9e15c2c6aba2a5c01ccf9622d615"
dependencies = [ dependencies = [
"cmake", "cmake",
"libc", "libc",
@ -4247,7 +4247,7 @@ version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a18479200779601e498ada4e8c1e1f50e3ee19deb0259c25825a98b5603b2cb4" checksum = "a18479200779601e498ada4e8c1e1f50e3ee19deb0259c25825a98b5603b2cb4"
dependencies = [ dependencies = [
"getrandom 0.2.10", "getrandom 0.2.11",
"libredox", "libredox",
"thiserror", "thiserror",
] ]
@ -4385,7 +4385,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb0205304757e5d899b9c2e448b867ffd03ae7f988002e47cd24954391394d0b" checksum = "fb0205304757e5d899b9c2e448b867ffd03ae7f988002e47cd24954391394d0b"
dependencies = [ dependencies = [
"cc", "cc",
"getrandom 0.2.10", "getrandom 0.2.11",
"libc", "libc",
"spin 0.9.8", "spin 0.9.8",
"untrusted 0.9.0", "untrusted 0.9.0",
@ -5008,9 +5008,9 @@ dependencies = [
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.190" version = "1.0.192"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7" checksum = "bca2a08484b285dcb282d0f67b26cadc0df8b19f8c12502c13d966bf9482f001"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
@ -5028,9 +5028,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.190" version = "1.0.192"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3" checksum = "d6c7207fbec9faa48073f3e3074cbe553af6ea512d7c21ba46e434e70ea9fbc1"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -6055,14 +6055,14 @@ dependencies = [
[[package]] [[package]]
name = "toml" name = "toml"
version = "0.8.6" version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ff9e3abce27ee2c9a37f9ad37238c1bdd4e789c84ba37df76aa4d528f5072cc" checksum = "a1a195ec8c9da26928f773888e0742ca3ca1040c6cd859c919c9f59c1954ab35"
dependencies = [ dependencies = [
"serde", "serde",
"serde_spanned", "serde_spanned",
"toml_datetime", "toml_datetime",
"toml_edit 0.20.7", "toml_edit 0.21.0",
] ]
[[package]] [[package]]
@ -6092,6 +6092,17 @@ name = "toml_edit"
version = "0.20.7" version = "0.20.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70f427fce4d84c72b5b732388bf4a9f4531b53f74e2887e3ecb2481f68f66d81" checksum = "70f427fce4d84c72b5b732388bf4a9f4531b53f74e2887e3ecb2481f68f66d81"
dependencies = [
"indexmap 2.1.0",
"toml_datetime",
"winnow",
]
[[package]]
name = "toml_edit"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d34d383cd00a163b4a5b85053df514d45bc330f6de7737edfe0a93311d1eaa03"
dependencies = [ dependencies = [
"indexmap 2.1.0", "indexmap 2.1.0",
"serde", "serde",
@ -6454,7 +6465,7 @@ version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
dependencies = [ dependencies = [
"getrandom 0.2.10", "getrandom 0.2.11",
"serde", "serde",
] ]
@ -6464,7 +6475,7 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc" checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc"
dependencies = [ dependencies = [
"getrandom 0.2.10", "getrandom 0.2.11",
"serde", "serde",
] ]
@ -6626,7 +6637,7 @@ dependencies = [
[[package]] [[package]]
name = "web3_proxy" name = "web3_proxy"
version = "1.43.83" version = "1.43.87"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"arc-swap", "arc-swap",
@ -6690,7 +6701,7 @@ dependencies = [
"time", "time",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"toml 0.8.6", "toml 0.8.8",
"tower-http", "tower-http",
"tower-layer", "tower-layer",
"tower-service", "tower-service",
@ -6703,7 +6714,7 @@ dependencies = [
[[package]] [[package]]
name = "web3_proxy_cli" name = "web3_proxy_cli"
version = "1.43.83" version = "1.43.87"
dependencies = [ dependencies = [
"console-subscriber", "console-subscriber",
"env_logger", "env_logger",

48
Jenkinsfile vendored
View File

@ -40,19 +40,19 @@ pipeline {
} }
} }
} }
// stage('Build and push intel_xeon1 image') { stage('Build and push intel_xeon1 image') {
// agent { agent {
// label 'intel_xeon1' label 'intel_xeon1'
// } }
// environment { environment {
// ARCH="intel_xeon1" ARCH="intel_xeon1"
// } }
// steps { steps {
// script { script {
// myBuildAndPush.buildAndPush() myBuildAndPush.buildAndPush()
// } }
// } }
// } }
} }
} }
stage('push latest') { stage('push latest') {
@ -68,17 +68,17 @@ pipeline {
} }
} }
} }
// stage('maybe push latest_intel_xeon1 tag') { stage('maybe push latest_intel_xeon1 tag') {
// agent any agent any
// environment { environment {
// ARCH="intel_xeon1" ARCH="intel_xeon1"
// } }
// steps { steps {
// script { script {
// myPushLatest.maybePushLatest() myPushLatest.maybePushLatest()
// } }
// } }
// } }
} }
} }
} }

View File

@ -12,5 +12,5 @@ path = "src/mod.rs"
[dependencies] [dependencies]
ethers = { version = "2.0.10", default-features = false } ethers = { version = "2.0.10", default-features = false }
sea-orm = "0.12.4" sea-orm = "0.12.4"
serde = "1.0.190" serde = "1.0.192"
ulid = "1.1.0" ulid = "1.1.0"

View File

@ -7,7 +7,7 @@ edition = "2021"
[dependencies] [dependencies]
portable-atomic = { version = "1.5.1", features = ["float"] } portable-atomic = { version = "1.5.1", features = ["float"] }
serde = { version = "1.0.190", features = [] } serde = { version = "1.0.192", features = [] }
tokio = { version = "1.33.0", features = ["full"] } tokio = { version = "1.33.0", features = ["full"] }
tracing = "0.1.40" tracing = "0.1.40"
watermill = "0.1.1" watermill = "0.1.1"

View File

@ -1,2 +1,2 @@
[toolchain] [toolchain]
channel = "nightly-2023-10-25" channel = "nightly-2023-11-03"

View File

@ -1,6 +1,6 @@
[package] [package]
name = "web3_proxy" name = "web3_proxy"
version = "1.43.83" version = "1.43.87"
edition = "2021" edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@ -73,12 +73,12 @@ once_cell = { version = "1.18.0" }
ordered-float = {version = "4.1.1" } ordered-float = {version = "4.1.1" }
pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async", "rustls", "sync"] } pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async", "rustls", "sync"] }
parking_lot = { version = "0.12.1", features = ["arc_lock", "nightly"] } parking_lot = { version = "0.12.1", features = ["arc_lock", "nightly"] }
rdkafka = { version = "0.34.0", default-features = false, features = ["tokio", "tracing"], optional = true } rdkafka = { version = "0.36.0", default-features = false, features = ["tokio", "tracing"], optional = true }
reqwest = { version = "0.11.22", default-features = false, features = ["json", "rustls"] } reqwest = { version = "0.11.22", default-features = false, features = ["json", "rustls"] }
rust_decimal = { version = "1.32.0" } rust_decimal = { version = "1.32.0" }
sentry = { version = "0.31.7", default-features = false, features = ["anyhow", "backtrace", "contexts", "panic", "reqwest", "rustls", "serde_json", "tracing"] } sentry = { version = "0.31.7", default-features = false, features = ["anyhow", "backtrace", "contexts", "panic", "reqwest", "rustls", "serde_json", "tracing"] }
sentry-tracing = "0.31.7" sentry-tracing = "0.31.7"
serde = { version = "1.0.190" } serde = { version = "1.0.192" }
serde-inline-default = "0.1.1" serde-inline-default = "0.1.1"
serde_json = { version = "1.0.108", default-features = false, features = ["raw_value"] } serde_json = { version = "1.0.108", default-features = false, features = ["raw_value"] }
serde_prometheus = "0.2.4" serde_prometheus = "0.2.4"
@ -86,7 +86,7 @@ strum = { version = "0.25.0", features = ["derive"] }
time = { version = "0.3" } time = { version = "0.3" }
tokio = { version = "1.33.0", features = ["full", "tracing"] } tokio = { version = "1.33.0", features = ["full", "tracing"] }
tokio-stream = { version = "0.1.14", features = ["sync"] } tokio-stream = { version = "0.1.14", features = ["sync"] }
toml = "0.8.6" toml = "0.8.8"
tower-http = { version = "0.4.4", features = ["cors", "normalize-path", "sensitive-headers", "trace"] } tower-http = { version = "0.4.4", features = ["cors", "normalize-path", "sensitive-headers", "trace"] }
tower-layer = "0.3.2" tower-layer = "0.3.2"
tower-service = "0.3.2" tower-service = "0.3.2"

View File

@ -1253,39 +1253,39 @@ impl App {
// TODO: this clone is only for an error response. refactor to not need it // TODO: this clone is only for an error response. refactor to not need it
let error_id = request.id.clone(); let error_id = request.id.clone();
let mut last_success = None;
let mut last_error = None;
let mut web3_request;
// TODO: think more about how to handle retries without hammering our servers with errors // TODO: think more about how to handle retries without hammering our servers with errors
let mut ranked_rpcs = self.balanced_rpcs.watch_ranked_rpcs.subscribe(); let mut ranked_rpcs = self.balanced_rpcs.watch_ranked_rpcs.subscribe();
let web3_request = match ValidatedRequest::new_with_app(
self,
authorization.clone(),
None,
None,
request.into(),
head_block.clone(),
request_id,
)
.await
{
Ok(x) => x,
Err(err) => {
let (a, b) = err.as_json_response_parts(error_id);
let rpcs = vec![];
return (a, b, rpcs);
}
};
let mut last_success = None;
let mut last_error = None;
let latest_start = sleep_until(Instant::now() + Duration::from_secs(3)); let latest_start = sleep_until(Instant::now() + Duration::from_secs(3));
pin!(latest_start); pin!(latest_start);
// TODO: how many retries? // TODO: how many retries?
loop { loop {
// TODO: refresh the request instead of making new each time. then we need less clones // TODO: refresh the request here?
web3_request = match ValidatedRequest::new_with_app(
self,
authorization.clone(),
None,
None,
request.clone().into(),
head_block.clone(),
request_id.clone(),
)
.await
{
Ok(x) => x,
Err(err) => {
let (a, b) = err.as_json_response_parts(error_id);
let rpcs = vec![];
return (a, b, rpcs);
}
};
// turn some of the Web3ProxyErrors into Ok results // turn some of the Web3ProxyErrors into Ok results
match self._proxy_request_with_caching(&web3_request).await { match self._proxy_request_with_caching(&web3_request).await {
@ -1424,7 +1424,10 @@ impl App {
| "shh_newIdentity" | "shh_newIdentity"
| "shh_post" | "shh_post"
| "shh_uninstallFilter" | "shh_uninstallFilter"
| "shh_version") => { | "shh_version"
| "wallet_getEthereumChains"
| "wallet_getSnaps"
| "wallet_requestSnaps") => {
return Err(Web3ProxyError::MethodNotFound(method.to_owned().into())); return Err(Web3ProxyError::MethodNotFound(method.to_owned().into()));
} }
// TODO: implement these commands // TODO: implement these commands

View File

@ -361,8 +361,17 @@ impl ValidatedRequest {
_ => CacheMode::Never, _ => CacheMode::Never,
}; };
// TODO: what should we do if we want a really short max_wait?
let connect_timeout = Duration::from_secs(10); let connect_timeout = Duration::from_secs(10);
let expire_timeout = max_wait.unwrap_or_else(|| Duration::from_secs(295));
let expire_timeout = if let Some(max_wait) = max_wait {
max_wait
} else if authorization.active_premium().await {
Duration::from_secs(295)
} else {
Duration::from_secs(60)
}
.max(connect_timeout);
let x = Self { let x = Self {
archive_request: false.into(), archive_request: false.into(),

View File

@ -402,13 +402,20 @@ impl ConsensusFinder {
rpc: Option<&Arc<Web3Rpc>>, rpc: Option<&Arc<Web3Rpc>>,
new_block: Option<Web3ProxyBlock>, new_block: Option<Web3ProxyBlock>,
) -> Web3ProxyResult<bool> { ) -> Web3ProxyResult<bool> {
let rpc_block_sender = rpc.and_then(|x| x.head_block_sender.as_ref());
let new_ranked_rpcs = match self let new_ranked_rpcs = match self
.rank_rpcs(web3_rpcs) .rank_rpcs(web3_rpcs)
.await .await
.web3_context("error while finding consensus head block!")? .web3_context("error while finding consensus head block!")?
{ {
None => { None => {
warn!("no ranked rpcs found!"); warn!(?rpc, ?new_block, "no ranked rpcs found!");
if let Some(rpc_block_sender) = rpc_block_sender {
rpc_block_sender.send_replace(new_block);
}
return Ok(false); return Ok(false);
} }
Some(x) => x, Some(x) => x,
@ -428,6 +435,10 @@ impl ConsensusFinder {
let new_ranked_rpcs = Arc::new(new_ranked_rpcs); let new_ranked_rpcs = Arc::new(new_ranked_rpcs);
if let Some(rpc_block_sender) = rpc_block_sender {
rpc_block_sender.send_replace(new_block.clone());
}
let old_ranked_rpcs = web3_rpcs let old_ranked_rpcs = web3_rpcs
.watch_ranked_rpcs .watch_ranked_rpcs
.send_replace(Some(new_ranked_rpcs.clone())); .send_replace(Some(new_ranked_rpcs.clone()));
@ -672,7 +683,7 @@ impl ConsensusFinder {
0 => {} 0 => {}
1 => { 1 => {
for rpc in self.rpc_heads.keys() { for rpc in self.rpc_heads.keys() {
rpc.tier.store(1, atomic::Ordering::Release) rpc.tier.store(1, atomic::Ordering::SeqCst)
} }
} }
_ => { _ => {
@ -752,7 +763,7 @@ impl ConsensusFinder {
trace!("{} - p50_sec: {}, tier {}", rpc, median_latency_sec, tier); trace!("{} - p50_sec: {}, tier {}", rpc, median_latency_sec, tier);
rpc.tier.store(tier, atomic::Ordering::Release); rpc.tier.store(tier, atomic::Ordering::SeqCst);
} }
} }
} }
@ -810,7 +821,7 @@ impl ConsensusFinder {
HashMap::with_capacity(num_known); HashMap::with_capacity(num_known);
for (rpc, rpc_head) in self.rpc_heads.iter() { for (rpc, rpc_head) in self.rpc_heads.iter() {
if !rpc.healthy.load(atomic::Ordering::Acquire) { if !rpc.healthy.load(atomic::Ordering::SeqCst) {
// TODO: should unhealthy servers get a vote? they were included in minmax_block. i think that is enough // TODO: should unhealthy servers get a vote? they were included in minmax_block. i think that is enough
continue; continue;
} }
@ -878,14 +889,14 @@ impl ConsensusFinder {
pub fn best_tier(&self) -> Option<u32> { pub fn best_tier(&self) -> Option<u32> {
self.rpc_heads self.rpc_heads
.iter() .iter()
.map(|(x, _)| x.tier.load(atomic::Ordering::Acquire)) .map(|(x, _)| x.tier.load(atomic::Ordering::SeqCst))
.min() .min()
} }
pub fn worst_tier(&self) -> Option<u32> { pub fn worst_tier(&self) -> Option<u32> {
self.rpc_heads self.rpc_heads
.iter() .iter()
.map(|(x, _)| x.tier.load(atomic::Ordering::Acquire)) .map(|(x, _)| x.tier.load(atomic::Ordering::SeqCst))
.max() .max()
} }
} }

View File

@ -155,7 +155,7 @@ impl Web3Rpc {
let backup = config.backup; let backup = config.backup;
let block_data_limit: AtomicU64 = config.block_data_limit.into(); let block_data_limit: AtomicU64 = config.block_data_limit.into();
let automatic_block_limit = (block_data_limit.load(atomic::Ordering::Acquire) == 0) let automatic_block_limit = (block_data_limit.load(atomic::Ordering::SeqCst) == 0)
&& block_and_rpc_sender.is_some(); && block_and_rpc_sender.is_some();
// have a sender for tracking hard limit anywhere. we use this in case we // have a sender for tracking hard limit anywhere. we use this in case we
@ -287,7 +287,7 @@ impl Web3Rpc {
head_block = head_block.min(max_block); head_block = head_block.min(max_block);
} }
let tier = self.tier.load(atomic::Ordering::Acquire); let tier = self.tier.load(atomic::Ordering::SeqCst);
let backup = self.backup; let backup = self.backup;
@ -354,7 +354,7 @@ impl Web3Rpc {
let request_scaling = 0.01; let request_scaling = 0.01;
// TODO: what ordering? // TODO: what ordering?
let active_requests = let active_requests =
self.active_requests.load(atomic::Ordering::Acquire) as f32 * request_scaling + 1.0; self.active_requests.load(atomic::Ordering::SeqCst) as f32 * request_scaling + 1.0;
peak_latency.mul_f32(active_requests) peak_latency.mul_f32(active_requests)
} }
@ -438,8 +438,7 @@ impl Web3Rpc {
warn!("{} is unable to serve requests", self); warn!("{} is unable to serve requests", self);
} }
self.block_data_limit self.block_data_limit.store(limit, atomic::Ordering::SeqCst);
.store(limit, atomic::Ordering::Release);
} }
if limit == Some(u64::MAX) { if limit == Some(u64::MAX) {
@ -453,7 +452,7 @@ impl Web3Rpc {
/// TODO: this might be too simple. different nodes can prune differently. its possible we will have a block range /// TODO: this might be too simple. different nodes can prune differently. its possible we will have a block range
pub fn block_data_limit(&self) -> U64 { pub fn block_data_limit(&self) -> U64 {
self.block_data_limit.load(atomic::Ordering::Acquire).into() self.block_data_limit.load(atomic::Ordering::SeqCst).into()
} }
/// TODO: get rid of this now that consensus rpcs does it /// TODO: get rid of this now that consensus rpcs does it
@ -568,7 +567,10 @@ impl Web3Rpc {
self: &Arc<Self>, self: &Arc<Self>,
new_head_block: Web3ProxyResult<Option<ArcBlock>>, new_head_block: Web3ProxyResult<Option<ArcBlock>>,
) -> Web3ProxyResult<()> { ) -> Web3ProxyResult<()> {
let head_block_sender = self.head_block_sender.as_ref().unwrap(); let head_block_sender = self
.head_block_sender
.as_ref()
.expect("head_block_sender is always set");
let new_head_block = match new_head_block { let new_head_block = match new_head_block {
Ok(x) => { Ok(x) => {
@ -585,11 +587,9 @@ impl Web3Rpc {
trace!("clearing head block on {} ({}ms old)!", self, age); trace!("clearing head block on {} ({}ms old)!", self, age);
// send an empty block to take this server out of rotation
head_block_sender.send_replace(None);
// TODO: clear self.block_data_limit? // TODO: clear self.block_data_limit?
// send an empty block to take this server out of rotation
None None
} }
Some(new_head_block) => { Some(new_head_block) => {
@ -604,7 +604,6 @@ impl Web3Rpc {
.await; .await;
// we are synced! yey! // we are synced! yey!
head_block_sender.send_replace(Some(new_head_block.clone()));
// TODO: checking this every time seems excessive // TODO: checking this every time seems excessive
if self.block_data_limit() == U64::zero() { if self.block_data_limit() == U64::zero() {
@ -632,11 +631,14 @@ impl Web3Rpc {
} }
}; };
// tell web3rpcs about this rpc having this block
if let Some(block_and_rpc_sender) = &self.block_and_rpc_sender { if let Some(block_and_rpc_sender) = &self.block_and_rpc_sender {
// tell web3rpcs about this rpc having this block
// web3rpcs will do `self.head_block_sender.send_replace(new_head_block)`
block_and_rpc_sender block_and_rpc_sender
.send((new_head_block, self.clone())) .send((new_head_block, self.clone()))
.context("block_and_rpc_sender failed sending")?; .context("block_and_rpc_sender failed sending")?;
} else {
head_block_sender.send_replace(new_head_block);
} }
Ok(()) Ok(())
@ -763,7 +765,7 @@ impl Web3Rpc {
.await .await
.web3_context("failed check_provider") .web3_context("failed check_provider")
{ {
self.healthy.store(false, atomic::Ordering::Release); self.healthy.store(false, atomic::Ordering::SeqCst);
return Err(err); return Err(err);
} }
@ -791,14 +793,14 @@ impl Web3Rpc {
break; break;
} }
new_total_requests = rpc.internal_requests.load(atomic::Ordering::Acquire) new_total_requests = rpc.internal_requests.load(atomic::Ordering::SeqCst)
+ rpc.external_requests.load(atomic::Ordering::Acquire); + rpc.external_requests.load(atomic::Ordering::SeqCst);
let detailed_healthcheck = new_total_requests - old_total_requests < 5; let detailed_healthcheck = new_total_requests - old_total_requests < 5;
// TODO: if this fails too many times, reset the connection // TODO: if this fails too many times, reset the connection
if let Err(err) = rpc.check_health(detailed_healthcheck, error_handler).await { if let Err(err) = rpc.check_health(detailed_healthcheck, error_handler).await {
rpc.healthy.store(false, atomic::Ordering::Release); rpc.healthy.store(false, atomic::Ordering::SeqCst);
// TODO: different level depending on the error handler // TODO: different level depending on the error handler
// TODO: if rate limit error, set "retry_at" // TODO: if rate limit error, set "retry_at"
@ -808,7 +810,7 @@ impl Web3Rpc {
error!(?err, "health check on {} failed", rpc); error!(?err, "health check on {} failed", rpc);
} }
} else { } else {
rpc.healthy.store(true, atomic::Ordering::Release); rpc.healthy.store(true, atomic::Ordering::SeqCst);
} }
// TODO: should we count the requests done inside this health check // TODO: should we count the requests done inside this health check
@ -833,7 +835,7 @@ impl Web3Rpc {
true true
}; };
self.healthy.store(initial_check, atomic::Ordering::Release); self.healthy.store(initial_check, atomic::Ordering::SeqCst);
tokio::spawn(f) tokio::spawn(f)
} else { } else {
@ -849,7 +851,7 @@ impl Web3Rpc {
// TODO: if this fails too many times, reset the connection // TODO: if this fails too many times, reset the connection
if let Err(err) = rpc.check_provider().await { if let Err(err) = rpc.check_provider().await {
rpc.healthy.store(false, atomic::Ordering::Release); rpc.healthy.store(false, atomic::Ordering::SeqCst);
// TODO: if rate limit error, set "retry_at" // TODO: if rate limit error, set "retry_at"
if rpc.backup { if rpc.backup {
@ -858,7 +860,7 @@ impl Web3Rpc {
error!(?err, "provider check on {} failed", rpc); error!(?err, "provider check on {} failed", rpc);
} }
} else { } else {
rpc.healthy.store(true, atomic::Ordering::Release); rpc.healthy.store(true, atomic::Ordering::SeqCst);
} }
sleep(Duration::from_secs(health_sleep_seconds)).await; sleep(Duration::from_secs(health_sleep_seconds)).await;
@ -904,7 +906,7 @@ impl Web3Rpc {
let (first_exit, _, _) = select_all(futures).await; let (first_exit, _, _) = select_all(futures).await;
// mark unhealthy // mark unhealthy
self.healthy.store(false, atomic::Ordering::Release); self.healthy.store(false, atomic::Ordering::SeqCst);
debug!(?first_exit, "subscriptions on {} exited", self); debug!(?first_exit, "subscriptions on {} exited", self);
@ -987,7 +989,7 @@ impl Web3Rpc {
// there is a "watch_blocks" function, but a lot of public nodes (including llamanodes) do not support the necessary rpc endpoints // there is a "watch_blocks" function, but a lot of public nodes (including llamanodes) do not support the necessary rpc endpoints
// TODO: is 1/2 the block time okay? // TODO: is 1/2 the block time okay?
let mut i = interval(self.block_interval / 2); let mut i = interval(self.block_interval / 2);
i.set_missed_tick_behavior(MissedTickBehavior::Skip); i.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop { loop {
let block_result = self let block_result = self
@ -1163,7 +1165,7 @@ impl Web3Rpc {
// TODO: if websocket is reconnecting, return an error? // TODO: if websocket is reconnecting, return an error?
if !allow_unhealthy { if !allow_unhealthy {
if !(self.healthy.load(atomic::Ordering::Acquire)) { if !(self.healthy.load(atomic::Ordering::SeqCst)) {
return Ok(OpenRequestResult::Failed); return Ok(OpenRequestResult::Failed);
} }
@ -1361,7 +1363,7 @@ impl Serialize for Web3Rpc {
state.serialize_field("web3_clientVersion", &self.client_version.read().as_ref())?; state.serialize_field("web3_clientVersion", &self.client_version.read().as_ref())?;
match self.block_data_limit.load(atomic::Ordering::Acquire) { match self.block_data_limit.load(atomic::Ordering::SeqCst) {
u64::MAX => { u64::MAX => {
state.serialize_field("block_data_limit", &None::<()>)?; state.serialize_field("block_data_limit", &None::<()>)?;
} }
@ -1384,17 +1386,17 @@ impl Serialize for Web3Rpc {
state.serialize_field( state.serialize_field(
"external_requests", "external_requests",
&self.external_requests.load(atomic::Ordering::Acquire), &self.external_requests.load(atomic::Ordering::SeqCst),
)?; )?;
state.serialize_field( state.serialize_field(
"internal_requests", "internal_requests",
&self.internal_requests.load(atomic::Ordering::Acquire), &self.internal_requests.load(atomic::Ordering::SeqCst),
)?; )?;
state.serialize_field( state.serialize_field(
"active_requests", "active_requests",
&self.active_requests.load(atomic::Ordering::Acquire), &self.active_requests.load(atomic::Ordering::SeqCst),
)?; )?;
{ {
@ -1423,7 +1425,7 @@ impl Serialize for Web3Rpc {
state.serialize_field("weighted_latency_ms", &weighted_latency_ms)?; state.serialize_field("weighted_latency_ms", &weighted_latency_ms)?;
} }
{ {
let healthy = self.healthy.load(atomic::Ordering::Acquire); let healthy = self.healthy.load(atomic::Ordering::SeqCst);
state.serialize_field("healthy", &healthy)?; state.serialize_field("healthy", &healthy)?;
} }
@ -1437,7 +1439,7 @@ impl fmt::Debug for Web3Rpc {
f.field("name", &self.name); f.field("name", &self.name);
let block_data_limit = self.block_data_limit.load(atomic::Ordering::Acquire); let block_data_limit = self.block_data_limit.load(atomic::Ordering::SeqCst);
if block_data_limit == u64::MAX { if block_data_limit == u64::MAX {
f.field("blocks", &"all"); f.field("blocks", &"all");
} else { } else {
@ -1446,7 +1448,7 @@ impl fmt::Debug for Web3Rpc {
f.field("backup", &self.backup); f.field("backup", &self.backup);
f.field("tier", &self.tier.load(atomic::Ordering::Acquire)); f.field("tier", &self.tier.load(atomic::Ordering::SeqCst));
f.field("weighted_ms", &self.weighted_peak_latency().as_millis()); f.field("weighted_ms", &self.weighted_peak_latency().as_millis());

View File

@ -145,7 +145,7 @@ impl Drop for OpenRequestHandle {
fn drop(&mut self) { fn drop(&mut self) {
self.rpc self.rpc
.active_requests .active_requests
.fetch_sub(1, atomic::Ordering::AcqRel); .fetch_sub(1, atomic::Ordering::SeqCst);
} }
} }
@ -159,7 +159,7 @@ impl OpenRequestHandle {
// TODO: attach a unique id to this? customer requests have one, but not internal queries // TODO: attach a unique id to this? customer requests have one, but not internal queries
// TODO: what ordering?! // TODO: what ordering?!
rpc.active_requests rpc.active_requests
.fetch_add(1, std::sync::atomic::Ordering::AcqRel); .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let error_handler = error_handler.unwrap_or_default(); let error_handler = error_handler.unwrap_or_default();

View File

@ -1,6 +1,6 @@
[package] [package]
name = "web3_proxy_cli" name = "web3_proxy_cli"
version = "1.43.83" version = "1.43.87"
edition = "2021" edition = "2021"
default-run = "web3_proxy_cli" default-run = "web3_proxy_cli"
@ -22,7 +22,7 @@ web3_proxy = { path = "../web3_proxy" }
console-subscriber = { version = "0.2.0", features = ["env-filter", "parking_lot"], optional = true } console-subscriber = { version = "0.2.0", features = ["env-filter", "parking_lot"], optional = true }
parking_lot = { version = "0.12.1", features = ["arc_lock", "nightly"] } parking_lot = { version = "0.12.1", features = ["arc_lock", "nightly"] }
prettytable = { version = "0.10.0", default-features = false } prettytable = { version = "0.10.0", default-features = false }
serde = { version = "1.0.190" } serde = { version = "1.0.192" }
serde_json = { version = "1.0.108", default-features = false, features = ["raw_value"] } serde_json = { version = "1.0.108", default-features = false, features = ["raw_value"] }
tokio-console = { version = "0.1.10", optional = true } tokio-console = { version = "0.1.10", optional = true }
tracing = "0.1" tracing = "0.1"

View File

@ -250,7 +250,7 @@ async fn it_matches_anvil() {
.unwrap(); .unwrap();
info!(?deploy_tx); info!(?deploy_tx);
yield_now().await; sleep(Duration::from_secs(1)).await;
let head_block_num: U64 = quorum_provider let head_block_num: U64 = quorum_provider
.request("eth_blockNumber", ()) .request("eth_blockNumber", ())