change weight to tier

This commit is contained in:
Bryan Stitt 2023-01-03 22:37:51 -08:00
parent 5a567ebeea
commit c0fc999e02
5 changed files with 42 additions and 41 deletions

@ -293,6 +293,7 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [x] serde collect unknown fields in config instead of crash
- [x] upgrade user tier by address
- [x] all_backend_connections skips syncing servers
- [x] change weight back to tier
- [-] fix multiple origin and referer checks
- [-] let users choose a % to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly
- this must be opt-in or spawned since it will slow things down and will make their calls less private
@ -541,7 +542,6 @@ in another repo: event subscriber
- if someone subscribes to all pending transactions, how should that count against rate limits
- when those rate limits are hit, what should happen?
- missing pending transactions might be okay, but not missing confirmed blocks
- [ ] double check weight sorting code
- [ ] sea-orm brings in async-std, but we are using tokio. benchmark switching
- [ ] this query always times out, but erigon can serve it quickly: `curl -X POST -H "Content-Type: application/json" --data '{"jsonrpc":"2.0","method":"debug_traceBlockByNumber","params":["latest"],"id":1}' 127.0.0.1:8544' 127.0.0.1:8544`
{"jsonrpc":"2.0","id":null,"error":{"code":-32099,"message":"deadline has elapsed"}}

@ -315,7 +315,7 @@ mod tests {
block_data_limit: None,
soft_limit: 100,
hard_limit: None,
weight: 1,
tier: 0,
subscribe_txs: Some(false),
extra: Default::default(),
},
@ -329,7 +329,7 @@ mod tests {
block_data_limit: None,
soft_limit: 100,
hard_limit: None,
weight: 1,
tier: 0,
subscribe_txs: Some(false),
extra: Default::default(),
},

