change ordering and move fetch_add and fetch_sub

This commit is contained in:
Bryan Stitt 2023-05-12 23:00:03 -07:00
parent 914c3e03a8
commit c54970da0a
9 changed files with 65 additions and 45 deletions

View File

@ -6,7 +6,7 @@ Web3_proxy is a fast caching and load balancing proxy for web3 (Ethereum or simi
Signed transactions (eth_sendRawTransaction) are sent in parallel to the configured private RPCs (eden, ethermine, flashbots, etc.).
All other requests are sent to an RPC server on the latest block (alchemy, moralis, rivet, your own node, or one of many other providers). If multiple servers are in sync, they are prioritized by `active_requests/soft_limit`. Note that this means that the fastest server is most likely to serve requests and slow servers are unlikely to ever get any requests.
All other requests are sent to an RPC server on the latest block (llamanodes, alchemy, moralis, rivet, your own node, or one of many other providers). If multiple servers are in sync, they are prioritized by `active_requests` and request latency. Note that this means that the fastest server is most likely to serve requests and slow servers are unlikely to ever get any requests.
Each server has different limits to configure. The `soft_limit` is the number of parallel active requests where a server starts to slow down. The `hard_limit` is where a server starts giving rate limits or other errors.

View File

@ -139,7 +139,7 @@ where
Ok(deferred_rate_limit_result)
} else {
// we have a cached amount here
let cached_key_count = local_key_count.fetch_add(count, Ordering::Acquire);
let cached_key_count = local_key_count.fetch_add(count, Ordering::AcqRel);
// assuming no other parallel futures incremented this key, this is the count that redis has
let expected_key_count = cached_key_count + count;

View File

@ -97,7 +97,7 @@ impl AtomicRttEstimate {
/// This method omits the ordering argument since loads may use
/// slightly stale data to avoid adding additional latency.
pub fn load(&self) -> RttEstimate {
RttEstimate::from_pair(self.pair.load(Ordering::Relaxed), self.start_time)
RttEstimate::from_pair(self.pair.load(Ordering::Acquire), self.start_time)
}
/// Fetches the value, and applies a function to it that returns an
@ -114,7 +114,7 @@ impl AtomicRttEstimate {
let mut update_at = Instant::now();
let mut rtt = Duration::ZERO;
self.pair
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |pair| {
.fetch_update(Ordering::Release, Ordering::Acquire, |pair| {
rtt = f(RttEstimate::from_pair(pair, self.start_time));
// Save the new update_at inside the function in case it
// is run multiple times

View File

@ -69,7 +69,7 @@ mod tests {
fn test_atomic_f32_pair_load() {
let pair = [f32::consts::PI, f32::consts::E];
let atomic = AtomicF32Pair::new(pair);
assert_eq!(pair, atomic.load(Ordering::Relaxed));
assert_eq!(pair, atomic.load(Ordering::Acquire));
}
#[test]
@ -77,13 +77,13 @@ mod tests {
let pair = [f32::consts::PI, f32::consts::E];
let atomic = AtomicF32Pair::new(pair);
atomic
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |[f1, f2]| {
.fetch_update(Ordering::Release, Ordering::Acquire, |[f1, f2]| {
Some([f1 + 1.0, f2 + 1.0])
})
.unwrap();
assert_eq!(
[pair[0] + 1.0, pair[1] + 1.0],
atomic.load(Ordering::Relaxed)
atomic.load(Ordering::Acquire)
);
}
}

View File

@ -1780,7 +1780,7 @@ impl Web3ProxyApp {
if block_depth < self.config.archive_depth {
request_metadata
.archive_request
.store(true, atomic::Ordering::Relaxed);
.store(true, atomic::Ordering::Release);
}
let request_block = self
@ -1810,7 +1810,7 @@ impl Web3ProxyApp {
if block_depth < self.config.archive_depth {
request_metadata
.archive_request
.store(true, atomic::Ordering::Relaxed);
.store(true, atomic::Ordering::Release);
}
let from_block = self

View File

@ -205,7 +205,7 @@ impl KafkaDebugLogger {
let payload =
rmp_serde::to_vec(&request).expect("requests should always serialize with rmp");
self.num_requests.fetch_add(1, atomic::Ordering::SeqCst);
self.num_requests.fetch_add(1, atomic::Ordering::AcqRel);
self.background_log(payload)
}
@ -217,7 +217,7 @@ impl KafkaDebugLogger {
let payload =
rmp_serde::to_vec(&response).expect("requests should always serialize with rmp");
self.num_responses.fetch_add(1, atomic::Ordering::SeqCst);
self.num_responses.fetch_add(1, atomic::Ordering::AcqRel);
self.background_log(payload)
}

View File

@ -620,7 +620,7 @@ impl Web3Rpcs {
}
if let Some(request_metadata) = request_metadata {
request_metadata.no_servers.fetch_add(1, Ordering::Release);
request_metadata.no_servers.fetch_add(1, Ordering::AcqRel);
}
match earliest_retry_at {
@ -929,7 +929,7 @@ impl Web3Rpcs {
// TODO: have a separate column for rate limited?
if let Some(request_metadata) = request_metadata {
request_metadata.no_servers.fetch_add(1, Ordering::Release);
request_metadata.no_servers.fetch_add(1, Ordering::AcqRel);
}
tokio::select! {
@ -943,7 +943,7 @@ impl Web3Rpcs {
}
OpenRequestResult::NotReady => {
if let Some(request_metadata) = request_metadata {
request_metadata.no_servers.fetch_add(1, Ordering::Release);
request_metadata.no_servers.fetch_add(1, Ordering::AcqRel);
}
let waiting_for = min_block_needed.max(max_block_needed);
@ -1086,7 +1086,7 @@ impl Web3Rpcs {
if let Some(request_metadata) = &request_metadata {
// TODO: if this times out, i think we drop this
request_metadata.no_servers.fetch_add(1, Ordering::Release);
request_metadata.no_servers.fetch_add(1, Ordering::AcqRel);
}
watch_consensus_rpcs.changed().await?;
@ -1101,7 +1101,7 @@ impl Web3Rpcs {
warn!("All rate limits exceeded. Sleeping");
if let Some(request_metadata) = &request_metadata {
request_metadata.no_servers.fetch_add(1, Ordering::Release);
request_metadata.no_servers.fetch_add(1, Ordering::AcqRel);
}
tokio::select! {

View File

@ -239,7 +239,7 @@ impl Web3Rpc {
let peak_latency = self.peak_latency.as_ref().unwrap().latency().as_secs_f64();
// TODO: what ordering?
let active_requests = self.active_requests.load(atomic::Ordering::Relaxed) as f64 + 1.0;
let active_requests = self.active_requests.load(atomic::Ordering::Acquire) as f64 + 1.0;
OrderedFloat(peak_latency * active_requests)
}
@ -734,7 +734,7 @@ impl Web3Rpc {
// health check as a way of keeping this rpc's request_ewma accurate
// TODO: do something different if this is a backup server?
new_total_requests = rpc.total_requests.load(atomic::Ordering::Relaxed);
new_total_requests = rpc.total_requests.load(atomic::Ordering::Acquire);
// TODO: how many requests should we require in order to skip a health check?
if new_total_requests - old_total_requests < 10 {
@ -1363,7 +1363,7 @@ impl Serialize for Web3Rpc {
S: Serializer,
{
// 3 is the number of fields in the struct.
let mut state = serializer.serialize_struct("Web3Rpc", 9)?;
let mut state = serializer.serialize_struct("Web3Rpc", 12)?;
// the url is excluded because it likely includes private information. just show the name that we use in keys
state.serialize_field("name", &self.name)?;
@ -1372,7 +1372,7 @@ impl Serialize for Web3Rpc {
state.serialize_field("backup", &self.backup)?;
match self.block_data_limit.load(atomic::Ordering::Relaxed) {
match self.block_data_limit.load(atomic::Ordering::Acquire) {
u64::MAX => {
state.serialize_field("block_data_limit", &None::<()>)?;
}
@ -1395,9 +1395,21 @@ impl Serialize for Web3Rpc {
state.serialize_field("head_latency", &self.head_latency.read().value())?;
state.serialize_field(
"peak_latency",
&self.peak_latency.as_ref().unwrap().latency(),
)?;
state.serialize_field("peak_ewma", self.peak_ewma().as_ref())?;
state.serialize_field(
"active_requests",
&self.active_requests.load(atomic::Ordering::Acquire),
)?;
state.serialize_field(
"total_requests",
&self.total_requests.load(atomic::Ordering::Relaxed),
&self.total_requests.load(atomic::Ordering::Acquire),
)?;
state.end()
@ -1410,7 +1422,7 @@ impl fmt::Debug for Web3Rpc {
f.field("name", &self.name);
let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed);
let block_data_limit = self.block_data_limit.load(atomic::Ordering::Acquire);
if block_data_limit == u64::MAX {
f.field("blocks", &"all");
} else {

View File

@ -7,7 +7,7 @@ use entities::revert_log;
use entities::sea_orm_active_enums::Method;
use ethers::providers::ProviderError;
use ethers::types::{Address, Bytes};
use log::{debug, error, trace, warn, Level};
use log::{debug, error, info, trace, warn, Level};
use migration::sea_orm::{self, ActiveEnum, ActiveModelTrait};
use serde_json::json;
use std::fmt;
@ -121,14 +121,22 @@ impl Authorization {
}
}
impl Drop for OpenRequestHandle {
fn drop(&mut self) {
let x = self
.rpc
.active_requests
.fetch_sub(1, atomic::Ordering::AcqRel);
}
}
impl OpenRequestHandle {
pub async fn new(authorization: Arc<Authorization>, rpc: Arc<Web3Rpc>) -> Self {
// TODO: take request_id as an argument?
// TODO: attach a unique id to this? customer requests have one, but not internal queries
// TODO: what ordering?!
// TODO: should we be using metered, or not? i think not because we want stats for each handle
// TODO: these should maybe be sent to an influxdb instance?
rpc.active_requests.fetch_add(1, atomic::Ordering::Relaxed);
rpc.active_requests
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
Self { authorization, rpc }
}
@ -188,11 +196,9 @@ impl OpenRequestHandle {
self.rpc
.total_requests
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
self.rpc
.active_requests
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
// we used to fetch_add the active_request count here, but sometimes a request is made without going through this function (like with subscriptions)
let start = Instant::now();
@ -212,16 +218,16 @@ impl OpenRequestHandle {
};
// note. we intentionally do not record this latency now. we do NOT want to measure errors
// let latency = latency.elapsed();
let latency = start.elapsed();
self.rpc
.active_requests
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
// we used to fetch_sub the active_request count here, but sometimes the handle is dropped without request being called!
// TODO: i think ethers already has trace logging (and does it much more fancy)
trace!(
"response from {} for {} {:?}: {:?}",
self.rpc, method, params, response,
self.rpc,
method,
params,
response,
);
if let Err(err) = &response {
@ -352,19 +358,21 @@ impl OpenRequestHandle {
// TODO: do not unwrap! (doesn't matter much since we check method as a string above)
let method: Method = Method::try_from_value(&method.to_string()).unwrap();
// TODO: DO NOT UNWRAP! But also figure out the best way to keep returning ProviderErrors here
let params: EthCallParams = serde_json::from_value(json!(params))
.context("parsing params to EthCallParams")
.unwrap();
match serde_json::from_value::<EthCallParams>(json!(params)) {
Ok(params) => {
// spawn saving to the database so we don't slow down the request
let f = self.authorization.clone().save_revert(method, params.0 .0);
// spawn saving to the database so we don't slow down the request
let f = self.authorization.clone().save_revert(method, params.0 .0);
tokio::spawn(f);
tokio::spawn(f);
}
Err(err) => {
warn!("failed parsing eth_call params. unable to save revert");
}
}
}
}
} else if let Some(peak_latency) = &self.rpc.peak_latency {
peak_latency.report(start.elapsed());
peak_latency.report(latency);
} else {
unreachable!("peak_latency not initialized");
}