diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 9add5b8a..ac7e8f67 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -810,7 +810,7 @@ impl ConsensusFinder { HashMap::with_capacity(num_known); for (rpc, rpc_head) in self.rpc_heads.iter() { - if !rpc.healthy.load(atomic::Ordering::Acquire) { + if !rpc.healthy.load(atomic::Ordering::SeqCst) { // TODO: should unhealthy servers get a vote? they were included in minmax_block. i think that is enough continue; } @@ -878,14 +878,14 @@ impl ConsensusFinder { pub fn best_tier(&self) -> Option { self.rpc_heads .iter() - .map(|(x, _)| x.tier.load(atomic::Ordering::Acquire)) + .map(|(x, _)| x.tier.load(atomic::Ordering::SeqCst)) .min() } pub fn worst_tier(&self) -> Option { self.rpc_heads .iter() - .map(|(x, _)| x.tier.load(atomic::Ordering::Acquire)) + .map(|(x, _)| x.tier.load(atomic::Ordering::SeqCst)) .max() } } diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index e3825710..8dd1213d 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -155,7 +155,7 @@ impl Web3Rpc { let backup = config.backup; let block_data_limit: AtomicU64 = config.block_data_limit.into(); - let automatic_block_limit = (block_data_limit.load(atomic::Ordering::Acquire) == 0) + let automatic_block_limit = (block_data_limit.load(atomic::Ordering::SeqCst) == 0) && block_and_rpc_sender.is_some(); // have a sender for tracking hard limit anywhere. we use this in case we @@ -287,7 +287,7 @@ impl Web3Rpc { head_block = head_block.min(max_block); } - let tier = self.tier.load(atomic::Ordering::Acquire); + let tier = self.tier.load(atomic::Ordering::SeqCst); let backup = self.backup; @@ -354,7 +354,7 @@ impl Web3Rpc { let request_scaling = 0.01; // TODO: what ordering? let active_requests = - self.active_requests.load(atomic::Ordering::Acquire) as f32 * request_scaling + 1.0; + self.active_requests.load(atomic::Ordering::SeqCst) as f32 * request_scaling + 1.0; peak_latency.mul_f32(active_requests) } @@ -453,7 +453,7 @@ impl Web3Rpc { /// TODO: this might be too simple. different nodes can prune differently. its possible we will have a block range pub fn block_data_limit(&self) -> U64 { - self.block_data_limit.load(atomic::Ordering::Acquire).into() + self.block_data_limit.load(atomic::Ordering::SeqCst).into() } /// TODO: get rid of this now that consensus rpcs does it @@ -791,8 +791,8 @@ impl Web3Rpc { break; } - new_total_requests = rpc.internal_requests.load(atomic::Ordering::Acquire) - + rpc.external_requests.load(atomic::Ordering::Acquire); + new_total_requests = rpc.internal_requests.load(atomic::Ordering::SeqCst) + + rpc.external_requests.load(atomic::Ordering::SeqCst); let detailed_healthcheck = new_total_requests - old_total_requests < 5; @@ -1163,7 +1163,7 @@ impl Web3Rpc { // TODO: if websocket is reconnecting, return an error? if !allow_unhealthy { - if !(self.healthy.load(atomic::Ordering::Acquire)) { + if !(self.healthy.load(atomic::Ordering::SeqCst)) { return Ok(OpenRequestResult::Failed); } @@ -1361,7 +1361,7 @@ impl Serialize for Web3Rpc { state.serialize_field("web3_clientVersion", &self.client_version.read().as_ref())?; - match self.block_data_limit.load(atomic::Ordering::Acquire) { + match self.block_data_limit.load(atomic::Ordering::SeqCst) { u64::MAX => { state.serialize_field("block_data_limit", &None::<()>)?; } @@ -1384,17 +1384,17 @@ impl Serialize for Web3Rpc { state.serialize_field( "external_requests", - &self.external_requests.load(atomic::Ordering::Acquire), + &self.external_requests.load(atomic::Ordering::SeqCst), )?; state.serialize_field( "internal_requests", - &self.internal_requests.load(atomic::Ordering::Acquire), + &self.internal_requests.load(atomic::Ordering::SeqCst), )?; state.serialize_field( "active_requests", - &self.active_requests.load(atomic::Ordering::Acquire), + &self.active_requests.load(atomic::Ordering::SeqCst), )?; { @@ -1423,7 +1423,7 @@ impl Serialize for Web3Rpc { state.serialize_field("weighted_latency_ms", &weighted_latency_ms)?; } { - let healthy = self.healthy.load(atomic::Ordering::Acquire); + let healthy = self.healthy.load(atomic::Ordering::SeqCst); state.serialize_field("healthy", &healthy)?; } @@ -1437,7 +1437,7 @@ impl fmt::Debug for Web3Rpc { f.field("name", &self.name); - let block_data_limit = self.block_data_limit.load(atomic::Ordering::Acquire); + let block_data_limit = self.block_data_limit.load(atomic::Ordering::SeqCst); if block_data_limit == u64::MAX { f.field("blocks", &"all"); } else { @@ -1446,7 +1446,7 @@ impl fmt::Debug for Web3Rpc { f.field("backup", &self.backup); - f.field("tier", &self.tier.load(atomic::Ordering::Acquire)); + f.field("tier", &self.tier.load(atomic::Ordering::SeqCst)); f.field("weighted_ms", &self.weighted_peak_latency().as_millis()); diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index b47d82b0..d4c8d421 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -145,7 +145,7 @@ impl Drop for OpenRequestHandle { fn drop(&mut self) { self.rpc .active_requests - .fetch_sub(1, atomic::Ordering::AcqRel); + .fetch_sub(1, atomic::Ordering::SeqCst); } } @@ -159,7 +159,7 @@ impl OpenRequestHandle { // TODO: attach a unique id to this? customer requests have one, but not internal queries // TODO: what ordering?! rpc.active_requests - .fetch_add(1, std::sync::atomic::Ordering::AcqRel); + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); let error_handler = error_handler.unwrap_or_default();