This commit is contained in:
Bryan Stitt 2023-11-08 10:49:40 -08:00
parent 5702c9e097
commit c22640d3f2
3 changed files with 19 additions and 19 deletions

View File

@@ -810,7 +810,7 @@ impl ConsensusFinder
             HashMap::with_capacity(num_known);
         for (rpc, rpc_head) in self.rpc_heads.iter() {
-            if !rpc.healthy.load(atomic::Ordering::Acquire) {
+            if !rpc.healthy.load(atomic::Ordering::SeqCst) {
                 // TODO: should unhealthy servers get a vote? they were included in minmax_block. i think that is enough
                 continue;
             }
@@ -878,14 +878,14 @@ impl ConsensusFinder
     pub fn best_tier(&self) -> Option<u32> {
         self.rpc_heads
             .iter()
-            .map(|(x, _)| x.tier.load(atomic::Ordering::Acquire))
+            .map(|(x, _)| x.tier.load(atomic::Ordering::SeqCst))
             .min()
     }

     pub fn worst_tier(&self) -> Option<u32> {
         self.rpc_heads
             .iter()
-            .map(|(x, _)| x.tier.load(atomic::Ordering::Acquire))
+            .map(|(x, _)| x.tier.load(atomic::Ordering::SeqCst))
             .max()
     }
 }

View File

@@ -155,7 +155,7 @@ impl Web3Rpc
         let backup = config.backup;
         let block_data_limit: AtomicU64 = config.block_data_limit.into();
-        let automatic_block_limit = (block_data_limit.load(atomic::Ordering::Acquire) == 0)
+        let automatic_block_limit = (block_data_limit.load(atomic::Ordering::SeqCst) == 0)
             && block_and_rpc_sender.is_some();

         // have a sender for tracking hard limit anywhere. we use this in case we
@@ -287,7 +287,7 @@ impl Web3Rpc
             head_block = head_block.min(max_block);
         }
-        let tier = self.tier.load(atomic::Ordering::Acquire);
+        let tier = self.tier.load(atomic::Ordering::SeqCst);
         let backup = self.backup;
@@ -354,7 +354,7 @@ impl Web3Rpc
         let request_scaling = 0.01;
         // TODO: what ordering?
         let active_requests =
-            self.active_requests.load(atomic::Ordering::Acquire) as f32 * request_scaling + 1.0;
+            self.active_requests.load(atomic::Ordering::SeqCst) as f32 * request_scaling + 1.0;
         peak_latency.mul_f32(active_requests)
     }
@@ -453,7 +453,7 @@ impl Web3Rpc
     /// TODO: this might be too simple. different nodes can prune differently. its possible we will have a block range
     pub fn block_data_limit(&self) -> U64 {
-        self.block_data_limit.load(atomic::Ordering::Acquire).into()
+        self.block_data_limit.load(atomic::Ordering::SeqCst).into()
     }

     /// TODO: get rid of this now that consensus rpcs does it
@@ -791,8 +791,8 @@ impl Web3Rpc
                 break;
             }
-            new_total_requests = rpc.internal_requests.load(atomic::Ordering::Acquire)
-                + rpc.external_requests.load(atomic::Ordering::Acquire);
+            new_total_requests = rpc.internal_requests.load(atomic::Ordering::SeqCst)
+                + rpc.external_requests.load(atomic::Ordering::SeqCst);
             let detailed_healthcheck = new_total_requests - old_total_requests < 5;
@@ -1163,7 +1163,7 @@ impl Web3Rpc
         // TODO: if websocket is reconnecting, return an error?
         if !allow_unhealthy {
-            if !(self.healthy.load(atomic::Ordering::Acquire)) {
+            if !(self.healthy.load(atomic::Ordering::SeqCst)) {
                 return Ok(OpenRequestResult::Failed);
             }
@@ -1361,7 +1361,7 @@ impl Serialize for Web3Rpc
         state.serialize_field("web3_clientVersion", &self.client_version.read().as_ref())?;
-        match self.block_data_limit.load(atomic::Ordering::Acquire) {
+        match self.block_data_limit.load(atomic::Ordering::SeqCst) {
             u64::MAX => {
                 state.serialize_field("block_data_limit", &None::<()>)?;
             }
@@ -1384,17 +1384,17 @@ impl Serialize for Web3Rpc
         state.serialize_field(
             "external_requests",
-            &self.external_requests.load(atomic::Ordering::Acquire),
+            &self.external_requests.load(atomic::Ordering::SeqCst),
         )?;

         state.serialize_field(
             "internal_requests",
-            &self.internal_requests.load(atomic::Ordering::Acquire),
+            &self.internal_requests.load(atomic::Ordering::SeqCst),
         )?;

         state.serialize_field(
             "active_requests",
-            &self.active_requests.load(atomic::Ordering::Acquire),
+            &self.active_requests.load(atomic::Ordering::SeqCst),
         )?;

         {
@@ -1423,7 +1423,7 @@ impl Serialize for Web3Rpc
             state.serialize_field("weighted_latency_ms", &weighted_latency_ms)?;
         }
         {
-            let healthy = self.healthy.load(atomic::Ordering::Acquire);
+            let healthy = self.healthy.load(atomic::Ordering::SeqCst);
             state.serialize_field("healthy", &healthy)?;
         }
@@ -1437,7 +1437,7 @@ impl fmt::Debug for Web3Rpc
         f.field("name", &self.name);
-        let block_data_limit = self.block_data_limit.load(atomic::Ordering::Acquire);
+        let block_data_limit = self.block_data_limit.load(atomic::Ordering::SeqCst);
         if block_data_limit == u64::MAX {
             f.field("blocks", &"all");
         } else {
@@ -1446,7 +1446,7 @@ impl fmt::Debug for Web3Rpc
         f.field("backup", &self.backup);
-        f.field("tier", &self.tier.load(atomic::Ordering::Acquire));
+        f.field("tier", &self.tier.load(atomic::Ordering::SeqCst));
         f.field("weighted_ms", &self.weighted_peak_latency().as_millis());

View File

@@ -145,7 +145,7 @@ impl Drop for OpenRequestHandle
     fn drop(&mut self) {
         self.rpc
             .active_requests
-            .fetch_sub(1, atomic::Ordering::AcqRel);
+            .fetch_sub(1, atomic::Ordering::SeqCst);
     }
 }
@@ -159,7 +159,7 @@ impl OpenRequestHandle
         // TODO: attach a unique id to this? customer requests have one, but not internal queries
         // TODO: what ordering?!
         rpc.active_requests
-            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
         let error_handler = error_handler.unwrap_or_default();