use Ordering::Relaxed more

This commit is contained in:
Bryan Stitt 2023-09-13 12:35:09 -07:00
parent 1fd8f6f383
commit 8e5a27a184
6 changed files with 46 additions and 41 deletions

View File

@ -1177,30 +1177,30 @@ impl Web3ProxyApp {
Ok(response_data) => {
request_metadata
.error_response
.store(false, Ordering::Release);
.store(false, Ordering::Relaxed);
request_metadata
.user_error_response
.store(false, Ordering::Release);
.store(false, Ordering::Relaxed);
(StatusCode::OK, response_data)
}
Err(err @ Web3ProxyError::NullJsonRpcResult) => {
request_metadata
.error_response
.store(false, Ordering::Release);
.store(false, Ordering::Relaxed);
request_metadata
.user_error_response
.store(false, Ordering::Release);
.store(false, Ordering::Relaxed);
err.as_response_parts()
}
Err(Web3ProxyError::JsonRpcResponse(response_data)) => {
request_metadata
.error_response
.store(false, Ordering::Release);
.store(false, Ordering::Relaxed);
request_metadata
.user_error_response
.store(response_data.is_error(), Ordering::Release);
.store(response_data.is_error(), Ordering::Relaxed);
(StatusCode::OK, response_data)
}
@ -1216,10 +1216,10 @@ impl Web3ProxyApp {
request_metadata
.error_response
.store(true, Ordering::Release);
.store(true, Ordering::Relaxed);
request_metadata
.user_error_response
.store(false, Ordering::Release);
.store(false, Ordering::Relaxed);
err.as_response_parts()
}
@ -1436,7 +1436,7 @@ impl Web3ProxyApp {
if try_archive {
request_metadata
.archive_request
.store(true, atomic::Ordering::Release);
.store(true, atomic::Ordering::Relaxed);
response_data = self
.balanced_rpcs
@ -1687,7 +1687,7 @@ impl Web3ProxyApp {
request_metadata
.archive_request
.store(true, atomic::Ordering::Release);
.store(true, atomic::Ordering::Relaxed);
}
Some(JsonRpcQueryCacheKey::new(
@ -1710,7 +1710,7 @@ impl Web3ProxyApp {
request_metadata
.archive_request
.store(true, atomic::Ordering::Release);
.store(true, atomic::Ordering::Relaxed);
}
Some(JsonRpcQueryCacheKey::new(

View File

@ -313,7 +313,7 @@ impl KafkaDebugLogger {
let payload =
rmp_serde::to_vec(&request).expect("requests should always serialize with rmp");
self.num_requests.fetch_add(1, atomic::Ordering::AcqRel);
self.num_requests.fetch_add(1, atomic::Ordering::Relaxed);
self.background_log(payload)
}
@ -325,12 +325,13 @@ impl KafkaDebugLogger {
let payload =
rmp_serde::to_vec(&response).expect("requests should always serialize with rmp");
self.num_responses.fetch_add(1, atomic::Ordering::AcqRel);
self.num_responses.fetch_add(1, atomic::Ordering::Relaxed);
self.background_log(payload)
}
}
/// TODO: instead of a bunch of atomics, this should probably use a RwLock
#[derive(Debug, Derivative)]
#[derivative(Default)]
pub struct RequestMetadata {
@ -568,16 +569,16 @@ impl RequestMetadata {
let num_bytes = response.num_bytes() as u64;
self.response_bytes
.fetch_add(num_bytes, atomic::Ordering::AcqRel);
.fetch_add(num_bytes, atomic::Ordering::Relaxed);
self.response_millis.fetch_add(
self.start_instant.elapsed().as_millis() as u64,
atomic::Ordering::AcqRel,
atomic::Ordering::Relaxed,
);
// TODO: record first or last timestamp? really, we need multiple
self.response_timestamp
.store(Utc::now().timestamp(), atomic::Ordering::Release);
.store(Utc::now().timestamp(), atomic::Ordering::Relaxed);
// TODO: set user_error_response and error_response here instead of outside this function

View File

@ -631,7 +631,7 @@ impl Web3Rpcs {
}
if let Some(request_metadata) = request_metadata {
request_metadata.no_servers.fetch_add(1, Ordering::AcqRel);
request_metadata.no_servers.fetch_add(1, Ordering::Relaxed);
}
if let Some(retry_at) = earliest_retry_at {
@ -814,15 +814,15 @@ impl Web3Rpcs {
if let Some(request_metadata) = request_metadata {
request_metadata
.response_from_backup_rpc
.store(is_backup_response, Ordering::Release);
.store(is_backup_response, Ordering::Relaxed);
request_metadata
.user_error_response
.store(false, Ordering::Release);
.store(false, Ordering::Relaxed);
request_metadata
.error_response
.store(false, Ordering::Release);
.store(false, Ordering::Relaxed);
}
return Ok(response);
@ -834,7 +834,7 @@ impl Web3Rpcs {
if let Some(request_metadata) = request_metadata {
request_metadata
.user_error_response
.store(true, Ordering::Release);
.store(true, Ordering::Relaxed);
}
x
}
@ -844,11 +844,11 @@ impl Web3Rpcs {
if let Some(request_metadata) = request_metadata {
request_metadata
.error_response
.store(true, Ordering::Release);
.store(true, Ordering::Relaxed);
request_metadata
.user_error_response
.store(false, Ordering::Release);
.store(false, Ordering::Relaxed);
}
last_provider_error = Some(error);
@ -979,7 +979,7 @@ impl Web3Rpcs {
// TODO: have a separate column for rate limited?
if let Some(request_metadata) = request_metadata {
request_metadata.no_servers.fetch_add(1, Ordering::AcqRel);
request_metadata.no_servers.fetch_add(1, Ordering::Relaxed);
}
select! {
@ -999,7 +999,7 @@ impl Web3Rpcs {
if let Some(request_metadata) = request_metadata {
request_metadata
.error_response
.store(true, Ordering::Release);
.store(true, Ordering::Relaxed);
}
break;
}
@ -1010,11 +1010,11 @@ impl Web3Rpcs {
if let Some(request_metadata) = request_metadata {
request_metadata
.error_response
.store(false, Ordering::Release);
.store(false, Ordering::Relaxed);
request_metadata
.user_error_response
.store(true, Ordering::Release);
.store(true, Ordering::Relaxed);
}
// this error response is likely the user's fault
@ -1138,7 +1138,7 @@ impl Web3Rpcs {
request_metadata
.response_from_backup_rpc
.store(only_backups_used, Ordering::Release);
.store(only_backups_used, Ordering::Relaxed);
}
let x = self
@ -1162,7 +1162,7 @@ impl Web3Rpcs {
if let Some(request_metadata) = &request_metadata {
// TODO: if this times out, i think we drop this
request_metadata.no_servers.fetch_add(1, Ordering::AcqRel);
request_metadata.no_servers.fetch_add(1, Ordering::Relaxed);
}
let max_sleep = if let Some(max_wait) = max_wait {
@ -1190,7 +1190,7 @@ impl Web3Rpcs {
}
Err(Some(retry_at)) => {
if let Some(request_metadata) = &request_metadata {
request_metadata.no_servers.fetch_add(1, Ordering::AcqRel);
request_metadata.no_servers.fetch_add(1, Ordering::Relaxed);
}
if let Some(max_wait) = max_wait {

View File

@ -136,7 +136,7 @@ impl Web3Rpc {
let backup = config.backup;
let block_data_limit: AtomicU64 = config.block_data_limit.into();
let automatic_block_limit = (block_data_limit.load(atomic::Ordering::Acquire) == 0)
let automatic_block_limit = (block_data_limit.load(atomic::Ordering::Relaxed) == 0)
&& block_and_rpc_sender.is_some();
// have a sender for tracking hard limit anywhere. we use this in case we
@ -292,8 +292,12 @@ impl Web3Rpc {
Duration::from_secs(1)
};
// TODO: what scaling?
// TODO: figure out how many requests add what level of latency
let request_scaling = 0.01;
// TODO: what ordering?
let active_requests = self.active_requests.load(atomic::Ordering::Acquire) as f32 + 1.0;
let active_requests =
self.active_requests.load(atomic::Ordering::Relaxed) as f32 * request_scaling + 1.0;
peak_latency.mul_f32(active_requests)
}
@ -367,7 +371,7 @@ impl Web3Rpc {
}
self.block_data_limit
.store(limit, atomic::Ordering::Release);
.store(limit, atomic::Ordering::Relaxed);
}
if limit == Some(u64::MAX) {

View File

@ -127,7 +127,7 @@ impl Drop for OpenRequestHandle {
fn drop(&mut self) {
self.rpc
.active_requests
.fetch_sub(1, atomic::Ordering::AcqRel);
.fetch_sub(1, atomic::Ordering::Relaxed);
}
}
@ -141,7 +141,7 @@ impl OpenRequestHandle {
// TODO: attach a unique id to this? customer requests have one, but not internal queries
// TODO: what ordering?!
rpc.active_requests
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let error_handler = error_handler.unwrap_or_default();

View File

@ -547,20 +547,20 @@ impl RpcQueryStats {
let authorization = authorization.expect("Authorization will always be set");
let archive_request = metadata.archive_request.load(Ordering::Acquire);
let archive_request = metadata.archive_request.load(Ordering::Relaxed);
// TODO: do this without cloning. we can take their vec
let backend_rpcs_used = metadata.backend_rpcs_used();
let request_bytes = metadata.request_bytes as u64;
let response_bytes = metadata.response_bytes.load(Ordering::Acquire);
let response_bytes = metadata.response_bytes.load(Ordering::Relaxed);
let mut error_response = metadata.error_response.load(Ordering::Acquire);
let mut response_millis = metadata.response_millis.load(Ordering::Acquire);
let mut error_response = metadata.error_response.load(Ordering::Relaxed);
let mut response_millis = metadata.response_millis.load(Ordering::Relaxed);
let user_error_response = metadata.user_error_response.load(Ordering::Acquire);
let user_error_response = metadata.user_error_response.load(Ordering::Relaxed);
let response_timestamp = match metadata.response_timestamp.load(Ordering::Acquire) {
let response_timestamp = match metadata.response_timestamp.load(Ordering::Relaxed) {
0 => {
// no response timestamp!
if !error_response {