@ -198,9 +198,9 @@ pub struct Web3ConnectionConfig {
pub soft_limit: u32,
/// the requests per second at which the server throws errors (rate limit or otherwise)
pub hard_limit: Option<u64>,
/// All else equal, a server with a lower weight receives more requests. Ranges 0-100
#[serde(default = "default_weight")]
pub weight: u32,
/// All else equal, a server with a lower tier receives all requests
#[serde(default = "default_tier")]
pub tier: u64,
/// Subscribe to the firehose of pending transactions
/// Don't do this with free rpcs
#[serde(default)]
@ -210,7 +210,7 @@ pub struct Web3ConnectionConfig {
pub extra: HashMap<String, serde_json::Value>,
}
fn default_weight() -> u32 {
fn default_tier() -> u64 {
0
}
@ -270,7 +270,7 @@ impl Web3ConnectionConfig {
block_sender,
tx_id_sender,
true,
self.weight,
self.tier,
open_request_handle_metrics,
)
.await

@ -85,8 +85,8 @@ pub struct Web3Connection {
pub(super) automatic_block_limit: bool,
/// TODO: have an enum for this so that "no limit" prints pretty?
pub(super) block_data_limit: AtomicU64,
/// Lower weight are higher priority when sending requests. 0 to 99.
pub(super) weight: f64,
/// Lower tiers are higher priority when sending requests
pub(super) tier: u64,
/// TODO: should this be an AsyncRwLock?
pub(super) head_block: RwLock<Option<SavedBlock>>,
pub(super) open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
@ -94,7 +94,7 @@ pub struct Web3Connection {
impl Web3Connection {
/// Connect to a web3 rpc
// TODO: have this take a builder (which will have channels attached)
// TODO: have this take a builder (which will have channels attached). or maybe just take the config and give the config public fields
#[allow(clippy::too_many_arguments)]
pub async fn spawn(
name: String,
@ -114,7 +114,7 @@ impl Web3Connection {
block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
reconnect: bool,
weight: u32,
tier: u64,
open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
let hard_limit = hard_limit.map(|(hard_rate_limit, redis_pool)| {
@ -128,9 +128,6 @@ impl Web3Connection {
)
});
// turn weight 0 into 100% and weight 100 into 0%
let weight = (100 - weight) as f64 / 100.0;
// TODO: should we do this even if block_sender is None? then we would know limits on private relays
let block_data_limit: AtomicU64 = block_data_limit.unwrap_or_default().into();
let automatic_block_limit =
@ -151,7 +148,7 @@ impl Web3Connection {
automatic_block_limit,
block_data_limit,
head_block: RwLock::new(Default::default()),
weight,
tier,
open_request_handle_metrics,
};
@ -1116,7 +1113,7 @@ impl Serialize for Web3Connection {
S: Serializer,
{
// 3 is the number of fields in the struct.
let mut state = serializer.serialize_struct("Web3Connection", 8)?;
let mut state = serializer.serialize_struct("Web3Connection", 9)?;
// the url is excluded because it likely includes private information. just show the name that we use in keys
state.serialize_field("name", &self.name)?;
@ -1132,7 +1129,8 @@ impl Serialize for Web3Connection {
}
}
state.serialize_field("weight", &self.weight)?;
state.serialize_field("tier", &self.tier)?;
state.serialize_field("weight", &1.0)?;
state.serialize_field("soft_limit", &self.soft_limit)?;
@ -1222,7 +1220,7 @@ mod tests {
soft_limit: 1_000,
automatic_block_limit: false,
block_data_limit: block_data_limit.into(),
weight: 100.0,
tier: 0,
head_block: RwLock::new(Some(head_block.clone())),
open_request_handle_metrics: Arc::new(metrics),
};
@ -1269,7 +1267,7 @@ mod tests {
soft_limit: 1_000,
automatic_block_limit: false,
block_data_limit: block_data_limit.into(),
weight: 100.0,
tier: 0,
head_block: RwLock::new(Some(head_block.clone())),
open_request_handle_metrics: Arc::new(metrics),
};
@ -1320,7 +1318,7 @@ mod tests {
soft_limit: 1_000,
automatic_block_limit: false,
block_data_limit: block_data_limit.into(),
weight: 100.0,
tier: 0,
head_block: RwLock::new(Some(head_block.clone())),
open_request_handle_metrics: Arc::new(metrics),
};

@ -411,10 +411,9 @@ impl Web3Connections {
skip: &[Arc<Web3Connection>],
min_block_needed: Option<&U64>,
) -> anyhow::Result<OpenRequestResult> {
let usable_rpcs_by_head_num: BTreeMap<U64, Vec<Arc<Web3Connection>>> =
let usable_rpcs_by_head_num_and_weight: BTreeMap<(U64, u64), Vec<Arc<Web3Connection>>> =
if let Some(min_block_needed) = min_block_needed {
// need a potentially old block. check all the rpcs
// TODO: we are going to be checking "has_block_data" a lot now
let mut m = BTreeMap::new();
for x in self
@ -429,7 +428,9 @@ impl Web3Connections {
match x_head_block {
None => continue,
Some(x_head) => {
m.entry(x_head.number()).or_insert_with(Vec::new).push(x);
let key = (x_head.number(), u64::MAX - x.tier);
m.entry(key).or_insert_with(Vec::new).push(x);
}
}
}
@ -437,7 +438,6 @@ impl Web3Connections {
m
} else {
// need latest. filter the synced rpcs
// TODO: double check has_block_data?
let synced_connections = self.synced_connections.load();
let head_block = match synced_connections.head_block.as_ref() {
@ -445,26 +445,32 @@ impl Web3Connections {
Some(x) => x,
};
// TODO: different allowed_lag depending on the chain
// TODO: self.allowed_lag instead of taking as an arg
if head_block.syncing(allowed_lag) {
return Ok(OpenRequestResult::NotReady);
}
let head_num = head_block.number();
let c: Vec<_> = synced_connections
let mut m = BTreeMap::new();
for x in synced_connections
.conns
.iter()
.filter(|x| !skip.contains(x))
.cloned()
.collect();
{
let key = (head_num, u64::MAX - x.tier);
BTreeMap::from([(head_num, c)])
m.entry(key).or_insert_with(Vec::new).push(x.clone());
}
m
};
let mut earliest_retry_at = None;
for usable_rpcs in usable_rpcs_by_head_num.into_values().rev() {
for usable_rpcs in usable_rpcs_by_head_num_and_weight.into_values().rev() {
// under heavy load, it is possible for even our best server to be negative
let mut minimum = f64::MAX;
// we sort on a combination of values. cache them here so that we don't do this math multiple times.
@ -475,24 +481,22 @@ impl Web3Connections {
// TODO: get active requests out of redis (that's definitely too slow)
// TODO: do something with hard limit instead? (but that is hitting redis too much)
let active_requests = rpc.active_requests() as f64;
let soft_limit = rpc.soft_limit as f64 * rpc.weight;
let soft_limit = rpc.soft_limit as f64;
// TODO: maybe store weight as the percentile
let available_requests = soft_limit - active_requests;
trace!("available requests on {}: {}", rpc, available_requests);
// under heavy load, it is possible for even our best server to be negative
minimum = available_requests.min(minimum);
// TODO: clone needed?
(rpc, available_requests)
})
.collect();
trace!("minimum available requests: {}", minimum);
// weights can't have negative numbers. shift up if any are negative
// choose_multiple_weighted can't have negative numbers. shift up if any are negative
// TODO: is this a correct way to shift?
if minimum < 0.0 {
available_request_map = available_request_map
.into_iter()
@ -943,7 +947,7 @@ mod tests {
soft_limit: 1_000,
automatic_block_limit: true,
block_data_limit: block_data_limit.into(),
weight: 100.0,
tier: 0,
head_block: RwLock::new(Some(head_block.clone())),
open_request_handle_metrics: Arc::new(Default::default()),
};
@ -962,7 +966,7 @@ mod tests {
soft_limit: 1_000,
automatic_block_limit: false,
block_data_limit: block_data_limit.into(),
weight: 100.0,
tier: 0,
head_block: RwLock::new(Some(lagged_block.clone())),
open_request_handle_metrics: Arc::new(Default::default()),
};
@ -1165,7 +1169,7 @@ mod tests {
soft_limit: 3_000,
automatic_block_limit: false,
block_data_limit: 64.into(),
weight: 1.0,
tier: 1,
head_block: RwLock::new(Some(head_block.clone())),
open_request_handle_metrics: Arc::new(Default::default()),
};
@ -1184,8 +1188,7 @@ mod tests {
soft_limit: 1_000,
automatic_block_limit: false,
block_data_limit: u64::MAX.into(),
// TODO: does weight = 0 work?
weight: 0.01,
tier: 2,
head_block: RwLock::new(Some(head_block.clone())),
open_request_handle_metrics: Arc::new(Default::default()),
};