From a1aa63fe55d063f04b4ec91f9fd47a4cd4ed588f Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 14 Nov 2023 13:56:18 -0800 Subject: [PATCH] more SeqCst --- deduped_broadcast/src/lib.rs | 24 +++++++++++------------ deferred-rate-limiter/src/lib.rs | 6 +++--- web3_proxy/src/app/mod.rs | 10 +++++----- web3_proxy/src/frontend/authorization.rs | 8 ++++---- web3_proxy/src/jsonrpc/request.rs | 2 +- web3_proxy/src/jsonrpc/request_builder.rs | 14 ++++++------- web3_proxy/src/kafka.rs | 4 ++-- web3_proxy/src/rpcs/one.rs | 4 +++- web3_proxy/src/rpcs/request.rs | 6 +++--- web3_proxy/src/stats/mod.rs | 12 ++++++------ 10 files changed, 46 insertions(+), 44 deletions(-) diff --git a/deduped_broadcast/src/lib.rs b/deduped_broadcast/src/lib.rs index 21a8d55b..0b9f7c00 100644 --- a/deduped_broadcast/src/lib.rs +++ b/deduped_broadcast/src/lib.rs @@ -50,16 +50,16 @@ where /// TODO: change this to be `send` and put a moka cache here instead of lru. then the de-dupe load will be spread across senders pub async fn send(&self, item: T) { // this is just a debug counter so Relaxed is probably fine - self.total_unfiltered.fetch_add(1, Ordering::AcqRel); + self.total_unfiltered.fetch_add(1, Ordering::SeqCst); self.cache .get_with(item.clone(), async { // this is just a debug counter so Relaxed is probably fine - self.total_filtered.fetch_add(1, Ordering::AcqRel); + self.total_filtered.fetch_add(1, Ordering::SeqCst); if let Ok(x) = self.broadcast_filtered_tx.send(item) { // this is just a debug counter so Relaxed is probably fine - self.total_broadcasts.fetch_add(x, Ordering::AcqRel); + self.total_broadcasts.fetch_add(x, Ordering::SeqCst); } }) .await; @@ -78,15 +78,15 @@ where f.debug_struct("DedupedBroadcaster") .field( "total_unfiltered", - &self.total_unfiltered.load(Ordering::Acquire), + &self.total_unfiltered.load(Ordering::SeqCst), ) .field( "total_filtered", - &self.total_filtered.load(Ordering::Acquire), + &self.total_filtered.load(Ordering::SeqCst), ) .field( "total_broadcasts", - &self.total_broadcasts.load(Ordering::Acquire), + &self.total_broadcasts.load(Ordering::SeqCst), ) .field( "subscriptions", @@ -108,15 +108,15 @@ where state.serialize_field( "total_unfiltered", - &self.total_unfiltered.load(Ordering::Acquire), + &self.total_unfiltered.load(Ordering::SeqCst), )?; state.serialize_field( "total_filtered", - &self.total_filtered.load(Ordering::Acquire), + &self.total_filtered.load(Ordering::SeqCst), )?; state.serialize_field( "total_broadcasts", - &self.total_broadcasts.load(Ordering::Acquire), + &self.total_broadcasts.load(Ordering::SeqCst), )?; state.serialize_field( "subscriptions", @@ -156,8 +156,8 @@ mod tests { yield_now().await; - assert_eq!(broadcaster.total_unfiltered.load(Ordering::Acquire), 7); - assert_eq!(broadcaster.total_filtered.load(Ordering::Acquire), 3); - assert_eq!(broadcaster.total_broadcasts.load(Ordering::Acquire), 6); + assert_eq!(broadcaster.total_unfiltered.load(Ordering::SeqCst), 7); + assert_eq!(broadcaster.total_filtered.load(Ordering::SeqCst), 3); + assert_eq!(broadcaster.total_broadcasts.load(Ordering::SeqCst), 6); } } diff --git a/deferred-rate-limiter/src/lib.rs b/deferred-rate-limiter/src/lib.rs index e41a8e2f..0a906612 100644 --- a/deferred-rate-limiter/src/lib.rs +++ b/deferred-rate-limiter/src/lib.rs @@ -141,7 +141,7 @@ where Ok(deferred_rate_limit_result) } else { // we have a cached amount here - let cached_key_count = local_key_count.fetch_add(count, Ordering::AcqRel); + let cached_key_count = local_key_count.fetch_add(count, Ordering::SeqCst); // assuming no other parallel futures incremented this key, this is the count that redis has let expected_key_count = cached_key_count + count; @@ -168,11 +168,11 @@ where .await { Ok(RedisRateLimitResult::Allowed(count)) => { - local_key_count.store(count, Ordering::Release); + local_key_count.store(count, Ordering::SeqCst); DeferredRateLimitResult::Allowed } Ok(RedisRateLimitResult::RetryAt(retry_at, count)) => { - local_key_count.store(count, Ordering::Release); + local_key_count.store(count, Ordering::SeqCst); DeferredRateLimitResult::RetryAt(retry_at) } Ok(RedisRateLimitResult::RetryNever) => { diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 1b42588b..159f138d 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1318,22 +1318,22 @@ impl App { let (code, response) = match last_response { Ok(response_data) => { - web3_request.error_response.store(false, Ordering::Release); + web3_request.error_response.store(false, Ordering::SeqCst); // TODO: is it true that all jsonrpc errors are user errors? web3_request .user_error_response - .store(response_data.is_jsonrpc_err(), Ordering::Release); + .store(response_data.is_jsonrpc_err(), Ordering::SeqCst); (StatusCode::OK, response_data) } Err(err) => { // max tries exceeded. return the error - web3_request.error_response.store(true, Ordering::Release); + web3_request.error_response.store(true, Ordering::SeqCst); web3_request .user_error_response - .store(false, Ordering::Release); + .store(false, Ordering::SeqCst); err.as_json_response_parts(web3_request.id()) } @@ -1539,7 +1539,7 @@ impl App { // TODO: only charge for archive if it gave a result web3_request .archive_request - .store(true, atomic::Ordering::Release); + .store(true, atomic::Ordering::SeqCst); // TODO: we don't actually want try_send_all. we want the first non-null, non-error response self diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 310dafa9..5d810e71 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -54,9 +54,9 @@ pub enum RateLimitResult { #[derive(Copy, Clone, Debug, Hash, Eq, PartialEq)] pub enum AuthorizationType { - /// TODO: sometimes localhost should be internal and other times it should be Frontend. make a better separatation Internal, - Frontend, + Local, + Remote, } /// TODO: move this @@ -319,7 +319,7 @@ impl Authorization { origin, referer, user_agent, - AuthorizationType::Frontend, + AuthorizationType::Remote, ) } @@ -919,7 +919,7 @@ impl App { origin, referer, user_agent, - AuthorizationType::Frontend, + AuthorizationType::Remote, )?; // user key is valid. now check rate limits diff --git a/web3_proxy/src/jsonrpc/request.rs b/web3_proxy/src/jsonrpc/request.rs index 151b5afa..69a668d4 100644 --- a/web3_proxy/src/jsonrpc/request.rs +++ b/web3_proxy/src/jsonrpc/request.rs @@ -136,7 +136,7 @@ impl JsonRpcRequestEnum { request .user_error_response - .store(true, atomic::Ordering::Release); + .store(true, atomic::Ordering::SeqCst); let response = Web3ProxyError::BadRequest("request failed validation".into()); diff --git a/web3_proxy/src/jsonrpc/request_builder.rs b/web3_proxy/src/jsonrpc/request_builder.rs index 4ecdd34e..080f98b9 100644 --- a/web3_proxy/src/jsonrpc/request_builder.rs +++ b/web3_proxy/src/jsonrpc/request_builder.rs @@ -181,7 +181,7 @@ impl RequestBuilder { if let Ok(x) = &x { if self.archive_request { - x.archive_request.store(true, atomic::Ordering::Release); + x.archive_request.store(true, atomic::Ordering::SeqCst); } } @@ -283,7 +283,7 @@ impl Serialize for ValidatedRequest { state.serialize_field( "archive_request", - &self.archive_request.load(atomic::Ordering::Acquire), + &self.archive_request.load(atomic::Ordering::SeqCst), )?; state.serialize_field("chain_id", &self.chain_id)?; @@ -306,7 +306,7 @@ impl Serialize for ValidatedRequest { state.serialize_field( "response_bytes", - &self.response_bytes.load(atomic::Ordering::Acquire), + &self.response_bytes.load(atomic::Ordering::SeqCst), )?; state.end() @@ -526,7 +526,7 @@ impl ValidatedRequest { #[inline] pub fn min_block_needed(&self) -> Option { - if self.archive_request.load(atomic::Ordering::Acquire) { + if self.archive_request.load(atomic::Ordering::SeqCst) { Some(U64::zero()) } else { self.cache_mode.from_block().map(|x| x.num()) @@ -580,16 +580,16 @@ impl ValidatedRequest { let num_bytes = response.num_bytes(); self.response_bytes - .fetch_add(num_bytes, atomic::Ordering::AcqRel); + .fetch_add(num_bytes, atomic::Ordering::SeqCst); self.response_millis.fetch_add( self.start_instant.elapsed().as_millis() as u64, - atomic::Ordering::AcqRel, + atomic::Ordering::SeqCst, ); // TODO: record first or last timestamp? really, we need multiple self.response_timestamp - .store(Utc::now().timestamp(), atomic::Ordering::Release); + .store(Utc::now().timestamp(), atomic::Ordering::SeqCst); // TODO: set user_error_response and error_response here instead of outside this function diff --git a/web3_proxy/src/kafka.rs b/web3_proxy/src/kafka.rs index fa26bd33..eda8d5e8 100644 --- a/web3_proxy/src/kafka.rs +++ b/web3_proxy/src/kafka.rs @@ -137,7 +137,7 @@ impl KafkaDebugLogger { serde_json::to_vec(&request).expect("requests should always serialize with rmp"); // this is just a debug counter so Relaxed is probably fine - self.num_requests.fetch_add(1, atomic::Ordering::AcqRel); + self.num_requests.fetch_add(1, atomic::Ordering::SeqCst); self.background_log(payload) } @@ -150,7 +150,7 @@ impl KafkaDebugLogger { serde_json::to_vec(&response).expect("requests should always serialize with rmp"); // this is just a debug counter so Relaxed is probably fine - self.num_responses.fetch_add(1, atomic::Ordering::AcqRel); + self.num_responses.fetch_add(1, atomic::Ordering::SeqCst); self.background_log(payload) } diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 84c44266..0902e3e4 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -952,7 +952,7 @@ impl Web3Rpc { /// Subscribe to new block headers. async fn subscribe_new_heads(self: &Arc) -> Web3ProxyResult<()> { - trace!("subscribing to new heads on {}", self); + info!("subscribing to new heads on {}", self); let error_handler = if self.backup { Some(Level::DEBUG.into()) @@ -1001,6 +1001,8 @@ impl Web3Rpc { ) .await; + info!(?block_result, "tick on {}", self); + self.send_head_block_result(block_result).await?; // TODO: should this select be at the start or end of the loop? diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 49111e3c..d354cf30 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -318,17 +318,17 @@ impl OpenRequestHandle { let authorization = &self.web3_request.authorization; match &authorization.authorization_type { - AuthorizationType::Frontend => { + AuthorizationType::Remote | AuthorizationType::Local => { // this is just a debug counter, so Relaxed is probably fine self.rpc .external_requests - .fetch_add(1, std::sync::atomic::Ordering::AcqRel); + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); } AuthorizationType::Internal => { // this is just a debug counter, so Relaxed is probably fine self.rpc .internal_requests - .fetch_add(1, std::sync::atomic::Ordering::AcqRel); + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); } } diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 66c8c201..fd3b57cb 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -551,20 +551,20 @@ impl RpcQueryStats { // TODO: do this without a clone let authorization = metadata.authorization.clone(); - let archive_request = metadata.archive_request.load(Ordering::Acquire); + let archive_request = metadata.archive_request.load(Ordering::SeqCst); // TODO: do this without cloning. we can take their vec let backend_rpcs_used = metadata.backend_rpcs_used(); let request_bytes = metadata.inner.num_bytes() as u64; - let response_bytes = metadata.response_bytes.load(Ordering::Acquire); + let response_bytes = metadata.response_bytes.load(Ordering::SeqCst); - let mut error_response = metadata.error_response.load(Ordering::Acquire); - let mut response_millis = metadata.response_millis.load(Ordering::Acquire); + let mut error_response = metadata.error_response.load(Ordering::SeqCst); + let mut response_millis = metadata.response_millis.load(Ordering::SeqCst); - let user_error_response = metadata.user_error_response.load(Ordering::Acquire); + let user_error_response = metadata.user_error_response.load(Ordering::SeqCst); - let response_timestamp = match metadata.response_timestamp.load(Ordering::Acquire) { + let response_timestamp = match metadata.response_timestamp.load(Ordering::SeqCst) { 0 => { // no response timestamp! if !error_response {