diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 408fc9fd..4f65130d 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1177,30 +1177,30 @@ impl Web3ProxyApp { Ok(response_data) => { request_metadata .error_response - .store(false, Ordering::Release); + .store(false, Ordering::Relaxed); request_metadata .user_error_response - .store(false, Ordering::Release); + .store(false, Ordering::Relaxed); (StatusCode::OK, response_data) } Err(err @ Web3ProxyError::NullJsonRpcResult) => { request_metadata .error_response - .store(false, Ordering::Release); + .store(false, Ordering::Relaxed); request_metadata .user_error_response - .store(false, Ordering::Release); + .store(false, Ordering::Relaxed); err.as_response_parts() } Err(Web3ProxyError::JsonRpcResponse(response_data)) => { request_metadata .error_response - .store(false, Ordering::Release); + .store(false, Ordering::Relaxed); request_metadata .user_error_response - .store(response_data.is_error(), Ordering::Release); + .store(response_data.is_error(), Ordering::Relaxed); (StatusCode::OK, response_data) } @@ -1216,10 +1216,10 @@ impl Web3ProxyApp { request_metadata .error_response - .store(true, Ordering::Release); + .store(true, Ordering::Relaxed); request_metadata .user_error_response - .store(false, Ordering::Release); + .store(false, Ordering::Relaxed); err.as_response_parts() } @@ -1436,7 +1436,7 @@ impl Web3ProxyApp { if try_archive { request_metadata .archive_request - .store(true, atomic::Ordering::Release); + .store(true, atomic::Ordering::Relaxed); response_data = self .balanced_rpcs @@ -1687,7 +1687,7 @@ impl Web3ProxyApp { request_metadata .archive_request - .store(true, atomic::Ordering::Release); + .store(true, atomic::Ordering::Relaxed); } Some(JsonRpcQueryCacheKey::new( @@ -1710,7 +1710,7 @@ impl Web3ProxyApp { request_metadata .archive_request - .store(true, atomic::Ordering::Release); + .store(true, atomic::Ordering::Relaxed); } Some(JsonRpcQueryCacheKey::new( diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 968d2ced..7d40c281 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -313,7 +313,7 @@ impl KafkaDebugLogger { let payload = rmp_serde::to_vec(&request).expect("requests should always serialize with rmp"); - self.num_requests.fetch_add(1, atomic::Ordering::AcqRel); + self.num_requests.fetch_add(1, atomic::Ordering::Relaxed); self.background_log(payload) } @@ -325,12 +325,13 @@ impl KafkaDebugLogger { let payload = rmp_serde::to_vec(&response).expect("requests should always serialize with rmp"); - self.num_responses.fetch_add(1, atomic::Ordering::AcqRel); + self.num_responses.fetch_add(1, atomic::Ordering::Relaxed); self.background_log(payload) } } +/// TODO: instead of a bunch of atomics, this should probably use a RwLock #[derive(Debug, Derivative)] #[derivative(Default)] pub struct RequestMetadata { @@ -568,16 +569,16 @@ impl RequestMetadata { let num_bytes = response.num_bytes() as u64; self.response_bytes - .fetch_add(num_bytes, atomic::Ordering::AcqRel); + .fetch_add(num_bytes, atomic::Ordering::Relaxed); self.response_millis.fetch_add( self.start_instant.elapsed().as_millis() as u64, - atomic::Ordering::AcqRel, + atomic::Ordering::Relaxed, ); // TODO: record first or last timestamp? really, we need multiple self.response_timestamp - .store(Utc::now().timestamp(), atomic::Ordering::Release); + .store(Utc::now().timestamp(), atomic::Ordering::Relaxed); // TODO: set user_error_response and error_response here instead of outside this function diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index e5f773e1..22ee132b 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -631,7 +631,7 @@ impl Web3Rpcs { } if let Some(request_metadata) = request_metadata { - request_metadata.no_servers.fetch_add(1, Ordering::AcqRel); + request_metadata.no_servers.fetch_add(1, Ordering::Relaxed); } if let Some(retry_at) = earliest_retry_at { @@ -814,15 +814,15 @@ impl Web3Rpcs { if let Some(request_metadata) = request_metadata { request_metadata .response_from_backup_rpc - .store(is_backup_response, Ordering::Release); + .store(is_backup_response, Ordering::Relaxed); request_metadata .user_error_response - .store(false, Ordering::Release); + .store(false, Ordering::Relaxed); request_metadata .error_response - .store(false, Ordering::Release); + .store(false, Ordering::Relaxed); } return Ok(response); @@ -834,7 +834,7 @@ impl Web3Rpcs { if let Some(request_metadata) = request_metadata { request_metadata .user_error_response - .store(true, Ordering::Release); + .store(true, Ordering::Relaxed); } x } @@ -844,11 +844,11 @@ impl Web3Rpcs { if let Some(request_metadata) = request_metadata { request_metadata .error_response - .store(true, Ordering::Release); + .store(true, Ordering::Relaxed); request_metadata .user_error_response - .store(false, Ordering::Release); + .store(false, Ordering::Relaxed); } last_provider_error = Some(error); @@ -979,7 +979,7 @@ impl Web3Rpcs { // TODO: have a separate column for rate limited? if let Some(request_metadata) = request_metadata { - request_metadata.no_servers.fetch_add(1, Ordering::AcqRel); + request_metadata.no_servers.fetch_add(1, Ordering::Relaxed); } select! { @@ -999,7 +999,7 @@ impl Web3Rpcs { if let Some(request_metadata) = request_metadata { request_metadata .error_response - .store(true, Ordering::Release); + .store(true, Ordering::Relaxed); } break; } @@ -1010,11 +1010,11 @@ impl Web3Rpcs { if let Some(request_metadata) = request_metadata { request_metadata .error_response - .store(false, Ordering::Release); + .store(false, Ordering::Relaxed); request_metadata .user_error_response - .store(true, Ordering::Release); + .store(true, Ordering::Relaxed); } // this error response is likely the user's fault @@ -1138,7 +1138,7 @@ impl Web3Rpcs { request_metadata .response_from_backup_rpc - .store(only_backups_used, Ordering::Release); + .store(only_backups_used, Ordering::Relaxed); } let x = self @@ -1162,7 +1162,7 @@ impl Web3Rpcs { if let Some(request_metadata) = &request_metadata { // TODO: if this times out, i think we drop this - request_metadata.no_servers.fetch_add(1, Ordering::AcqRel); + request_metadata.no_servers.fetch_add(1, Ordering::Relaxed); } let max_sleep = if let Some(max_wait) = max_wait { @@ -1190,7 +1190,7 @@ impl Web3Rpcs { } Err(Some(retry_at)) => { if let Some(request_metadata) = &request_metadata { - request_metadata.no_servers.fetch_add(1, Ordering::AcqRel); + request_metadata.no_servers.fetch_add(1, Ordering::Relaxed); } if let Some(max_wait) = max_wait { diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 464a28fc..d8de02a0 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -136,7 +136,7 @@ impl Web3Rpc { let backup = config.backup; let block_data_limit: AtomicU64 = config.block_data_limit.into(); - let automatic_block_limit = (block_data_limit.load(atomic::Ordering::Acquire) == 0) + let automatic_block_limit = (block_data_limit.load(atomic::Ordering::Relaxed) == 0) && block_and_rpc_sender.is_some(); // have a sender for tracking hard limit anywhere. we use this in case we @@ -292,8 +292,12 @@ impl Web3Rpc { Duration::from_secs(1) }; + // TODO: what scaling? + // TODO: figure out how many requests add what level of latency + let request_scaling = 0.01; // TODO: what ordering? - let active_requests = self.active_requests.load(atomic::Ordering::Acquire) as f32 + 1.0; + let active_requests = + self.active_requests.load(atomic::Ordering::Relaxed) as f32 * request_scaling + 1.0; peak_latency.mul_f32(active_requests) } @@ -367,7 +371,7 @@ impl Web3Rpc { } self.block_data_limit - .store(limit, atomic::Ordering::Release); + .store(limit, atomic::Ordering::Relaxed); } if limit == Some(u64::MAX) { diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 50493e64..2b615734 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -127,7 +127,7 @@ impl Drop for OpenRequestHandle { fn drop(&mut self) { self.rpc .active_requests - .fetch_sub(1, atomic::Ordering::AcqRel); + .fetch_sub(1, atomic::Ordering::Relaxed); } } @@ -141,7 +141,7 @@ impl OpenRequestHandle { // TODO: attach a unique id to this? customer requests have one, but not internal queries // TODO: what ordering?! rpc.active_requests - .fetch_add(1, std::sync::atomic::Ordering::AcqRel); + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); let error_handler = error_handler.unwrap_or_default(); diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 5217ea6d..3e2fb071 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -547,20 +547,20 @@ impl RpcQueryStats { let authorization = authorization.expect("Authorization will always be set"); - let archive_request = metadata.archive_request.load(Ordering::Acquire); + let archive_request = metadata.archive_request.load(Ordering::Relaxed); // TODO: do this without cloning. we can take their vec let backend_rpcs_used = metadata.backend_rpcs_used(); let request_bytes = metadata.request_bytes as u64; - let response_bytes = metadata.response_bytes.load(Ordering::Acquire); + let response_bytes = metadata.response_bytes.load(Ordering::Relaxed); - let mut error_response = metadata.error_response.load(Ordering::Acquire); - let mut response_millis = metadata.response_millis.load(Ordering::Acquire); + let mut error_response = metadata.error_response.load(Ordering::Relaxed); + let mut response_millis = metadata.response_millis.load(Ordering::Relaxed); - let user_error_response = metadata.user_error_response.load(Ordering::Acquire); + let user_error_response = metadata.user_error_response.load(Ordering::Relaxed); - let response_timestamp = match metadata.response_timestamp.load(Ordering::Acquire) { + let response_timestamp = match metadata.response_timestamp.load(Ordering::Relaxed) { 0 => { // no response timestamp! if !error_response {