diff --git a/Cargo.lock b/Cargo.lock
index f5fc5907..d9d9701d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -44,7 +44,7 @@ version = "0.7.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5a824f2aa7e75a0c98c5a504fceb80649e9c35265d44525b5f94de4771a395cd"
 dependencies = [
- "getrandom 0.2.10",
+ "getrandom 0.2.11",
  "once_cell",
  "version_check",
 ]
@@ -56,7 +56,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "91429305e9f0a25f6205c5b8e0d2db09e0708a7a6df0f42212bb56c32c8ac97a"
 dependencies = [
  "cfg-if",
- "getrandom 0.2.10",
+ "getrandom 0.2.11",
  "once_cell",
  "version_check",
  "zerocopy",
 ]
@@ -1553,9 +1553,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"

 [[package]]
 name = "errno"
-version = "0.3.5"
+version = "0.3.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860"
+checksum = "7c18ee0ed65a5f1f81cac6b1d213b69c35fa47d4252ad41f1486dbd8226fe36e"
 dependencies = [
  "libc",
  "windows-sys",
 ]
@@ -2231,9 +2231,9 @@ dependencies = [

 [[package]]
 name = "getrandom"
-version = "0.2.10"
+version = "0.2.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427"
+checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f"
 dependencies = [
  "cfg-if",
  "libc",
@@ -2988,9 +2988,9 @@ dependencies = [

 [[package]]
 name = "linux-raw-sys"
-version = "0.4.10"
+version = "0.4.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "da2479e8c062e40bf0066ffa0bc823de0a9368974af99c9f6df941d2c231e03f"
+checksum = "969488b55f8ac402214f3f5fd243ebb7206cf82de60d3172994707a4bcc2b829"

 [[package]]
 name = "listenfd"
@@ -4100,7 +4100,7 @@ version = "0.6.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
 dependencies = [
- "getrandom 0.2.10",
+ "getrandom 0.2.11",
  "serde",
 ]
@@ -4173,9 +4173,9 @@ dependencies = [

 [[package]]
 name = "rdkafka"
-version = "0.34.0"
+version = "0.36.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "053adfa02fab06e86c01d586cc68aa47ee0ff4489a59469081dc12cbcde578bf"
+checksum = "d54f02a5a40220f8a2dfa47ddb38ba9064475a5807a69504b6f91711df2eea63"
 dependencies = [
  "futures-channel",
  "futures-util",
@@ -4192,9 +4192,9 @@ dependencies = [

 [[package]]
 name = "rdkafka-sys"
-version = "4.6.0+2.2.0"
+version = "4.7.0+2.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ad63c279fca41a27c231c450a2d2ad18288032e9cbb159ad16c9d96eba35aaaf"
+checksum = "55e0d2f9ba6253f6ec72385e453294f8618e9e15c2c6aba2a5c01ccf9622d615"
 dependencies = [
  "cmake",
  "libc",
@@ -4247,7 +4247,7 @@ version = "0.4.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a18479200779601e498ada4e8c1e1f50e3ee19deb0259c25825a98b5603b2cb4"
 dependencies = [
- "getrandom 0.2.10",
+ "getrandom 0.2.11",
  "libredox",
  "thiserror",
 ]
@@ -4385,7 +4385,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "fb0205304757e5d899b9c2e448b867ffd03ae7f988002e47cd24954391394d0b"
 dependencies = [
  "cc",
- "getrandom 0.2.10",
+ "getrandom 0.2.11",
  "libc",
  "spin 0.9.8",
  "untrusted 0.9.0",
@@ -5008,9 +5008,9 @@ dependencies = [

 [[package]]
 name = "serde"
-version = "1.0.190"
+version = "1.0.192"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7" +checksum = "bca2a08484b285dcb282d0f67b26cadc0df8b19f8c12502c13d966bf9482f001" dependencies = [ "serde_derive", ] @@ -5028,9 +5028,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.190" +version = "1.0.192" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3" +checksum = "d6c7207fbec9faa48073f3e3074cbe553af6ea512d7c21ba46e434e70ea9fbc1" dependencies = [ "proc-macro2", "quote", @@ -6055,14 +6055,14 @@ dependencies = [ [[package]] name = "toml" -version = "0.8.6" +version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ff9e3abce27ee2c9a37f9ad37238c1bdd4e789c84ba37df76aa4d528f5072cc" +checksum = "a1a195ec8c9da26928f773888e0742ca3ca1040c6cd859c919c9f59c1954ab35" dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit 0.20.7", + "toml_edit 0.21.0", ] [[package]] @@ -6092,6 +6092,17 @@ name = "toml_edit" version = "0.20.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70f427fce4d84c72b5b732388bf4a9f4531b53f74e2887e3ecb2481f68f66d81" +dependencies = [ + "indexmap 2.1.0", + "toml_datetime", + "winnow", +] + +[[package]] +name = "toml_edit" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d34d383cd00a163b4a5b85053df514d45bc330f6de7737edfe0a93311d1eaa03" dependencies = [ "indexmap 2.1.0", "serde", @@ -6454,7 +6465,7 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" dependencies = [ - "getrandom 0.2.10", + "getrandom 0.2.11", "serde", ] @@ -6464,7 +6475,7 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc" dependencies = [ - "getrandom 0.2.10", + "getrandom 0.2.11", "serde", ] @@ -6626,7 +6637,7 @@ dependencies = [ [[package]] name = "web3_proxy" -version = "1.43.83" +version = "1.43.87" dependencies = [ "anyhow", "arc-swap", @@ -6690,7 +6701,7 @@ dependencies = [ "time", "tokio", "tokio-stream", - "toml 0.8.6", + "toml 0.8.8", "tower-http", "tower-layer", "tower-service", @@ -6703,7 +6714,7 @@ dependencies = [ [[package]] name = "web3_proxy_cli" -version = "1.43.83" +version = "1.43.87" dependencies = [ "console-subscriber", "env_logger", diff --git a/Jenkinsfile b/Jenkinsfile index dccd797d..691a4bfd 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -40,19 +40,19 @@ pipeline { } } } - // stage('Build and push intel_xeon1 image') { - // agent { - // label 'intel_xeon1' - // } - // environment { - // ARCH="intel_xeon1" - // } - // steps { - // script { - // myBuildAndPush.buildAndPush() - // } - // } - // } + stage('Build and push intel_xeon1 image') { + agent { + label 'intel_xeon1' + } + environment { + ARCH="intel_xeon1" + } + steps { + script { + myBuildAndPush.buildAndPush() + } + } + } } } stage('push latest') { @@ -68,17 +68,17 @@ pipeline { } } } - // stage('maybe push latest_intel_xeon1 tag') { - // agent any - // environment { - // ARCH="intel_xeon1" - // } - // steps { - // script { - // myPushLatest.maybePushLatest() - // } - // } - // } + stage('maybe push latest_intel_xeon1 tag') { + agent any + environment { + ARCH="intel_xeon1" + } + steps { + script { + myPushLatest.maybePushLatest() + } + } + } } } } diff --git a/entities/Cargo.toml 
b/entities/Cargo.toml index 0eb6ed27..4b6d61b9 100644 --- a/entities/Cargo.toml +++ b/entities/Cargo.toml @@ -12,5 +12,5 @@ path = "src/mod.rs" [dependencies] ethers = { version = "2.0.10", default-features = false } sea-orm = "0.12.4" -serde = "1.0.190" +serde = "1.0.192" ulid = "1.1.0" diff --git a/latency/Cargo.toml b/latency/Cargo.toml index 45b61f38..33d8f924 100644 --- a/latency/Cargo.toml +++ b/latency/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] portable-atomic = { version = "1.5.1", features = ["float"] } -serde = { version = "1.0.190", features = [] } +serde = { version = "1.0.192", features = [] } tokio = { version = "1.33.0", features = ["full"] } tracing = "0.1.40" watermill = "0.1.1" diff --git a/rust-toolchain.toml b/rust-toolchain.toml index e687f8d5..f2c9ba01 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "nightly-2023-10-25" +channel = "nightly-2023-11-03" diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 4f3934e3..d4e83a0f 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "web3_proxy" -version = "1.43.83" +version = "1.43.87" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -73,12 +73,12 @@ once_cell = { version = "1.18.0" } ordered-float = {version = "4.1.1" } pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async", "rustls", "sync"] } parking_lot = { version = "0.12.1", features = ["arc_lock", "nightly"] } -rdkafka = { version = "0.34.0", default-features = false, features = ["tokio", "tracing"], optional = true } +rdkafka = { version = "0.36.0", default-features = false, features = ["tokio", "tracing"], optional = true } reqwest = { version = "0.11.22", default-features = false, features = ["json", "rustls"] } rust_decimal = { version = "1.32.0" } sentry = { version = "0.31.7", default-features = false, features = ["anyhow", "backtrace", "contexts", "panic", "reqwest", "rustls", "serde_json", "tracing"] } sentry-tracing = "0.31.7" -serde = { version = "1.0.190" } +serde = { version = "1.0.192" } serde-inline-default = "0.1.1" serde_json = { version = "1.0.108", default-features = false, features = ["raw_value"] } serde_prometheus = "0.2.4" @@ -86,7 +86,7 @@ strum = { version = "0.25.0", features = ["derive"] } time = { version = "0.3" } tokio = { version = "1.33.0", features = ["full", "tracing"] } tokio-stream = { version = "0.1.14", features = ["sync"] } -toml = "0.8.6" +toml = "0.8.8" tower-http = { version = "0.4.4", features = ["cors", "normalize-path", "sensitive-headers", "trace"] } tower-layer = "0.3.2" tower-service = "0.3.2" diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 24b34c72..1b42588b 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1253,39 +1253,39 @@ impl App { // TODO: this clone is only for an error response. 
refactor to not need it let error_id = request.id.clone(); - let mut last_success = None; - let mut last_error = None; - let mut web3_request; - // TODO: think more about how to handle retries without hammering our servers with errors let mut ranked_rpcs = self.balanced_rpcs.watch_ranked_rpcs.subscribe(); + let web3_request = match ValidatedRequest::new_with_app( + self, + authorization.clone(), + None, + None, + request.into(), + head_block.clone(), + request_id, + ) + .await + { + Ok(x) => x, + Err(err) => { + let (a, b) = err.as_json_response_parts(error_id); + + let rpcs = vec![]; + + return (a, b, rpcs); + } + }; + + let mut last_success = None; + let mut last_error = None; + let latest_start = sleep_until(Instant::now() + Duration::from_secs(3)); pin!(latest_start); // TODO: how many retries? loop { - // TODO: refresh the request instead of making new each time. then we need less clones - web3_request = match ValidatedRequest::new_with_app( - self, - authorization.clone(), - None, - None, - request.clone().into(), - head_block.clone(), - request_id.clone(), - ) - .await - { - Ok(x) => x, - Err(err) => { - let (a, b) = err.as_json_response_parts(error_id); - - let rpcs = vec![]; - - return (a, b, rpcs); - } - }; + // TODO: refresh the request here? // turn some of the Web3ProxyErrors into Ok results match self._proxy_request_with_caching(&web3_request).await { @@ -1424,7 +1424,10 @@ impl App { | "shh_newIdentity" | "shh_post" | "shh_uninstallFilter" - | "shh_version") => { + | "shh_version" + | "wallet_getEthereumChains" + | "wallet_getSnaps" + | "wallet_requestSnaps") => { return Err(Web3ProxyError::MethodNotFound(method.to_owned().into())); } // TODO: implement these commands diff --git a/web3_proxy/src/jsonrpc/request_builder.rs b/web3_proxy/src/jsonrpc/request_builder.rs index b5716ac4..4ecdd34e 100644 --- a/web3_proxy/src/jsonrpc/request_builder.rs +++ b/web3_proxy/src/jsonrpc/request_builder.rs @@ -361,8 +361,17 @@ impl ValidatedRequest { _ => CacheMode::Never, }; + // TODO: what should we do if we want a really short max_wait? let connect_timeout = Duration::from_secs(10); - let expire_timeout = max_wait.unwrap_or_else(|| Duration::from_secs(295)); + + let expire_timeout = if let Some(max_wait) = max_wait { + max_wait + } else if authorization.active_premium().await { + Duration::from_secs(295) + } else { + Duration::from_secs(60) + } + .max(connect_timeout); let x = Self { archive_request: false.into(), diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 1a1423b4..8fd28639 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -402,13 +402,20 @@ impl ConsensusFinder { rpc: Option<&Arc>, new_block: Option, ) -> Web3ProxyResult { + let rpc_block_sender = rpc.and_then(|x| x.head_block_sender.as_ref()); + let new_ranked_rpcs = match self .rank_rpcs(web3_rpcs) .await .web3_context("error while finding consensus head block!")? 
         {
             None => {
-                warn!("no ranked rpcs found!");
+                warn!(?rpc, ?new_block, "no ranked rpcs found!");
+
+                if let Some(rpc_block_sender) = rpc_block_sender {
+                    rpc_block_sender.send_replace(new_block);
+                }
+
                 return Ok(false);
             }
             Some(x) => x,
         }
@@ -428,6 +435,10 @@
         let new_ranked_rpcs = Arc::new(new_ranked_rpcs);

+        if let Some(rpc_block_sender) = rpc_block_sender {
+            rpc_block_sender.send_replace(new_block.clone());
+        }
+
         let old_ranked_rpcs = web3_rpcs
             .watch_ranked_rpcs
             .send_replace(Some(new_ranked_rpcs.clone()));
@@ -672,7 +683,7 @@
             0 => {}
             1 => {
                 for rpc in self.rpc_heads.keys() {
-                    rpc.tier.store(1, atomic::Ordering::Release)
+                    rpc.tier.store(1, atomic::Ordering::SeqCst)
                 }
             }
             _ => {
@@ -752,7 +763,7 @@
             trace!("{} - p50_sec: {}, tier {}", rpc, median_latency_sec, tier);

-            rpc.tier.store(tier, atomic::Ordering::Release);
+            rpc.tier.store(tier, atomic::Ordering::SeqCst);
         }
     }
 }
@@ -810,7 +821,7 @@
             HashMap::with_capacity(num_known);

         for (rpc, rpc_head) in self.rpc_heads.iter() {
-            if !rpc.healthy.load(atomic::Ordering::Acquire) {
+            if !rpc.healthy.load(atomic::Ordering::SeqCst) {
                 // TODO: should unhealthy servers get a vote? they were included in minmax_block. i think that is enough
                 continue;
             }
@@ -878,14 +889,14 @@ pub fn best_tier(&self) -> Option {
         self.rpc_heads
             .iter()
-            .map(|(x, _)| x.tier.load(atomic::Ordering::Acquire))
+            .map(|(x, _)| x.tier.load(atomic::Ordering::SeqCst))
             .min()
     }

     pub fn worst_tier(&self) -> Option {
         self.rpc_heads
             .iter()
-            .map(|(x, _)| x.tier.load(atomic::Ordering::Acquire))
+            .map(|(x, _)| x.tier.load(atomic::Ordering::SeqCst))
             .max()
     }
 }
diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs
index 161b070f..84c44266 100644
--- a/web3_proxy/src/rpcs/one.rs
+++ b/web3_proxy/src/rpcs/one.rs
@@ -155,7 +155,7 @@ impl Web3Rpc {
         let backup = config.backup;

         let block_data_limit: AtomicU64 = config.block_data_limit.into();
-        let automatic_block_limit = (block_data_limit.load(atomic::Ordering::Acquire) == 0)
+        let automatic_block_limit = (block_data_limit.load(atomic::Ordering::SeqCst) == 0)
             && block_and_rpc_sender.is_some();

         // have a sender for tracking hard limit anywhere. we use this in case we
@@ -287,7 +287,7 @@
             head_block = head_block.min(max_block);
         }

-        let tier = self.tier.load(atomic::Ordering::Acquire);
+        let tier = self.tier.load(atomic::Ordering::SeqCst);

         let backup = self.backup;
@@ -354,7 +354,7 @@
         let request_scaling = 0.01;
         // TODO: what ordering?
         let active_requests =
-            self.active_requests.load(atomic::Ordering::Acquire) as f32 * request_scaling + 1.0;
+            self.active_requests.load(atomic::Ordering::SeqCst) as f32 * request_scaling + 1.0;

         peak_latency.mul_f32(active_requests)
     }
@@ -438,8 +438,7 @@
                 warn!("{} is unable to serve requests", self);
             }

-            self.block_data_limit
-                .store(limit, atomic::Ordering::Release);
+            self.block_data_limit.store(limit, atomic::Ordering::SeqCst);
         }

         if limit == Some(u64::MAX) {
@@ -453,7 +452,7 @@
     /// TODO: this might be too simple. different nodes can prune differently. its possible we will have a block range
     pub fn block_data_limit(&self) -> U64 {
-        self.block_data_limit.load(atomic::Ordering::Acquire).into()
+        self.block_data_limit.load(atomic::Ordering::SeqCst).into()
     }

     /// TODO: get rid of this now that consensus rpcs does it
@@ -568,7 +567,10 @@
         self: &Arc<Self>,
         new_head_block: Web3ProxyResult<Option<Web3ProxyBlock>>,
     ) -> Web3ProxyResult<()> {
-        let head_block_sender = self.head_block_sender.as_ref().unwrap();
+        let head_block_sender = self
+            .head_block_sender
+            .as_ref()
+            .expect("head_block_sender is always set");

         let new_head_block = match new_head_block {
             Ok(x) => {
@@ -585,11 +587,9 @@
                     trace!("clearing head block on {} ({}ms old)!", self, age);

-                    // send an empty block to take this server out of rotation
-                    head_block_sender.send_replace(None);
-
                     // TODO: clear self.block_data_limit?

+                    // send an empty block to take this server out of rotation
                     None
                 }
                 Some(new_head_block) => {
@@ -604,7 +604,6 @@
                         .await;

                     // we are synced! yey!
-                    head_block_sender.send_replace(Some(new_head_block.clone()));

                     // TODO: checking this every time seems excessive
                     if self.block_data_limit() == U64::zero() {
@@ -632,11 +631,14 @@
             }
         };

-        // tell web3rpcs about this rpc having this block
         if let Some(block_and_rpc_sender) = &self.block_and_rpc_sender {
+            // tell web3rpcs about this rpc having this block
+            // web3rpcs will do `self.head_block_sender.send_replace(new_head_block)`
             block_and_rpc_sender
                 .send((new_head_block, self.clone()))
                 .context("block_and_rpc_sender failed sending")?;
+        } else {
+            head_block_sender.send_replace(new_head_block);
         }

         Ok(())
@@ -763,7 +765,7 @@
             .await
             .web3_context("failed check_provider")
         {
-            self.healthy.store(false, atomic::Ordering::Release);
+            self.healthy.store(false, atomic::Ordering::SeqCst);
             return Err(err);
         }
@@ -791,14 +793,14 @@
                         break;
                     }

-                    new_total_requests = rpc.internal_requests.load(atomic::Ordering::Acquire)
-                        + rpc.external_requests.load(atomic::Ordering::Acquire);
+                    new_total_requests = rpc.internal_requests.load(atomic::Ordering::SeqCst)
+                        + rpc.external_requests.load(atomic::Ordering::SeqCst);

                     let detailed_healthcheck = new_total_requests - old_total_requests < 5;

                     // TODO: if this fails too many times, reset the connection
                     if let Err(err) = rpc.check_health(detailed_healthcheck, error_handler).await {
-                        rpc.healthy.store(false, atomic::Ordering::Release);
+                        rpc.healthy.store(false, atomic::Ordering::SeqCst);

                         // TODO: different level depending on the error handler
                         // TODO: if rate limit error, set "retry_at"
@@ -808,7 +810,7 @@
                             error!(?err, "health check on {} failed", rpc);
                         }
                     } else {
-                        rpc.healthy.store(true, atomic::Ordering::Release);
+                        rpc.healthy.store(true, atomic::Ordering::SeqCst);
                     }

                     // TODO: should we count the requests done inside this health check
@@ -833,7 +835,7 @@
                 true
             };

-            self.healthy.store(initial_check, atomic::Ordering::Release);
+            self.healthy.store(initial_check, atomic::Ordering::SeqCst);

             tokio::spawn(f)
         } else {
@@ -849,7 +851,7 @@
                     // TODO: if this fails too many times, reset the connection
                     if let Err(err) = rpc.check_provider().await {
-                        rpc.healthy.store(false, atomic::Ordering::Release);
+                        rpc.healthy.store(false, atomic::Ordering::SeqCst);

                         // TODO: if rate limit error, set "retry_at"
                         if rpc.backup {
@@ -858,7 +860,7 @@
                             error!(?err, "provider check on {} failed", rpc);
                         }
                     } else {
-                        rpc.healthy.store(true, atomic::Ordering::Release);
+                        rpc.healthy.store(true, atomic::Ordering::SeqCst);
                     }

                     sleep(Duration::from_secs(health_sleep_seconds)).await;
@@ -904,7 +906,7 @@
         let (first_exit, _, _) = select_all(futures).await;

         // mark unhealthy
-        self.healthy.store(false, atomic::Ordering::Release);
+        self.healthy.store(false, atomic::Ordering::SeqCst);

         debug!(?first_exit, "subscriptions on {} exited", self);
@@ -987,7 +989,7 @@
         // there is a "watch_blocks" function, but a lot of public nodes (including llamanodes) do not support the necessary rpc endpoints
         // TODO: is 1/2 the block time okay?
         let mut i = interval(self.block_interval / 2);
-        i.set_missed_tick_behavior(MissedTickBehavior::Skip);
+        i.set_missed_tick_behavior(MissedTickBehavior::Delay);

         loop {
             let block_result = self
@@ -1163,7 +1165,7 @@
         // TODO: if websocket is reconnecting, return an error?
         if !allow_unhealthy {
-            if !(self.healthy.load(atomic::Ordering::Acquire)) {
+            if !(self.healthy.load(atomic::Ordering::SeqCst)) {
                 return Ok(OpenRequestResult::Failed);
             }
@@ -1361,7 +1363,7 @@ impl Serialize for Web3Rpc {
         state.serialize_field("web3_clientVersion", &self.client_version.read().as_ref())?;

-        match self.block_data_limit.load(atomic::Ordering::Acquire) {
+        match self.block_data_limit.load(atomic::Ordering::SeqCst) {
             u64::MAX => {
                 state.serialize_field("block_data_limit", &None::<()>)?;
             }
@@ -1384,17 +1386,17 @@
         state.serialize_field(
             "external_requests",
-            &self.external_requests.load(atomic::Ordering::Acquire),
+            &self.external_requests.load(atomic::Ordering::SeqCst),
         )?;

         state.serialize_field(
             "internal_requests",
-            &self.internal_requests.load(atomic::Ordering::Acquire),
+            &self.internal_requests.load(atomic::Ordering::SeqCst),
         )?;

         state.serialize_field(
             "active_requests",
-            &self.active_requests.load(atomic::Ordering::Acquire),
+            &self.active_requests.load(atomic::Ordering::SeqCst),
         )?;

         {
@@ -1423,7 +1425,7 @@
             state.serialize_field("weighted_latency_ms", &weighted_latency_ms)?;
         }
         {
-            let healthy = self.healthy.load(atomic::Ordering::Acquire);
+            let healthy = self.healthy.load(atomic::Ordering::SeqCst);
             state.serialize_field("healthy", &healthy)?;
         }
@@ -1437,7 +1439,7 @@ impl fmt::Debug for Web3Rpc {
         f.field("name", &self.name);

-        let block_data_limit = self.block_data_limit.load(atomic::Ordering::Acquire);
+        let block_data_limit = self.block_data_limit.load(atomic::Ordering::SeqCst);
         if block_data_limit == u64::MAX {
             f.field("blocks", &"all");
         } else {
@@ -1446,7 +1448,7 @@
         f.field("backup", &self.backup);

-        f.field("tier", &self.tier.load(atomic::Ordering::Acquire));
+        f.field("tier", &self.tier.load(atomic::Ordering::SeqCst));

         f.field("weighted_ms", &self.weighted_peak_latency().as_millis());
diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs
index f74ca358..28ef9b6f 100644
--- a/web3_proxy/src/rpcs/request.rs
+++ b/web3_proxy/src/rpcs/request.rs
@@ -145,7 +145,7 @@ impl Drop for OpenRequestHandle {
     fn drop(&mut self) {
         self.rpc
             .active_requests
-            .fetch_sub(1, atomic::Ordering::AcqRel);
+            .fetch_sub(1, atomic::Ordering::SeqCst);
     }
 }
@@ -159,7 +159,7 @@ impl OpenRequestHandle {
         // TODO: attach a unique id to this? customer requests have one, but not internal queries
         // TODO: what ordering?!
         rpc.active_requests
-            .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
+            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);

         let error_handler = error_handler.unwrap_or_default();
diff --git a/web3_proxy_cli/Cargo.toml b/web3_proxy_cli/Cargo.toml
index ac9aad0d..0020df17 100644
--- a/web3_proxy_cli/Cargo.toml
+++ b/web3_proxy_cli/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "web3_proxy_cli"
-version = "1.43.83"
+version = "1.43.87"
 edition = "2021"
 default-run = "web3_proxy_cli"
@@ -22,7 +22,7 @@ web3_proxy = { path = "../web3_proxy" }
 console-subscriber = { version = "0.2.0", features = ["env-filter", "parking_lot"], optional = true }
 parking_lot = { version = "0.12.1", features = ["arc_lock", "nightly"] }
 prettytable = { version = "0.10.0", default-features = false }
-serde = { version = "1.0.190" }
+serde = { version = "1.0.192" }
 serde_json = { version = "1.0.108", default-features = false, features = ["raw_value"] }
 tokio-console = { version = "0.1.10", optional = true }
 tracing = "0.1"
diff --git a/web3_proxy_cli/tests/test_proxy.rs b/web3_proxy_cli/tests/test_proxy.rs
index 12a78030..31a96a66 100644
--- a/web3_proxy_cli/tests/test_proxy.rs
+++ b/web3_proxy_cli/tests/test_proxy.rs
@@ -250,7 +250,7 @@ async fn it_matches_anvil() {
         .unwrap();
     info!(?deploy_tx);

-    yield_now().await;
+    sleep(Duration::from_secs(1)).await;

     let head_block_num: U64 = quorum_provider
         .request("eth_blockNumber", ())
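
Note on the app/mod.rs hunk: the request is now validated with ValidatedRequest::new_with_app once, before the retry loop, so a request that fails validation returns immediately and every retry reuses the same request object instead of rebuilding (and re-cloning) it on each pass. A minimal sketch of that shape, using stand-in types rather than the real web3_proxy ones:

use std::time::Duration;

struct ValidatedRequest {
    method: String,
}

#[derive(Debug)]
struct ProxyError(String);

fn validate(raw: &str) -> Result<ValidatedRequest, ProxyError> {
    if raw.is_empty() {
        return Err(ProxyError("empty request".into()));
    }
    Ok(ValidatedRequest { method: raw.to_owned() })
}

fn try_upstream(req: &ValidatedRequest, attempt: u32) -> Result<String, ProxyError> {
    // pretend the upstream only answers on the second attempt
    if attempt == 0 {
        Err(ProxyError(format!("{} failed", req.method)))
    } else {
        Ok(format!("{} ok", req.method))
    }
}

fn proxy_with_retries(raw: &str, max_tries: u32) -> Result<String, ProxyError> {
    // validated exactly once, before the loop; an invalid request never enters it
    let req = validate(raw)?;

    let mut last_error = None;
    for attempt in 0..max_tries {
        match try_upstream(&req, attempt) {
            Ok(x) => return Ok(x),
            Err(err) => last_error = Some(err),
        }
        std::thread::sleep(Duration::from_millis(10));
    }
    Err(last_error.unwrap_or_else(|| ProxyError("no attempts made".into())))
}

fn main() {
    println!("{:?}", proxy_with_retries("eth_blockNumber", 3));
}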
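
app/mod.rs also starts rejecting three wallet_* methods (wallet_getEthereumChains, wallet_getSnaps, wallet_requestSnaps) the same way the shh_* methods are rejected: with a method-not-found JSON-RPC error. The dispatch shape, abbreviated and with a hypothetical error enum standing in for Web3ProxyError:

#[derive(Debug)]
enum ProxyError {
    MethodNotFound(String),
}

fn route(method: &str) -> Result<(), ProxyError> {
    match method {
        // the real match lists every shh_* method; one is enough to show the pattern
        "shh_version" | "wallet_getEthereumChains" | "wallet_getSnaps" | "wallet_requestSnaps" => {
            Err(ProxyError::MethodNotFound(method.to_owned()))
        }
        _ => Ok(()),
    }
}

fn main() {
    assert!(route("wallet_getSnaps").is_err());
    assert!(route("eth_blockNumber").is_ok());
}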
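
The request_builder.rs hunk replaces the flat 295-second default with a tiered expire timeout: an explicit max_wait always wins, premium keys keep the long 295 s window, everyone else gets 60 s, and the result is clamped so it is never shorter than the 10 s connect timeout. The same selection written as a standalone function (the free-function form is illustrative, not the proxy's real API):

use std::time::Duration;

fn expire_timeout(max_wait: Option<Duration>, active_premium: bool) -> Duration {
    let connect_timeout = Duration::from_secs(10);

    let t = if let Some(max_wait) = max_wait {
        max_wait
    } else if active_premium {
        Duration::from_secs(295)
    } else {
        Duration::from_secs(60)
    };

    // never expire before the connection itself is allowed to finish
    t.max(connect_timeout)
}

fn main() {
    assert_eq!(expire_timeout(None, true), Duration::from_secs(295));
    assert_eq!(expire_timeout(None, false), Duration::from_secs(60));
    // a very short max_wait is clamped up to the 10 s connect timeout
    assert_eq!(expire_timeout(Some(Duration::from_secs(1)), false), Duration::from_secs(10));
}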
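
Taken together, the consensus.rs and one.rs hunks move ownership of each RPC's head-block watch channel: when a block_and_rpc sender is wired up, the RPC forwards new blocks to the aggregator and only the consensus code calls send_replace on the RPC's watch (even when no ranked rpcs are found); without an aggregator, the RPC publishes directly. A rough sketch of that flow with tokio channels, using illustrative names rather than the real web3_proxy types:

use tokio::sync::{mpsc, watch};

struct Block(u64);

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (head_tx, head_rx) = watch::channel::<Option<Block>>(None);
    let (agg_tx, mut agg_rx) = mpsc::unbounded_channel::<Block>();

    // the rpc side: prefer the aggregator when it exists, otherwise publish directly
    let aggregator_connected = true;
    let new_block = Block(42);
    if aggregator_connected {
        agg_tx.send(new_block).unwrap();
    } else {
        head_tx.send_replace(Some(new_block));
    }

    // the aggregator side: after ranking, it (and only it) updates the rpc's watch
    if let Some(block) = agg_rx.recv().await {
        head_tx.send_replace(Some(block));
    }

    assert_eq!(head_rx.borrow().as_ref().map(|b| b.0), Some(42));
}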
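
Throughout consensus.rs, one.rs, and request.rs the mixed Acquire/Release/AcqRel orderings on the tier, healthy, block_data_limit, and request-counter atomics are replaced with SeqCst, presumably trading a little performance for not having to reason about ordering pairs on values that are used as independent gauges and flags. A minimal illustration of the resulting pattern:

use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

static HEALTHY: AtomicBool = AtomicBool::new(true);
static ACTIVE_REQUESTS: AtomicU64 = AtomicU64::new(0);

fn main() {
    // every load and store now uses the same, strongest ordering
    ACTIVE_REQUESTS.fetch_add(1, Ordering::SeqCst);
    HEALTHY.store(false, Ordering::SeqCst);

    if !HEALTHY.load(Ordering::SeqCst) {
        println!(
            "skipping rpc; {} requests in flight",
            ACTIVE_REQUESTS.load(Ordering::SeqCst)
        );
    }

    ACTIVE_REQUESTS.fetch_sub(1, Ordering::SeqCst);
}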
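
one.rs also switches the block-polling interval from MissedTickBehavior::Skip to Delay: after a slow poll, the overdue tick fires immediately and the following tick is scheduled a full period after it, instead of waiting for the next aligned multiple of the period as Skip does. A small demo with a 100 ms interval (timings are approximate):

use std::time::Duration;
use tokio::time::{interval, sleep, Instant, MissedTickBehavior};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let mut i = interval(Duration::from_millis(100));
    i.set_missed_tick_behavior(MissedTickBehavior::Delay);

    let start = Instant::now();
    i.tick().await; // first tick completes immediately
    sleep(Duration::from_millis(250)).await; // simulate a slow poll
    i.tick().await; // overdue, so it fires right away...
    i.tick().await; // ...and this one waits a full 100 ms after the late tick
    println!("elapsed: {:?}", start.elapsed()); // ~350 ms with Delay (~300 ms with Skip)
}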