set overall max inside the lock

Bryan Stitt 2022-05-06 20:44:12 +00:00
parent ef86625fdb
commit 8548753a32
5 changed files with 108 additions and 88 deletions
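The gist of the change: instead of each connection's new_heads task racing to compare_exchange the shared best_head_block_number atomic on its own, the overall maximum is now advanced inside update_synced_rpcs while the synced_connections write lock is held. A minimal sketch of that pattern, using simplified stand-ins for the real Web3Connections / SyncedConnections types (field names reduced, a plain Mutex instead of the actual lock):

```
use std::cmp;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

// simplified stand-ins for the real structs
struct SyncedConnections {
    head_block_number: u64,
    inner: Vec<usize>,
}

struct Connections {
    best_head_block_number: AtomicU64,
    synced_connections: Mutex<SyncedConnections>,
}

impl Connections {
    // called from a connection's new_heads task when it sees a new block
    fn update_synced_rpcs(&self, rpc_index: usize, new_block: u64) {
        // take the lock first: only one task at a time decides whether the
        // overall max moves, so the compare_exchange below runs against a
        // consistent view of the synced set
        let mut synced = self.synced_connections.lock().unwrap();

        let overall_best = self.best_head_block_number.load(Ordering::Acquire);

        match new_block.cmp(&overall_best) {
            cmp::Ordering::Greater => {
                // this rpc is ahead of everyone: reset the synced set and
                // advance the overall best block
                synced.inner.clear();
                synced.head_block_number = new_block;
                let _ = self.best_head_block_number.compare_exchange(
                    overall_best,
                    new_block,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                );
            }
            cmp::Ordering::Equal => {
                // this rpc just caught up to the current best: join the synced set
            }
            cmp::Ordering::Less => {
                // this rpc is behind: leave it out of the synced set
                return;
            }
        }

        synced.inner.push(rpc_index);
    }
}

fn main() {
    let connections = Connections {
        best_head_block_number: AtomicU64::new(0),
        synced_connections: Mutex::new(SyncedConnections {
            head_block_number: 0,
            inner: Vec::new(),
        }),
    };

    connections.update_synced_rpcs(0, 100);
    connections.update_synced_rpcs(1, 100);
    assert_eq!(connections.best_head_block_number.load(Ordering::Acquire), 100);
}
```

In the real code the match also looks at how the rpc's own previous head block compares, and a failed compare_exchange means another connection advanced the overall best first, so that rpc is not added to the synced list.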

TODO.md

@@ -1,32 +1,8 @@
# Todo
- [ ] the ethermine rpc is usually fastest, but it's in the private tier. since we only allow synced rpcs, we often won't have an rpc available
- [ ] if there are no backends available, return a 502 instead of delaying?
- [ ] tarpit rate limiting at the start, but reject outright if the incoming request rate is super high
- [ ] thundering herd problem if we only allow a lag of 1 block. soft rate limits should help
# notes
it's almost working. when i curl it, it doesn't work exactly right though
## first time:
```
thread 'tokio-runtime-worker' panicked at 'not implemented', src/provider_tiers.rs:142:13
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
```
I think this is because it isn't seeing any rpcs as in sync. not sure why else it would not have any not_until set.
I believe this is because we don't know the first block. we should force an update or something at the start
## second time:
"false"
it loses all the "jsonrpc" parts and just has the simple result. need to return a proper jsonrpc response
# TODO: add the backend server to the header
# random thoughts:
the web3proxyapp object gets cloned for every call. why do we need any arcs inside that? shouldn't they be able to connect to the app's?
on friday i had it over 100k rps. but now, even when i roll back to that commit, i can't get it that high. what changed?
i think we need a top level head block. otherwise if tier0 stalls, we will keep using it
- [ ] thundering herd problem if we only allow a lag of 0 blocks. i don't see any solution besides allowing a one or two block lag
- [ ] add the backend server to the header?
- [ ] the web3proxyapp object gets cloned for every call. why do we need any arcs inside that? shouldn't they be able to connect to the app's?


@@ -6,6 +6,9 @@
url = "ws://127.0.0.1:8546"
soft_limit = 200_000
[balanced_rpc_tiers.0.ankr]
url = "https://rpc.ankr.com/eth"
soft_limit = 3_000
[private_rpcs]


@@ -130,6 +130,11 @@ impl Web3Connection {
self.head_block_number.load(atomic::Ordering::Acquire)
}
#[inline]
pub fn soft_limit(&self) -> u32 {
self.soft_limit
}
#[inline]
pub fn url(&self) -> &str {
&self.url
@@ -140,7 +145,6 @@ impl Web3Connection {
pub async fn new_heads(
self: Arc<Self>,
connections: Option<Arc<Web3Connections>>,
best_head_block_number: Arc<AtomicU64>,
) -> anyhow::Result<()> {
info!("Watching new_heads on {}", self);
@@ -170,18 +174,10 @@
.swap(block_number, atomic::Ordering::AcqRel);
if old_block_number != block_number {
info!("new block on {}: {}", self, block_number);
// we don't care about this result.
let _ = best_head_block_number.compare_exchange(
old_block_number,
block_number,
atomic::Ordering::AcqRel,
atomic::Ordering::Acquire,
);
if let Some(connections) = &connections {
connections.update_synced_rpcs(&self)?;
} else {
info!("new block on {}: {}", self, block_number);
}
}
}
@@ -206,37 +202,24 @@ impl Web3Connection {
drop(active_request_handle);
info!("current block on {}: {}", self, block_number);
let old_block_number = self
.head_block_number
.swap(block_number, atomic::Ordering::Release);
// we don't care about this result
let _ = best_head_block_number.compare_exchange(
old_block_number,
block_number,
atomic::Ordering::AcqRel,
atomic::Ordering::Acquire,
);
// TODO: swap and check the result?
self.head_block_number
.store(block_number, atomic::Ordering::Release);
if let Some(connections) = &connections {
connections.update_synced_rpcs(&self)?;
} else {
info!("new head block from {}: {}", self, block_number);
}
while let Some(block) = stream.next().await {
let block_number = block.number.unwrap().as_u64();
while let Some(new_block) = stream.next().await {
let new_block_number = new_block.number.unwrap().as_u64();
// TODO: only store if this isn't already stored?
// TODO: also send something to the provider_tier so it can sort?
// TODO: do we need this old block number check? it's helpful on http, but here it shouldn't dupe except maybe on the first run
self.head_block_number
.store(block_number, atomic::Ordering::Release);
// TODO: what ordering?
best_head_block_number.fetch_max(block_number, atomic::Ordering::AcqRel);
info!("new block on {}: {}", self, block_number);
.fetch_max(new_block_number, atomic::Ordering::AcqRel);
if let Some(connections) = &connections {
connections.update_synced_rpcs(&self)?;


@@ -11,7 +11,7 @@ use std::cmp;
use std::fmt;
use std::sync::atomic::{self, AtomicU64};
use std::sync::Arc;
use tracing::warn;
use tracing::{debug, info, trace, warn};
use crate::config::Web3ConnectionConfig;
use crate::connection::{ActiveRequestHandle, Web3Connection};
@@ -90,15 +90,11 @@ impl Web3Connections {
// TODO: channel instead. then we can have one future with write access to a left-right?
let connection = Arc::clone(connection);
let connections = connections.clone();
let best_head_block_number = best_head_block_number.clone();
tokio::spawn(async move {
let url = connection.url().to_string();
// TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date
if let Err(e) = connection
.new_heads(Some(connections), best_head_block_number)
.await
{
if let Err(e) = connection.new_heads(Some(connections)).await {
warn!("new_heads error on {}: {:?}", url, e);
}
});
@@ -202,10 +198,30 @@ impl Web3Connections {
new_block.cmp(&current_best_block_number),
) {
(cmp::Ordering::Greater, cmp::Ordering::Greater) => {
// this newest block is the new overall best block
// the rpc's newest block is the new overall best block
synced_connections.inner.clear();
synced_connections.head_block_number = new_block;
// TODO: what ordering?
match self.best_head_block_number.compare_exchange(
overall_best_head_block,
new_block,
atomic::Ordering::AcqRel,
atomic::Ordering::Acquire,
) {
Ok(_old_best_block_number) => {
// compare_exchange returns the previous value on success, so log the new block
info!("new head block from {}: {}", rpc, new_block);
}
Err(current_best_block_number) => {
// actually, there was a race and this ended up not being the latest block. return now without adding this rpc to the synced list
debug!(
"behind {} on {:?}: {}",
current_best_block_number, rpc, new_block
);
return Ok(());
}
}
}
(cmp::Ordering::Equal, cmp::Ordering::Less) => {
// no need to do anything
@@ -242,7 +258,9 @@ impl Web3Connections {
synced_connections.head_block_number = new_block;
}
(cmp::Ordering::Greater, cmp::Ordering::Equal) => {
panic!("Greater+Equal should be impossible")
// TODO: what should we do? i think we got here because we aren't using atomics properly
// the overall block hasn't yet updated, but our internal block has
// TODO: maybe we should
}
}
@@ -252,8 +270,11 @@ impl Web3Connections {
.position(|x| x.url() == rpc.url())
.unwrap();
// TODO: hopefully nothing ends up in here twice. Greater+Equal might do that to us
synced_connections.inner.push(rpc_index);
trace!("Now synced {:?}: {:?}", self, synced_connections.inner);
Ok(())
}
@@ -266,25 +287,39 @@ impl Web3Connections {
// TODO: this clone is probably not the best way to do this
let mut synced_rpc_indexes = self.synced_connections.read().inner.clone();
let cache: HashMap<usize, u32> = synced_rpc_indexes
// // TODO: how should we include the soft limit? floats are slower than integer math
// let a = a as f32 / self.soft_limit as f32;
// let b = b as f32 / other.soft_limit as f32;
let sort_cache: HashMap<usize, (f32, u32)> = synced_rpc_indexes
.iter()
.map(|synced_index| {
(
*synced_index,
self.inner.get(*synced_index).unwrap().active_requests(),
)
let key = *synced_index;
let connection = self.inner.get(*synced_index).unwrap();
let active_requests = connection.active_requests();
let soft_limit = connection.soft_limit();
let utilization = active_requests as f32 / soft_limit as f32;
(key, (utilization, soft_limit))
})
.collect();
// TODO: i think we might need to load active connections and then
synced_rpc_indexes.sort_unstable_by(|a, b| {
let a = cache.get(a).unwrap();
let b = cache.get(b).unwrap();
let (a_utilization, a_soft_limit) = sort_cache.get(a).unwrap();
let (b_utilization, b_soft_limit) = sort_cache.get(b).unwrap();
// TODO: don't just sort by active requests. sort by active requests as a percentage of soft limit
// TODO: if those are equal, sort on soft limit
a.cmp(b)
// TODO: i'm comparing floats. crap
match a_utilization
.partial_cmp(b_utilization)
.unwrap_or(cmp::Ordering::Equal)
{
cmp::Ordering::Equal => a_soft_limit.cmp(b_soft_limit),
x => x,
}
});
for selected_rpc in synced_rpc_indexes.into_iter() {
@@ -295,10 +330,16 @@ impl Web3Connections {
Err(not_until) => {
earliest_possible(&mut earliest_not_until, not_until);
}
Ok(handle) => return Ok(handle),
Ok(handle) => {
trace!("next server on {:?}: {:?}", self, selected_rpc);
return Ok(handle);
}
}
}
// TODO: this is too verbose
// warn!("no servers on {:?}! {:?}", self, earliest_not_until);
// this might be None
Err(earliest_not_until)
}
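On the float comparison flagged in the hunks above ("floats are slower than integer math", "i'm comparing floats. crap"): the same ordering can be computed with integer cross-multiplication instead. A hedged sketch of that alternative (not what this commit does), assuming soft limits are always nonzero:

```
use std::cmp::Ordering;

// compare a_active / a_soft_limit against b_active / b_soft_limit without
// floats: a/b < c/d  <=>  a*d < c*b  when the denominators are positive
fn compare_utilization(
    (a_active, a_soft_limit): (u32, u32),
    (b_active, b_soft_limit): (u32, u32),
) -> Ordering {
    let lhs = a_active as u64 * b_soft_limit as u64;
    let rhs = b_active as u64 * a_soft_limit as u64;
    match lhs.cmp(&rhs) {
        // equal utilization: fall back to comparing soft limits, like the sort above
        Ordering::Equal => a_soft_limit.cmp(&b_soft_limit),
        unequal => unequal,
    }
}

fn main() {
    // 10 active out of 200_000 is less utilized than 1 active out of 3_000
    assert_eq!(
        compare_utilization((10, 200_000), (1, 3_000)),
        Ordering::Less
    );
}
```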


@@ -175,17 +175,32 @@ impl Web3ProxyApp {
} else {
// this is not a private transaction (or no private relays are configured)
// try to send to each tier, stopping at the first success
// if no tiers are synced, fall back to privates
loop {
// there are multiple tiers. save the earliest not_until (if any). if we don't return, we will sleep until then and then try again
let mut earliest_not_until = None;
for balanced_rpcs in self.balanced_rpc_tiers.iter() {
let current_block = balanced_rpcs.head_block_number(); // TODO: we don't store current block for everything anymore. we store it on the connections
// TODO: how can we better build this iterator?
let rpc_iter = if let Some(private_rpcs) = self.private_rpcs.as_ref() {
self.balanced_rpc_tiers.iter().chain(vec![private_rpcs])
} else {
self.balanced_rpc_tiers.iter().chain(vec![])
};
for balanced_rpcs in rpc_iter {
let best_head_block_number =
self.best_head_block_number.load(atomic::Ordering::Acquire); // TODO: we don't store current block for everything anymore. we store it on the connections
let best_rpc_block_number = balanced_rpcs.head_block_number();
if best_rpc_block_number < best_head_block_number {
continue;
}
// TODO: building this cache key is slow and its large, but i don't see a better way right now
// TODO: inspect the params and see if a block is specified. if so, use that block number instead of current_block
let cache_key = (
current_block,
best_head_block_number,
json_body.method.clone(),
json_body.params.to_string(),
);
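A possible cleanup for the "how can we better build this iterator?" TODO in the hunk above: Option implements IntoIterator, so the private pool can be chained on directly without building a Vec. A sketch under the assumption that the tiers and the private pool hold the same element type (a &str stands in for the real connections type here):

```
fn main() {
    // &str stands in for the real per-tier connections type
    let balanced_rpc_tiers = vec!["tier 0", "tier 1"];
    let private_rpcs: Option<&str> = Some("private");

    // Option<&T> implements IntoIterator, so it chains without an extra Vec
    for rpcs in balanced_rpc_tiers.iter().chain(private_rpcs.as_ref()) {
        println!("trying {}", rpcs);
    }
}
```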
@@ -212,7 +227,6 @@ impl Web3ProxyApp {
// info!("forwarding request from {}", upstream_server);
let response = JsonRpcForwardedResponse {
// TODO: re-use their jsonrpc?
jsonrpc: "2.0".to_string(),
id: json_body.id,
// TODO: since we only use the result here, should that be all we return from try_send_request?
@@ -245,7 +259,7 @@ impl Web3ProxyApp {
return Ok(warp::reply::json(&response));
}
Err(None) => {
// TODO: this is too verbose. if there are other servers in other tiers, use those!
// TODO: this is too verbose. if there are other servers in other tiers, we use those!
// warn!("No servers in sync!");
}
Err(Some(not_until)) => {
@@ -267,7 +281,8 @@ impl Web3ProxyApp {
}
}
// we haven't returned an Ok, sleep and try again
// we haven't returned an Ok
// if we did return a rate limit error, sleep and try again
if let Some(earliest_not_until) = earliest_not_until {
let deadline = earliest_not_until.wait_time_from(self.clock.now());
@@ -275,7 +290,9 @@ impl Web3ProxyApp {
} else {
// TODO: how long should we wait?
// TODO: max wait time?
sleep(Duration::from_millis(500)).await;
warn!("No servers in sync!");
// TODO: return json error? return a 502?
return Err(anyhow::anyhow!("no servers in sync"));
};
}
}