Bryan Stitt 2022-04-28 19:30:22 +00:00
parent 5c2e0dabc8
commit d1da66194e
4 changed files with 95 additions and 61 deletions

.gitignore

@@ -1 +1,4 @@
/target
/perf.data
/perf.data.old
/flamegraph.svg

README.md

@@ -28,22 +28,31 @@ cargo run -r -- --eth-primary-rpc "https://your.favorite.provider"
curl -X POST -H "Content-Type: application/json" --data '{"jsonrpc":"2.0","method":"web3_clientVersion","params":[],"id":67}' 127.0.0.1:8845/eth
```
## Flame Graphs
$ cat /proc/sys/kernel/kptr_restrict
1
$ echo 0 |sudo tee /proc/sys/kernel/kptr_restrict
0
$ CARGO_PROFILE_RELEASE_DEBUG=true cargo flamegraph
## Load Testing
Test the proxy:
wrk -s ./getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8445
wrk -s ./getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8445
wrk -s ./data/wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8445
wrk -s ./data/wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8445
Test geth:
wrk -s ./getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
wrk -s ./getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
wrk -s ./data/wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
wrk -s ./data/wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
Test erigon:
wrk -s ./getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
wrk -s ./getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
wrk -s ./data/wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
wrk -s ./data/wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
## Todo

@@ -25,6 +25,7 @@ static APP_USER_AGENT: &str = concat!(
);
/// The application
// TODO: this debug impl is way too verbose. make something smaller
#[derive(Debug)]
struct Web3ProxyApp {
/// clock used for rate limiting
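Not part of this commit: the new `// TODO: this debug impl is way too verbose` comment above could eventually be addressed by swapping `#[derive(Debug)]` for a hand-written impl. A minimal sketch, assuming nothing about the struct's actual fields:

```rust
// Sketch only: an abbreviated Debug impl so logs don't dump every nested field.
use std::fmt;

impl fmt::Debug for Web3ProxyApp {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // print only the type name; the fields are elided with ".."
        f.debug_struct("Web3ProxyApp").finish_non_exhaustive()
    }
}
```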
@@ -207,12 +208,15 @@ impl Web3ProxyApp {
// sleep (with a lock) until our rate limits should be available
drop(read_lock);
let write_lock = self.balanced_rpc_ratelimiter_lock.write().await;
if let Some(not_until) = not_until {
let write_lock = self.balanced_rpc_ratelimiter_lock.write().await;
let deadline = not_until.wait_time_from(self.clock.now());
sleep(deadline).await;
let deadline = not_until.wait_time_from(self.clock.now());
drop(write_lock);
sleep(deadline).await;
drop(write_lock);
}
}
};
}
@@ -260,25 +264,41 @@ impl Web3ProxyApp {
.await
.ok_or_else(|| anyhow::anyhow!("no successful response"))?;
if let Ok(partial_response) = response {
// TODO: trace
// info!("forwarding request from {}", upstream_server);
let response = match response {
Ok(partial_response) => {
// TODO: trace
// info!("forwarding request from {}", upstream_server);
let response = json!({
"jsonrpc": "2.0",
"id": incoming_id,
"result": partial_response
});
return Ok(warp::reply::json(&response));
}
json!({
"jsonrpc": "2.0",
"id": incoming_id,
"result": partial_response
})
}
Err(e) => {
// TODO: what is the proper format for an error?
// TODO: use e
json!({
"jsonrpc": "2.0",
"id": incoming_id,
"error": format!("{}", e)
})
}
};
return Ok(warp::reply::json(&response));
}
Err(not_until) => {
Err(None) => {
warn!("No servers in sync!");
}
Err(Some(not_until)) => {
// save the smallest not_until. if nothing succeeds, return an Err with not_until in it
if earliest_not_until.is_none() {
earliest_not_until = Some(not_until);
earliest_not_until.replace(not_until);
} else {
let earliest_possible =
earliest_not_until.as_ref().unwrap().earliest_possible();
let new_earliest_possible = not_until.earliest_possible();
if earliest_possible > new_earliest_possible {
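An aside on the `// TODO: what is the proper format for an error?` comment in the hunk above: JSON-RPC 2.0 defines `error` as an object with an integer `code` and a string `message` (plus an optional `data` field), not a bare string. A sketch of that shape, with the helper name and parameter types as assumptions:

```rust
// Sketch only (not in this commit): build a spec-shaped JSON-RPC error response.
use serde_json::{json, Value};

fn jsonrpc_error_response(incoming_id: Value, e: &impl std::fmt::Display) -> Value {
    json!({
        "jsonrpc": "2.0",
        "id": incoming_id,
        "error": {
            // -32603 is the JSON-RPC 2.0 "Internal error" code
            "code": -32603,
            "message": format!("{}", e)
        }
    })
}
```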
@@ -292,23 +312,26 @@ impl Web3ProxyApp {
// we haven't returned an Ok, sleep and try again
// TODO: move this to a helper function
drop(read_lock);
let write_lock = self.balanced_rpc_ratelimiter_lock.write().await;
// unwrap should be safe since we would have returned if it wasn't set
let deadline = if let Some(earliest_not_until) = earliest_not_until {
earliest_not_until.wait_time_from(self.clock.now())
if let Some(earliest_not_until) = earliest_not_until {
let write_lock = self.balanced_rpc_ratelimiter_lock.write().await;
let deadline = earliest_not_until.wait_time_from(self.clock.now());
sleep(deadline).await;
drop(write_lock);
} else {
// TODO: exponential backoff?
Duration::from_secs(1)
// TODO: how long should we wait?
// TODO: max wait time?
sleep(Duration::from_millis(500)).await;
};
sleep(deadline).await;
drop(write_lock);
}
}
}
#[instrument]
async fn try_send_requests(
&self,
rpc_servers: Vec<String>,
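The `// TODO: exponential backoff?` and max-wait questions left in this hunk could be answered with a capped exponential delay. A minimal sketch, with the attempt counter and the bounds chosen arbitrarily for illustration:

```rust
// Sketch only (not in this commit): capped exponential backoff for the retry loop.
use std::time::Duration;
use tokio::time::sleep;

async fn backoff(attempt: u32) {
    // 100ms, 200ms, 400ms, ... capped at 10 seconds
    let millis = 100u64.saturating_mul(1u64 << attempt.min(7));
    let delay = Duration::from_millis(millis).min(Duration::from_secs(10));
    sleep(delay).await;
}
```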
@@ -404,22 +427,22 @@ async fn main() {
vec![("ws://10.11.12.16:8545", 0), ("ws://10.11.12.16:8946", 0)],
// paid nodes
// TODO: add paid nodes (with rate limits)
vec![
// chainstack.com archive
(
"wss://ws-nd-373-761-850.p2pify.com/106d73af4cebc487df5ba92f1ad8dee7",
0,
),
],
// vec![
// // chainstack.com archive
// (
// "wss://ws-nd-373-761-850.p2pify.com/106d73af4cebc487df5ba92f1ad8dee7",
// 0,
// ),
// ],
// free nodes
vec![
// ("https://main-rpc.linkpool.io", 0), // linkpool is slow and often offline
("https://rpc.ankr.com/eth", 0),
],
// vec![
// // ("https://main-rpc.linkpool.io", 0), // linkpool is slow and often offline
// ("https://rpc.ankr.com/eth", 0),
// ],
],
vec![
("https://api.edennetwork.io/v1/beta", 0),
("https://api.edennetwork.io/v1/", 0),
// ("https://api.edennetwork.io/v1/beta", 0),
// ("https://api.edennetwork.io/v1/", 0),
],
)
.await
@@ -433,9 +456,14 @@ async fn main() {
.then(move |json_body| state.clone().proxy_web3_rpc(json_body))
.map(handle_anyhow_errors);
warp::serve(proxy_rpc_filter)
.run(([0, 0, 0, 0], listen_port))
.await;
// TODO: filter for displaying connections
// TODO: filter for displaying
// TODO: warp trace is super verbose. how do we make this more readable?
// let routes = proxy_rpc_filter.with(warp::trace::request());
let routes = proxy_rpc_filter;
warp::serve(routes).run(([0, 0, 0, 0], listen_port)).await;
}
/// convert result into an http response. use this at the end of your warp filter
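On the `// TODO: warp trace is super verbose` note above: one common way to quiet it (not part of this commit) is a tracing-subscriber EnvFilter that holds warp's target to a higher level. The crate/target names and the "env-filter" feature are assumptions:

```rust
// Sketch only: requires the tracing-subscriber crate with the "env-filter" feature.
use tracing_subscriber::EnvFilter;

fn init_tracing() {
    // honor RUST_LOG if set; otherwise keep the proxy at debug and warp at info
    let filter = EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| EnvFilter::new("web3_proxy=debug,warp=info"));

    tracing_subscriber::fmt().with_env_filter(filter).init();
}
```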

@@ -161,7 +161,7 @@ impl Web3ProviderTier {
/// get the best available rpc server
#[instrument]
pub async fn next_upstream_server(&self) -> Result<String, NotUntil<QuantaInstant>> {
pub async fn next_upstream_server(&self) -> Result<String, Option<NotUntil<QuantaInstant>>> {
let mut earliest_not_until = None;
for selected_rpc in self.synced_rpcs.load().iter() {
@@ -203,16 +203,14 @@ impl Web3ProviderTier {
return Ok(selected_rpc.clone());
}
// return the smallest not_until
if let Some(not_until) = earliest_not_until {
Err(not_until)
} else {
unimplemented!();
}
// this might be None
Err(earliest_not_until)
}
/// get all available rpc servers
pub async fn get_upstream_servers(&self) -> Result<Vec<String>, NotUntil<QuantaInstant>> {
pub async fn get_upstream_servers(
&self,
) -> Result<Vec<String>, Option<NotUntil<QuantaInstant>>> {
let mut earliest_not_until = None;
let mut selected_rpcs = vec![];
for selected_rpc in self.synced_rpcs.load().iter() {
@@ -253,11 +251,7 @@ impl Web3ProviderTier {
return Ok(selected_rpcs);
}
// return the earliest not_until
if let Some(not_until) = earliest_not_until {
Err(not_until)
} else {
Ok(vec![])
}
// return the earliest not_until (if no rpcs are synced, this will be None)
Err(earliest_not_until)
}
}
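A closing aside on the two signature changes above: `Err(None)` now means "no servers are in sync" and `Err(Some(not_until))` means "everything synced is rate limited". If the `Option`-in-`Err` encoding ever gets hard to read at call sites, a small error enum would name those states explicitly; a sketch, not part of this commit:

```rust
// Sketch only: an alternative to Option<NotUntil<QuantaInstant>> as the error type.
use governor::{clock::QuantaInstant, NotUntil};

enum NoUpstreamServer {
    /// every synced server is rate limited until at least this instant
    RateLimited(NotUntil<QuantaInstant>),
    /// no servers are currently in sync
    NotSynced,
}
```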