more SeqCst

Bryan Stitt 2023-11-14 13:56:18 -08:00
parent a501a2d2fa
commit a1aa63fe55
10 changed files with 46 additions and 44 deletions

@@ -50,16 +50,16 @@ where
     /// TODO: change this to be `send` and put a moka cache here instead of lru. then the de-dupe load will be spread across senders
     pub async fn send(&self, item: T) {
         // this is just a debug counter so Relaxed is probably fine
-        self.total_unfiltered.fetch_add(1, Ordering::AcqRel);
+        self.total_unfiltered.fetch_add(1, Ordering::SeqCst);
         self.cache
             .get_with(item.clone(), async {
                 // this is just a debug counter so Relaxed is probably fine
-                self.total_filtered.fetch_add(1, Ordering::AcqRel);
+                self.total_filtered.fetch_add(1, Ordering::SeqCst);
                 if let Ok(x) = self.broadcast_filtered_tx.send(item) {
                     // this is just a debug counter so Relaxed is probably fine
-                    self.total_broadcasts.fetch_add(x, Ordering::AcqRel);
+                    self.total_broadcasts.fetch_add(x, Ordering::SeqCst);
                 }
             })
             .await;
@@ -78,15 +78,15 @@ where
         f.debug_struct("DedupedBroadcaster")
             .field(
                 "total_unfiltered",
-                &self.total_unfiltered.load(Ordering::Acquire),
+                &self.total_unfiltered.load(Ordering::SeqCst),
             )
             .field(
                 "total_filtered",
-                &self.total_filtered.load(Ordering::Acquire),
+                &self.total_filtered.load(Ordering::SeqCst),
             )
             .field(
                 "total_broadcasts",
-                &self.total_broadcasts.load(Ordering::Acquire),
+                &self.total_broadcasts.load(Ordering::SeqCst),
            )
            .field(
                "subscriptions",
@@ -108,15 +108,15 @@ where
         state.serialize_field(
             "total_unfiltered",
-            &self.total_unfiltered.load(Ordering::Acquire),
+            &self.total_unfiltered.load(Ordering::SeqCst),
         )?;
         state.serialize_field(
             "total_filtered",
-            &self.total_filtered.load(Ordering::Acquire),
+            &self.total_filtered.load(Ordering::SeqCst),
         )?;
         state.serialize_field(
             "total_broadcasts",
-            &self.total_broadcasts.load(Ordering::Acquire),
+            &self.total_broadcasts.load(Ordering::SeqCst),
         )?;
         state.serialize_field(
             "subscriptions",
@@ -156,8 +156,8 @@ mod tests {
         yield_now().await;
-        assert_eq!(broadcaster.total_unfiltered.load(Ordering::Acquire), 7);
-        assert_eq!(broadcaster.total_filtered.load(Ordering::Acquire), 3);
-        assert_eq!(broadcaster.total_broadcasts.load(Ordering::Acquire), 6);
+        assert_eq!(broadcaster.total_unfiltered.load(Ordering::SeqCst), 7);
+        assert_eq!(broadcaster.total_filtered.load(Ordering::SeqCst), 3);
+        assert_eq!(broadcaster.total_broadcasts.load(Ordering::SeqCst), 6);
     }
 }
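
Reviewer note: the comments in this file still say "Relaxed is probably fine" while the code now standardizes on SeqCst. A minimal standalone sketch (illustrative only, not the project's DedupedBroadcaster) of what the two orderings buy for a display-only counter:

    use std::sync::atomic::{AtomicU64, Ordering};

    /// Standalone debug counters, assumed for illustration only.
    #[derive(Default)]
    struct Counters {
        total_unfiltered: AtomicU64,
        total_filtered: AtomicU64,
    }

    impl Counters {
        fn record_unfiltered(&self) {
            // For a counter that is only ever read for display, Relaxed is enough:
            // the increment is still atomic, it just imposes no ordering on other memory.
            self.total_unfiltered.fetch_add(1, Ordering::Relaxed);
        }

        fn record_filtered(&self) {
            // SeqCst additionally places this operation in one global total order with
            // every other SeqCst operation, at some extra cost on weakly ordered CPUs.
            // It is the simplest ordering to reason about, which is presumably why the
            // commit standardizes on it.
            self.total_filtered.fetch_add(1, Ordering::SeqCst);
        }
    }
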

@@ -141,7 +141,7 @@ where
             Ok(deferred_rate_limit_result)
         } else {
             // we have a cached amount here
-            let cached_key_count = local_key_count.fetch_add(count, Ordering::AcqRel);
+            let cached_key_count = local_key_count.fetch_add(count, Ordering::SeqCst);
             // assuming no other parallel futures incremented this key, this is the count that redis has
             let expected_key_count = cached_key_count + count;
@@ -168,11 +168,11 @@ where
                 .await
             {
                 Ok(RedisRateLimitResult::Allowed(count)) => {
-                    local_key_count.store(count, Ordering::Release);
+                    local_key_count.store(count, Ordering::SeqCst);
                     DeferredRateLimitResult::Allowed
                 }
                 Ok(RedisRateLimitResult::RetryAt(retry_at, count)) => {
-                    local_key_count.store(count, Ordering::Release);
+                    local_key_count.store(count, Ordering::SeqCst);
                     DeferredRateLimitResult::RetryAt(retry_at)
                 }
                 Ok(RedisRateLimitResult::RetryNever) => {
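
Reviewer note: these hunks are the deferred rate limiter: a per-key atomic counter is charged optimistically with fetch_add, and the authoritative count later comes back from Redis and is written with store. A reduced sketch of that pattern under assumed names (LocalKey, try_charge, and reconcile are hypothetical; only the atomic calls mirror the diff):

    use std::sync::atomic::{AtomicU64, Ordering};

    /// Hypothetical local mirror of one rate-limit key; the real code keeps these
    /// in a cache keyed by user/IP and reconciles with Redis in the background.
    struct LocalKey {
        count: AtomicU64,
        max_per_period: u64,
    }

    impl LocalKey {
        /// Optimistically charge `n` requests against the cached count.
        /// Returns true if the request may proceed without waiting on Redis.
        fn try_charge(&self, n: u64) -> bool {
            // fetch_add returns the value *before* the addition.
            let before = self.count.fetch_add(n, Ordering::SeqCst);
            // Assuming no concurrent futures raced us, Redis should now hold this:
            let expected = before + n;
            expected <= self.max_per_period
        }

        /// Called when the authoritative count comes back from Redis.
        fn reconcile(&self, authoritative: u64) {
            self.count.store(authoritative, Ordering::SeqCst);
        }
    }
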

@@ -1318,22 +1318,22 @@ impl App {
         let (code, response) = match last_response {
             Ok(response_data) => {
-                web3_request.error_response.store(false, Ordering::Release);
+                web3_request.error_response.store(false, Ordering::SeqCst);
                 // TODO: is it true that all jsonrpc errors are user errors?
                 web3_request
                     .user_error_response
-                    .store(response_data.is_jsonrpc_err(), Ordering::Release);
+                    .store(response_data.is_jsonrpc_err(), Ordering::SeqCst);
                 (StatusCode::OK, response_data)
             }
             Err(err) => {
                 // max tries exceeded. return the error
-                web3_request.error_response.store(true, Ordering::Release);
+                web3_request.error_response.store(true, Ordering::SeqCst);
                 web3_request
                     .user_error_response
-                    .store(false, Ordering::Release);
+                    .store(false, Ordering::SeqCst);
                 err.as_json_response_parts(web3_request.id())
             }
@@ -1539,7 +1539,7 @@ impl App {
             // TODO: only charge for archive if it gave a result
             web3_request
                 .archive_request
-                .store(true, atomic::Ordering::Release);
+                .store(true, atomic::Ordering::SeqCst);
             // TODO: we don't actually want try_send_all. we want the first non-null, non-error response
             self
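
Reviewer note: error_response marks a proxy-side failure, while user_error_response marks a successful transport that carried a JSON-RPC error object back to the caller. A simplified sketch of that classification with illustrative names (the bool stands in for ResponseData::is_jsonrpc_err()):

    use std::sync::atomic::{AtomicBool, Ordering};

    struct RequestFlags {
        /// The proxy itself failed (e.g. all tries exhausted).
        error_response: AtomicBool,
        /// The upstream answered, but with a JSON-RPC error object.
        user_error_response: AtomicBool,
    }

    fn classify(flags: &RequestFlags, outcome: Result<bool, ()>) {
        match outcome {
            // Ok carries "was this a JSON-RPC error?" for illustration.
            Ok(is_jsonrpc_err) => {
                flags.error_response.store(false, Ordering::SeqCst);
                flags.user_error_response.store(is_jsonrpc_err, Ordering::SeqCst);
            }
            Err(()) => {
                flags.error_response.store(true, Ordering::SeqCst);
                flags.user_error_response.store(false, Ordering::SeqCst);
            }
        }
    }
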

@@ -54,9 +54,9 @@ pub enum RateLimitResult {
 #[derive(Copy, Clone, Debug, Hash, Eq, PartialEq)]
 pub enum AuthorizationType {
     /// TODO: sometimes localhost should be internal and other times it should be Frontend. make a better separatation
     Internal,
-    Frontend,
+    Local,
+    Remote,
 }
 /// TODO: move this
@@ -319,7 +319,7 @@ impl Authorization {
             origin,
             referer,
             user_agent,
-            AuthorizationType::Frontend,
+            AuthorizationType::Remote,
         )
     }
@@ -919,7 +919,7 @@ impl App {
             origin,
             referer,
             user_agent,
-            AuthorizationType::Frontend,
+            AuthorizationType::Remote,
         )?;
         // user key is valid. now check rate limits
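
Reviewer note: the rename suggests the old Frontend variant is split into Local and Remote, presumably to separate loopback callers from external ones (see the TODO above about localhost). A hypothetical sketch of how the split might be applied; the helper function and the loopback rule are assumptions, only the variant names come from the diff:

    use std::net::IpAddr;

    /// Mirrors the new enum shape: Internal stays for proxy-originated work,
    /// Frontend is split into Local and Remote.
    #[derive(Copy, Clone, Debug, Hash, Eq, PartialEq)]
    enum AuthorizationType {
        Internal,
        Local,
        Remote,
    }

    /// Hypothetical helper: pick a variant for an incoming frontend connection.
    fn frontend_authorization_type(peer: IpAddr) -> AuthorizationType {
        if peer.is_loopback() {
            AuthorizationType::Local
        } else {
            AuthorizationType::Remote
        }
    }
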

@@ -136,7 +136,7 @@ impl JsonRpcRequestEnum {
                 request
                     .user_error_response
-                    .store(true, atomic::Ordering::Release);
+                    .store(true, atomic::Ordering::SeqCst);
                 let response = Web3ProxyError::BadRequest("request failed validation".into());

@@ -181,7 +181,7 @@ impl RequestBuilder {
             if let Ok(x) = &x {
                 if self.archive_request {
-                    x.archive_request.store(true, atomic::Ordering::Release);
+                    x.archive_request.store(true, atomic::Ordering::SeqCst);
                 }
             }
@@ -283,7 +283,7 @@ impl Serialize for ValidatedRequest {
         state.serialize_field(
             "archive_request",
-            &self.archive_request.load(atomic::Ordering::Acquire),
+            &self.archive_request.load(atomic::Ordering::SeqCst),
         )?;
         state.serialize_field("chain_id", &self.chain_id)?;
@@ -306,7 +306,7 @@ impl Serialize for ValidatedRequest {
         state.serialize_field(
             "response_bytes",
-            &self.response_bytes.load(atomic::Ordering::Acquire),
+            &self.response_bytes.load(atomic::Ordering::SeqCst),
         )?;
         state.end()
@@ -526,7 +526,7 @@ impl ValidatedRequest {
     #[inline]
     pub fn min_block_needed(&self) -> Option<U64> {
-        if self.archive_request.load(atomic::Ordering::Acquire) {
+        if self.archive_request.load(atomic::Ordering::SeqCst) {
             Some(U64::zero())
         } else {
             self.cache_mode.from_block().map(|x| x.num())
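
Reviewer note: min_block_needed pins archive requests to block 0 so only archive-capable backends qualify; otherwise the lower bound comes from the cache mode. A reduced sketch with plain u64 standing in for the U64 type used in the diff (the surrounding struct is illustrative):

    use std::sync::atomic::{AtomicBool, Ordering};

    struct Request {
        archive_request: AtomicBool,
        /// Lower block bound derived from the request parameters, if any.
        cache_from_block: Option<u64>,
    }

    impl Request {
        /// Oldest block this request needs; Some(0) means "full archive history".
        fn min_block_needed(&self) -> Option<u64> {
            if self.archive_request.load(Ordering::SeqCst) {
                Some(0)
            } else {
                self.cache_from_block
            }
        }
    }
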
@@ -580,16 +580,16 @@ impl ValidatedRequest {
         let num_bytes = response.num_bytes();
         self.response_bytes
-            .fetch_add(num_bytes, atomic::Ordering::AcqRel);
+            .fetch_add(num_bytes, atomic::Ordering::SeqCst);
         self.response_millis.fetch_add(
             self.start_instant.elapsed().as_millis() as u64,
-            atomic::Ordering::AcqRel,
+            atomic::Ordering::SeqCst,
         );
         // TODO: record first or last timestamp? really, we need multiple
         self.response_timestamp
-            .store(Utc::now().timestamp(), atomic::Ordering::Release);
+            .store(Utc::now().timestamp(), atomic::Ordering::SeqCst);
         // TODO: set user_error_response and error_response here instead of outside this function
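
Reviewer note: response accounting is three independent atomics updated once per response. A self-contained sketch of the same recording step (the surrounding type and the SystemTime-based timestamp are illustrative; the real code uses chrono's Utc::now().timestamp()):

    use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
    use std::time::{Instant, SystemTime, UNIX_EPOCH};

    struct ResponseStats {
        start_instant: Instant,
        response_bytes: AtomicU64,
        response_millis: AtomicU64,
        response_timestamp: AtomicI64,
    }

    impl ResponseStats {
        fn record(&self, num_bytes: u64) {
            // Accumulate size and latency; fetch_add keeps multiple responses additive.
            self.response_bytes.fetch_add(num_bytes, Ordering::SeqCst);
            self.response_millis.fetch_add(
                self.start_instant.elapsed().as_millis() as u64,
                Ordering::SeqCst,
            );
            // Unix seconds; stands in for chrono's Utc::now().timestamp().
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_secs() as i64)
                .unwrap_or_default();
            self.response_timestamp.store(now, Ordering::SeqCst);
        }
    }
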

@@ -137,7 +137,7 @@ impl KafkaDebugLogger {
             serde_json::to_vec(&request).expect("requests should always serialize with rmp");
         // this is just a debug counter so Relaxed is probably fine
-        self.num_requests.fetch_add(1, atomic::Ordering::AcqRel);
+        self.num_requests.fetch_add(1, atomic::Ordering::SeqCst);
         self.background_log(payload)
     }
@@ -150,7 +150,7 @@ impl KafkaDebugLogger {
             serde_json::to_vec(&response).expect("requests should always serialize with rmp");
         // this is just a debug counter so Relaxed is probably fine
-        self.num_responses.fetch_add(1, atomic::Ordering::AcqRel);
+        self.num_responses.fetch_add(1, atomic::Ordering::SeqCst);
         self.background_log(payload)
     }

@@ -952,7 +952,7 @@ impl Web3Rpc {
     /// Subscribe to new block headers.
     async fn subscribe_new_heads(self: &Arc<Self>) -> Web3ProxyResult<()> {
-        trace!("subscribing to new heads on {}", self);
+        info!("subscribing to new heads on {}", self);
         let error_handler = if self.backup {
             Some(Level::DEBUG.into())
@@ -1001,6 +1001,8 @@
                 )
                 .await;
+                info!(?block_result, "tick on {}", self);
                 self.send_head_block_result(block_result).await?;
                 // TODO: should this select be at the start or end of the loop?
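
Reviewer note: both changed lines use tracing-style macros, and `?block_result` records the value as a structured field via its Debug impl. A tiny standalone example of the same pattern (function name and parameters are made up):

    use tracing::info;

    fn log_new_heads(rpc_name: &str, block_result: &Result<u64, String>) {
        // Promoted from trace! to info! in this commit, so it is visible at the
        // default log level when a subscription starts.
        info!("subscribing to new heads on {}", rpc_name);

        // `?block_result` captures the value as a structured field using Debug.
        info!(?block_result, "tick on {}", rpc_name);
    }
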

@@ -318,17 +318,17 @@ impl OpenRequestHandle {
         let authorization = &self.web3_request.authorization;
         match &authorization.authorization_type {
-            AuthorizationType::Frontend => {
+            AuthorizationType::Remote | AuthorizationType::Local => {
                 // this is just a debug counter, so Relaxed is probably fine
                 self.rpc
                     .external_requests
-                    .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
+                    .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
             }
             AuthorizationType::Internal => {
                 // this is just a debug counter, so Relaxed is probably fine
                 self.rpc
                     .internal_requests
-                    .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
+                    .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
             }
         }

@@ -551,20 +551,20 @@ impl RpcQueryStats {
         // TODO: do this without a clone
         let authorization = metadata.authorization.clone();
-        let archive_request = metadata.archive_request.load(Ordering::Acquire);
+        let archive_request = metadata.archive_request.load(Ordering::SeqCst);
         // TODO: do this without cloning. we can take their vec
         let backend_rpcs_used = metadata.backend_rpcs_used();
         let request_bytes = metadata.inner.num_bytes() as u64;
-        let response_bytes = metadata.response_bytes.load(Ordering::Acquire);
+        let response_bytes = metadata.response_bytes.load(Ordering::SeqCst);
-        let mut error_response = metadata.error_response.load(Ordering::Acquire);
-        let mut response_millis = metadata.response_millis.load(Ordering::Acquire);
+        let mut error_response = metadata.error_response.load(Ordering::SeqCst);
+        let mut response_millis = metadata.response_millis.load(Ordering::SeqCst);
-        let user_error_response = metadata.user_error_response.load(Ordering::Acquire);
+        let user_error_response = metadata.user_error_response.load(Ordering::SeqCst);
-        let response_timestamp = match metadata.response_timestamp.load(Ordering::Acquire) {
+        let response_timestamp = match metadata.response_timestamp.load(Ordering::SeqCst) {
             0 => {
                 // no response timestamp!
                 if !error_response {
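
Reviewer note: the loads above happen one at a time; SeqCst orders each load globally but does not turn the group into a single snapshot, which is presumably fine because stats are derived only after the request has finished and nothing is still writing. A compressed sketch of the pattern with illustrative field names:

    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

    struct Metadata {
        archive_request: AtomicBool,
        response_bytes: AtomicU64,
        response_millis: AtomicU64,
    }

    /// Plain-value snapshot of the atomic fields, assumed for illustration.
    struct QueryStats {
        archive_request: bool,
        response_bytes: u64,
        response_millis: u64,
    }

    impl QueryStats {
        fn from_metadata(m: &Metadata) -> Self {
            // Each load is individually SeqCst, but the three loads together are
            // not one atomic snapshot; callers rely on there being no remaining
            // writers by the time stats are built.
            Self {
                archive_request: m.archive_request.load(Ordering::SeqCst),
                response_bytes: m.response_bytes.load(Ordering::SeqCst),
                response_millis: m.response_millis.load(Ordering::SeqCst),
            }
        }
    }
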