This commit is contained in:
Bryan Stitt 2022-07-02 01:20:28 +00:00
parent f36ca5e702
commit 8fe571ae68
4 changed files with 54 additions and 21 deletions

TODO.md

@@ -30,18 +30,17 @@
- even after removing a bunch of the locks, the deadlock still happens. i can't reliably reproduce it. i just let it run for a while and it happens.
- running gdb shows the tokio tungstenite thread spinning near 100% cpu while none of the rest of the program proceeds
- fixed by https://github.com/gakonst/ethers-rs/pull/1287
- [ ] when sending with private relays, brownie's tx.wait can think the transaction was dropped. smarter retry on eth_getTransactionByHash and eth_getTransactionReceipt (maybe only if we sent the transaction ourselves)
- [ ] rpc errors propagate too far. one subscription failing ends the app. isolate the providers more
- [ ] if web3 proxy gets an http error back, retry another node
- [ ] endpoint for health checks. if no synced servers, give a 502 error
- [ ] interval for http subscriptions should be based on block time.
- [ ] todo: include private rpcs with regular queries? i don't want to overwhelm them, but they could be good for excess load
- [x] when sending with private relays, brownie's tx.wait can think the transaction was dropped. smarter retry on eth_getTransactionByHash and eth_getTransactionReceipt (maybe only if we sent the transaction ourselves)
- [x] if web3 proxy gets an http error back, retry another node
- [x] endpoint for health checks. if no synced servers, give a 502 error
- [ ] refactor so configs can change while running
- create the app without applying any config to it
- have a blocking future watching the config file and calling app.apply_config() on first load and on change (a rough sketch follows below)
- [ ] rpc errors propagate too far. one subscription failing ends the app. isolate the providers more (might already be fixed)
## V1
- [ ] interval for http subscriptions should be based on block time. load from config is easy, but
- [ ] some things that are cached locally should probably be in shared redis caches
- [ ] stats when forks are resolved (and what chain they were on?)
- [ ] incoming rate limiting (by api key)
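Sketching the config-refactor item above (the blocking future watching the config file): a minimal version, under heavy assumptions, might look like the following. Only `apply_config()` is named in the TODO item itself; `App`, `AppConfig`, `load_config`, and the simple polling approach are illustrative stand-ins.

```rust
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

struct AppConfig;
struct App;

impl App {
    fn apply_config(&self, _config: AppConfig) -> anyhow::Result<()> {
        todo!("reconfigure rpcs, rate limits, etc. without restarting")
    }
}

fn load_config(path: &Path) -> anyhow::Result<AppConfig> {
    let _raw = std::fs::read_to_string(path)?;
    Ok(AppConfig)
}

/// apply the config on first load, then re-apply whenever the file changes
async fn watch_config(app: Arc<App>, path: PathBuf) -> anyhow::Result<()> {
    let mut last_modified = std::fs::metadata(&path)?.modified()?;
    app.apply_config(load_config(&path)?)?;

    loop {
        tokio::time::sleep(Duration::from_secs(2)).await;

        let modified = std::fs::metadata(&path)?.modified()?;
        if modified > last_modified {
            last_modified = modified;
            app.apply_config(load_config(&path)?)?;
        }
    }
}
```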
@@ -84,4 +83,5 @@
- [ ] are we using Acquire/Release/AcqRel properly? or do we need other modes?
- [ ] subscription id should be per connection, not global
- [ ] use https://github.com/ledgerwatch/interfaces to talk to erigon directly instead of through erigon's rpcdaemon (possible example code which uses ledgerwatch/interfaces: https://github.com/akula-bft/akula/tree/master)
- [ ] subscribe to pending transactions and build an intelligent gas estimator
- [ ] include private rpcs with regular queries? i don't want to overwhelm them, but they could be good for excess load

@@ -566,6 +566,10 @@ impl ActiveRequestHandle {
Self(connection)
}
pub fn clone_connection(&self) -> Arc<Web3Connection> {
self.0.clone()
}
/// Send a web3 request
/// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
/// By taking self here, we ensure that this is dropped after the request is complete
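A minimal sketch of a request method with those properties, consuming `self` so the handle drops when the request completes. The stub types and the `serde_json`/`anyhow` signature here are placeholders, not this crate's actual code.

```rust
use std::sync::Arc;

struct Web3Connection; // stand-in for the real connection type
struct ActiveRequestHandle(Arc<Web3Connection>);

impl ActiveRequestHandle {
    /// taking `self` by value guarantees the handle (and whatever rate-limit
    /// slot or connection counter it represents) is dropped as soon as the
    /// request future completes
    async fn request(
        self,
        method: &str,
        params: &serde_json::Value,
    ) -> anyhow::Result<serde_json::Value> {
        let _connection = &self.0; // a real implementation would call the provider here
        let _ = (method, params);
        todo!()
    }
}
```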

@@ -550,7 +550,7 @@ impl Web3Connections {
if synced_connections.inner.len() == total_rpcs {
// TODO: more metrics
debug!("all head: {}", new_block_hash);
trace!("all head: {}", new_block_hash);
}
trace!(
@@ -576,7 +576,6 @@ impl Web3Connections {
}
// TODO: only publish if there are x (default 2) nodes synced to this block?
// do the arcswap
// TODO: do this before or after processing all the transactions in this block?
self.synced_connections.swap(synced_connections);
}
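For background on the "do the arcswap" step: the arc_swap crate lets readers keep loading the current synced set lock-free while this code publishes a new one. A tiny illustrative sketch, where `SyncedConnections` is a stand-in for the real struct:

```rust
use arc_swap::ArcSwap;
use std::sync::Arc;

struct SyncedConnections;

fn publish(slot: &ArcSwap<SyncedConnections>, new: Arc<SyncedConnections>) {
    // readers that already called slot.load() keep their old Arc until they
    // drop it; every later load() sees the new value immediately
    slot.swap(new);
}
```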
@@ -589,7 +588,10 @@ impl Web3Connections {
/// get the best available rpc server
#[instrument(skip_all)]
pub async fn next_upstream_server(&self) -> Result<ActiveRequestHandle, Option<Duration>> {
pub async fn next_upstream_server(
&self,
skip: &[Arc<Web3Connection>],
) -> Result<ActiveRequestHandle, Option<Duration>> {
let mut earliest_retry_after = None;
let mut synced_rpcs: Vec<Arc<Web3Connection>> = self
@@ -597,9 +599,14 @@ impl Web3Connections {
.load()
.inner
.iter()
.filter(|x| !skip.contains(x))
.cloned()
.collect();
if synced_rpcs.is_empty() {
return Err(None);
}
let sort_cache: HashMap<Arc<Web3Connection>, (f32, u32)> = synced_rpcs
.iter()
.map(|rpc| {
@@ -657,8 +664,8 @@ impl Web3Connections {
// check rate limits and increment our connection counter
match connection.try_request_handle().await {
Err(retry_after) => {
earliest_retry_after = earliest_retry_after.min(Some(retry_after));
// this rpc is not available. skip it
earliest_retry_after = earliest_retry_after.min(Some(retry_after));
}
Ok(handle) => selected_rpcs.push(handle),
}
@@ -677,23 +684,44 @@ impl Web3Connections {
&self,
request: JsonRpcRequest,
) -> anyhow::Result<JsonRpcForwardedResponse> {
let mut skip_rpcs = vec![];
// TODO: maximum retries?
loop {
match self.next_upstream_server().await {
if skip_rpcs.len() == self.inner.len() {
break;
}
match self.next_upstream_server(&skip_rpcs).await {
Ok(active_request_handle) => {
// save the rpc in case we get an error and want to retry on another server
skip_rpcs.push(active_request_handle.clone_connection());
let response_result = active_request_handle
.request(&request.method, &request.params)
.await;
match JsonRpcForwardedResponse::from_response_result(
match JsonRpcForwardedResponse::try_from_response_result(
response_result,
request.id.clone(),
) {
Ok(response) => {
if response.error.is_some() {
trace!(?response, "Sending error reply",);
// errors already sent false to the in_flight_tx
if let Some(error) = &response.error {
trace!(?response, "rpc error");
// some errors should be retried
if error.code == -32000
&& [
"header not found",
"header for hash not found",
"node not started",
"RPC timeout",
]
.contains(&error.message.as_str())
{
continue;
}
} else {
trace!(?response, "Sending reply");
trace!(?response, "rpc success");
}
return Ok(response);
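The retry condition added above can be read as a small predicate. A self-contained restatement, where `JsonRpcError` is a stand-in for the crate's actual error struct:

```rust
struct JsonRpcError {
    code: i64,
    message: String,
}

/// these -32000 messages usually mean a node is briefly behind or unavailable,
/// so the request is worth retrying on a different server
fn is_retryable(error: &JsonRpcError) -> bool {
    error.code == -32000
        && [
            "header not found",
            "header for hash not found",
            "node not started",
            "RPC timeout",
        ]
        .contains(&error.message.as_str())
}
```

With a helper like that, the match arm above reduces to `if is_retryable(error) { continue; }`.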
@@ -714,8 +742,7 @@ impl Web3Connections {
Err(None) => {
warn!(?self, "No servers in sync!");
// TODO: sleep how long? until synced_connections changes or rate limits are available
// TODO: subscribe to head_block_sender
// TODO: subscribe to something on synced connections. maybe it should just be a watch channel
sleep(Duration::from_millis(200)).await;
continue;
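One shape the "maybe it should just be a watch channel" idea could take; the channel does not exist in this commit, and the names are illustrative:

```rust
use tokio::sync::watch;

/// wake up only when the synced state actually changes, instead of sleeping
/// for a fixed 200ms and re-checking
async fn wait_until_synced(mut synced_rx: watch::Receiver<bool>) {
    while !*synced_rx.borrow_and_update() {
        if synced_rx.changed().await.is_err() {
            // the sender was dropped; nothing will ever mark us synced
            break;
        }
    }
}
```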
@@ -733,13 +760,15 @@ impl Web3Connections {
}
}
}
Err(anyhow::anyhow!("all retries exhausted"))
}
/// be sure there is a timeout on this or it might loop forever
pub async fn try_send_all_upstream_servers(
&self,
request: JsonRpcRequest,
) -> anyhow::Result<JsonRpcForwardedResponse> {
// TODO: timeout on this loop
loop {
match self.get_upstream_servers().await {
Ok(active_request_handles) => {
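The doc comment above warns that callers must put a timeout around try_send_all_upstream_servers. A hypothetical caller-side wrapper; `Web3Connections`, `JsonRpcRequest`, and `JsonRpcForwardedResponse` are the types used elsewhere in this diff, while the function name and the 30 second budget are made up for illustration:

```rust
use std::time::Duration;
use tokio::time::timeout;

async fn send_to_all_with_timeout(
    connections: &Web3Connections,
    request: JsonRpcRequest,
) -> anyhow::Result<JsonRpcForwardedResponse> {
    match timeout(
        Duration::from_secs(30),
        connections.try_send_all_upstream_servers(request),
    )
    .await
    {
        Ok(result) => result,
        // hitting the timeout drops the inner future, which also cancels any
        // in-flight upstream requests
        Err(_elapsed) => Err(anyhow::anyhow!("timed out waiting for upstream servers")),
    }
}
```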

@@ -181,7 +181,7 @@ impl JsonRpcForwardedResponse {
}
}
pub fn from_response_result(
pub fn try_from_response_result(
result: Result<Box<RawValue>, ProviderError>,
id: Box<RawValue>,
) -> anyhow::Result<Self> {