silence some warnings and make private rpcs an option

This commit is contained in:
Bryan Stitt 2022-09-14 04:27:18 +00:00
parent 1d4d5844d0
commit a5ccec76c7
6 changed files with 58 additions and 54 deletions

View File

@ -74,7 +74,7 @@ pub struct Web3ProxyApp {
/// Send requests to the best server available
pub balanced_rpcs: Arc<Web3Connections>,
/// Send private requests (like eth_sendRawTransaction) to all these servers
pub private_rpcs: Arc<Web3Connections>,
pub private_rpcs: Option<Arc<Web3Connections>>,
response_cache: ResponseCache,
// don't drop this or the sender will stop working
// TODO: broadcast channel instead?
@ -280,7 +280,7 @@ impl Web3ProxyApp {
let private_rpcs = if private_rpcs.is_empty() {
// TODO: do None instead of clone?
warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
balanced_rpcs.clone()
None
} else {
// TODO: attach context to this error
let (private_rpcs, private_handle) = Web3Connections::spawn(
@ -304,7 +304,7 @@ impl Web3ProxyApp {
handles.push(private_handle);
private_rpcs
Some(private_rpcs)
};
let frontend_rate_limiter = redis_pool.as_ref().map(|redis_pool| {
@ -762,8 +762,9 @@ impl Web3ProxyApp {
// broadcast transactions to all private rpcs at once
"eth_sendRawTransaction" => {
// emit stats
return self
.private_rpcs
let rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs);
return rpcs
.try_send_all_upstream_servers(request, None)
.instrument(span)
.await;
@ -849,27 +850,11 @@ impl Web3ProxyApp {
let mut response = self
.response_cache
.try_get_with(cache_key, async move {
match method {
"temporarily disabled" => {
// "eth_getTransactionByHash" | "eth_getTransactionReceipt" => {
// TODO: try_send_all serially with retries instead of parallel
self.private_rpcs
.try_send_all_upstream_servers(
request,
Some(&request_block_id.num),
)
.await
}
_ => {
// TODO: retry some failures automatically!
self.balanced_rpcs
.try_send_best_upstream_server(
request,
Some(&request_block_id.num),
)
.await
}
}
// TODO: retry some failures automatically!
// TODO: try private_rpcs if all the balanced_rpcs fail!
self.balanced_rpcs
.try_send_best_upstream_server(request, Some(&request_block_id.num))
.await
})
.await
.unwrap();

View File

@ -120,7 +120,7 @@ impl Web3Connections {
Some(rpc) => {
rpc.wait_for_request_handle()
.await?
.request("eth_getBlockByHash", get_block_params)
.request("eth_getBlockByHash", get_block_params, false)
.await?
}
None => {
@ -140,7 +140,8 @@ impl Web3Connections {
let block = Arc::new(block);
// the block was fetched using eth_getBlockByHash, so it should have all fields
self.save_block(&block, true).await?;
// TODO: fill in heaviest_chain!
self.save_block(&block, false).await?;
Ok(block)
}
@ -256,6 +257,7 @@ impl Web3Connections {
None
} else {
// we don't know if its on the heaviest chain yet
debug!(?rpc_head_hash, ?rpc_head_num, %rpc.name, "saving");
self.save_block(&rpc_head_block, false).await?;
connection_heads.insert(rpc.name.to_owned(), rpc_head_hash);
@ -290,7 +292,7 @@ impl Web3Connections {
let conn_head_block = if let Some(x) = self.block_hashes.get(connection_head_hash) {
x
} else {
// TODO: why does this happen?
// TODO: why does this happen?!?! maybe we should do get_with?
warn!(%connection_head_hash, %conn_name, %rpc, "Missing connection_head_block in block_hashes");
continue;
};
@ -401,6 +403,8 @@ impl Web3Connections {
// i think "conns" is somehow getting dupes
trace!(?heavy_rpcs);
// TODO: if maybe_head_block.time() is old, ignore it
// success! this block has enough soft limit and nodes on it (or on later blocks)
let conns: Vec<Arc<Web3Connection>> = heavy_rpcs
.into_iter()

View File

@ -114,7 +114,7 @@ impl Web3Connection {
let found_chain_id: Result<U64, _> = new_connection
.wait_for_request_handle()
.await?
.request("eth_chainId", Option::None::<()>)
.request("eth_chainId", Option::None::<()>, false)
.await;
match found_chain_id {
@ -206,6 +206,7 @@ impl Web3Connection {
"0xdead00000000000000000000000000000000beef",
maybe_archive_block,
),
true,
)
.await;
@ -295,31 +296,40 @@ impl Web3Connection {
#[instrument(skip_all)]
pub async fn reconnect(
self: &Arc<Self>,
// websocket doesn't need the http client
block_sender: Option<&flume::Sender<BlockAndRpc>>,
) -> anyhow::Result<()> {
// TODO: no-op if this called on a http provider
// websocket doesn't need the http client
info!(rpc=%self, "connecting");
// since this lock is held open over an await, we use tokio's locking
// TODO: timeout on this lock. if its slow, something is wrong
let mut provider = self.provider.write().await;
// our provider doesn't work anymore
*provider = None;
if provider.is_some() {
if self.http_client.is_some() {
// http clients don't need to do anything for reconnecting
// they *do* need to run this function to setup the first
return Ok(());
}
// reset sync status
{
let mut head_block_id = self.head_block_id.write();
*head_block_id = None;
}
info!(rpc=%self, "reconnecting");
// tell the block subscriber that we don't have any blocks
if let Some(block_sender) = &block_sender {
block_sender
.send_async((None, self.clone()))
.await
.context("block_sender during connect")?;
// disconnect the current provider
*provider = None;
// reset sync status
{
let mut head_block_id = self.head_block_id.write();
*head_block_id = None;
}
// tell the block subscriber that we don't have any blocks
if let Some(block_sender) = &block_sender {
block_sender
.send_async((None, self.clone()))
.await
.context("block_sender during connect")?;
}
} else {
info!(rpc=%self, "connecting");
}
// TODO: if this fails, keep retrying! otherwise it crashes and doesn't try again!
@ -381,7 +391,7 @@ impl Web3Connection {
let complete_head_block: Block<TxHash> = self
.wait_for_request_handle()
.await?
.request("eth_getBlockByHash", (new_hash, false))
.request("eth_getBlockByHash", (new_hash, false), false)
.await?;
debug_assert!(complete_head_block.total_difficulty.is_some());
@ -549,7 +559,7 @@ impl Web3Connection {
match self.wait_for_request_handle().await {
Ok(active_request_handle) => {
let block: Result<Block<TxHash>, _> = active_request_handle
.request("eth_getBlockByNumber", ("latest", false))
.request("eth_getBlockByNumber", ("latest", false), false)
.await;
match block {
@ -619,7 +629,7 @@ impl Web3Connection {
let block: Result<Option<ArcBlock>, _> = self
.wait_for_request_handle()
.await?
.request("eth_getBlockByNumber", ("latest", false))
.request("eth_getBlockByNumber", ("latest", false), false)
.await
.map(|x| Some(Arc::new(x)));

View File

@ -305,7 +305,7 @@ impl Web3Connections {
.into_iter()
.map(|active_request_handle| async move {
let result: Result<Box<RawValue>, _> =
active_request_handle.request(method, params).await;
active_request_handle.request(method, params, false).await;
result
})
.collect::<FuturesUnordered<_>>()
@ -500,7 +500,7 @@ impl Web3Connections {
skip_rpcs.push(active_request_handle.clone_connection());
let response_result = active_request_handle
.request(&request.method, &request.params)
.request(&request.method, &request.params, false)
.await;
match JsonRpcForwardedResponse::try_from_response_result(

View File

@ -64,12 +64,14 @@ impl OpenRequestHandle {
/// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
/// By taking self here, we ensure that this is dropped after the request is complete.
/// TODO: we no longer take self because metered doesn't like that
/// TODO: ErrorCount includes too many types of errors, such as transaction reverts
#[instrument(skip_all)]
#[measure([ErrorCount, HitCount, InFlight, ResponseTime, Throughput])]
pub async fn request<T, R>(
&self,
method: &str,
params: T,
silent_errors: bool,
) -> Result<R, ethers::prelude::ProviderError>
where
T: fmt::Debug + serde::Serialize + Send + Sync,
@ -107,7 +109,10 @@ impl OpenRequestHandle {
// TODO: i think ethers already has trace logging (and does it much more fancy)
if let Err(err) = &response {
warn!(?err, %method, rpc=%self.conn, "bad response!");
if !silent_errors {
// TODO: this isn't always bad. missing trie node while we are checking initial
warn!(?err, %method, rpc=%self.conn, "bad response!");
}
} else {
// TODO: opt-in response inspection to log reverts with their request. put into redis or what?
// trace!(rpc=%self.0, %method, ?response);

View File

@ -27,7 +27,7 @@ impl Web3Connections {
let tx: Transaction = match rpc.try_request_handle().await {
Ok(OpenRequestResult::Handle(handle)) => {
handle
.request("eth_getTransactionByHash", (pending_tx_id,))
.request("eth_getTransactionByHash", (pending_tx_id,), false)
.await?
}
Ok(_) => {