more SeqCst

This commit is contained in:
Bryan Stitt 2023-11-08 15:19:36 -08:00
parent 156ff1c33a
commit a0e586dac2
2 changed files with 10 additions and 11 deletions

View File

@ -672,7 +672,7 @@ impl ConsensusFinder {
0 => {} 0 => {}
1 => { 1 => {
for rpc in self.rpc_heads.keys() { for rpc in self.rpc_heads.keys() {
rpc.tier.store(1, atomic::Ordering::Release) rpc.tier.store(1, atomic::Ordering::SeqCst)
} }
} }
_ => { _ => {
@ -752,7 +752,7 @@ impl ConsensusFinder {
trace!("{} - p50_sec: {}, tier {}", rpc, median_latency_sec, tier); trace!("{} - p50_sec: {}, tier {}", rpc, median_latency_sec, tier);
rpc.tier.store(tier, atomic::Ordering::Release); rpc.tier.store(tier, atomic::Ordering::SeqCst);
} }
} }
} }

View File

@ -438,8 +438,7 @@ impl Web3Rpc {
warn!("{} is unable to serve requests", self); warn!("{} is unable to serve requests", self);
} }
self.block_data_limit self.block_data_limit.store(limit, atomic::Ordering::SeqCst);
.store(limit, atomic::Ordering::Release);
} }
if limit == Some(u64::MAX) { if limit == Some(u64::MAX) {
@ -763,7 +762,7 @@ impl Web3Rpc {
.await .await
.web3_context("failed check_provider") .web3_context("failed check_provider")
{ {
self.healthy.store(false, atomic::Ordering::Release); self.healthy.store(false, atomic::Ordering::SeqCst);
return Err(err); return Err(err);
} }
@ -798,7 +797,7 @@ impl Web3Rpc {
// TODO: if this fails too many times, reset the connection // TODO: if this fails too many times, reset the connection
if let Err(err) = rpc.check_health(detailed_healthcheck, error_handler).await { if let Err(err) = rpc.check_health(detailed_healthcheck, error_handler).await {
rpc.healthy.store(false, atomic::Ordering::Release); rpc.healthy.store(false, atomic::Ordering::SeqCst);
// TODO: different level depending on the error handler // TODO: different level depending on the error handler
// TODO: if rate limit error, set "retry_at" // TODO: if rate limit error, set "retry_at"
@ -808,7 +807,7 @@ impl Web3Rpc {
error!(?err, "health check on {} failed", rpc); error!(?err, "health check on {} failed", rpc);
} }
} else { } else {
rpc.healthy.store(true, atomic::Ordering::Release); rpc.healthy.store(true, atomic::Ordering::SeqCst);
} }
// TODO: should we count the requests done inside this health check // TODO: should we count the requests done inside this health check
@ -833,7 +832,7 @@ impl Web3Rpc {
true true
}; };
self.healthy.store(initial_check, atomic::Ordering::Release); self.healthy.store(initial_check, atomic::Ordering::SeqCst);
tokio::spawn(f) tokio::spawn(f)
} else { } else {
@ -849,7 +848,7 @@ impl Web3Rpc {
// TODO: if this fails too many times, reset the connection // TODO: if this fails too many times, reset the connection
if let Err(err) = rpc.check_provider().await { if let Err(err) = rpc.check_provider().await {
rpc.healthy.store(false, atomic::Ordering::Release); rpc.healthy.store(false, atomic::Ordering::SeqCst);
// TODO: if rate limit error, set "retry_at" // TODO: if rate limit error, set "retry_at"
if rpc.backup { if rpc.backup {
@ -858,7 +857,7 @@ impl Web3Rpc {
error!(?err, "provider check on {} failed", rpc); error!(?err, "provider check on {} failed", rpc);
} }
} else { } else {
rpc.healthy.store(true, atomic::Ordering::Release); rpc.healthy.store(true, atomic::Ordering::SeqCst);
} }
sleep(Duration::from_secs(health_sleep_seconds)).await; sleep(Duration::from_secs(health_sleep_seconds)).await;
@ -904,7 +903,7 @@ impl Web3Rpc {
let (first_exit, _, _) = select_all(futures).await; let (first_exit, _, _) = select_all(futures).await;
// mark unhealthy // mark unhealthy
self.healthy.store(false, atomic::Ordering::Release); self.healthy.store(false, atomic::Ordering::SeqCst);
debug!(?first_exit, "subscriptions on {} exited", self); debug!(?first_exit, "subscriptions on {} exited", self);