add stub try_proxy_connection

This commit is contained in:
Bryan Stitt 2023-01-16 22:54:40 -08:00
parent b21b5699db
commit d7c75f843e
7 changed files with 426 additions and 79 deletions

@ -6,6 +6,7 @@ use crate::block_number::{block_needed, BlockNeeded};
use crate::config::{AppConfig, TopConfig};
use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::frontend::errors::FrontendErrorResponse;
use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::jsonrpc::{
JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum,
};
@ -907,10 +908,10 @@ impl Web3ProxyApp {
self: &Arc<Self>,
authorization: Arc<Authorization>,
request: JsonRpcRequestEnum,
proxy_mode: ProxyMode,
) -> Result<(JsonRpcForwardedResponseEnum, Vec<Arc<Web3Connection>>), FrontendErrorResponse>
{
// TODO: this should probably be trace level
// // trace!(?request, "proxy_web3_rpc");
// trace!(?request, "proxy_web3_rpc");
// even though we have timeouts on the requests to our backend providers,
// we need a timeout for the incoming request so that retries don't run forever
@ -921,7 +922,7 @@ impl Web3ProxyApp {
JsonRpcRequestEnum::Single(request) => {
let (response, rpcs) = timeout(
max_time,
self.proxy_web3_rpc_request(&authorization, request),
self.proxy_cached_request(&authorization, request, proxy_mode),
)
.await??;
@ -930,7 +931,7 @@ impl Web3ProxyApp {
JsonRpcRequestEnum::Batch(requests) => {
let (responses, rpcs) = timeout(
max_time,
self.proxy_web3_rpc_requests(&authorization, requests),
self.proxy_web3_rpc_requests(&authorization, requests, proxy_mode),
)
.await??;
@ -947,6 +948,7 @@ impl Web3ProxyApp {
self: &Arc<Self>,
authorization: &Arc<Authorization>,
requests: Vec<JsonRpcRequest>,
proxy_mode: ProxyMode,
) -> anyhow::Result<(Vec<JsonRpcForwardedResponse>, Vec<Arc<Web3Connection>>)> {
// TODO: we should probably change ethers-rs to support this directly. they pushed this off to v2 though
let num_requests = requests.len();
@ -956,7 +958,7 @@ impl Web3ProxyApp {
let responses = join_all(
requests
.into_iter()
.map(|request| self.proxy_web3_rpc_request(authorization, request))
.map(|request| self.proxy_cached_request(authorization, request, proxy_mode))
.collect::<Vec<_>>(),
)
.await;
@ -1000,10 +1002,11 @@ impl Web3ProxyApp {
}
#[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
async fn proxy_web3_rpc_request(
async fn proxy_cached_request(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
mut request: JsonRpcRequest,
proxy_mode: ProxyMode,
) -> anyhow::Result<(JsonRpcForwardedResponse, Vec<Arc<Web3Connection>>)> {
// trace!("Received request: {:?}", request);
@ -1172,22 +1175,32 @@ impl Web3ProxyApp {
// TODO: eth_sendBundle (flashbots command)
// broadcast transactions to all private rpcs at once
"eth_sendRawTransaction" => {
// TODO: how should we handle private_mode here?
let default_num = match proxy_mode {
// TODO: how many balanced rpcs should we send to? configurable? percentage of total?
ProxyMode::Best => Some(2),
ProxyMode::Fastest(0) => None,
// TODO: how many balanced rpcs should we send to? configurable? percentage of total?
// TODO: what if we do 2 per tier? we want to blast the third party rpcs
// TODO: maybe having the third party rpcs in their own Web3Connections would be good for this
ProxyMode::Fastest(x) => Some(x * 2),
ProxyMode::Versus => None,
};
let (private_rpcs, num) = if let Some(private_rpcs) = self.private_rpcs.as_ref() {
if authorization.checks.private_txs {
// if we are sending the transaction privately, no matter the proxy_mode, we send to ALL private rpcs
(private_rpcs, None)
} else {
// TODO: how many balanced rpcs should we send to? configurable? percentage of total?
// TODO: what if we do 2 per tier? we want to blast the third party rpcs
// TODO: maybe having the third party rpcs would be good for this
(&self.balanced_rpcs, Some(2))
(&self.balanced_rpcs, default_num)
}
} else {
(&self.balanced_rpcs, Some(2))
(&self.balanced_rpcs, default_num)
};
// try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here.
let mut response = private_rpcs
.try_send_all_upstream_servers(
.try_send_all_synced_connections(
authorization,
&request,
Some(request_metadata.clone()),
@ -1298,7 +1311,8 @@ impl Web3ProxyApp {
json!(true)
}
"net_peerCount" => {
// emit stats
// no stats on this. its cheap
// TODO: do something with proxy_mode here?
self.balanced_rpcs.num_synced_rpcs().into()
}
"web3_clientVersion" => {
@ -1404,10 +1418,12 @@ impl Web3ProxyApp {
.try_get_with(cache_key, async move {
// TODO: retry some failures automatically!
// TODO: try private_rpcs if all the balanced_rpcs fail!
// TODO: put the hash here instead?
// TODO: put the hash here instead of the block number? its in the request already.
let mut response = self
.balanced_rpcs
.try_send_best_upstream_server(
.try_proxy_connection(
proxy_mode,
self.allowed_lag,
&authorization,
request,
@ -1433,18 +1449,15 @@ impl Web3ProxyApp {
})?
} else {
self.balanced_rpcs
.try_send_best_upstream_server(
.try_proxy_connection(
proxy_mode,
self.allowed_lag,
&authorization,
request,
Some(&request_metadata),
None,
)
.await
.map_err(|err| {
// TODO: emit a stat for an error
anyhow::anyhow!("error while forwarding response: {}", err)
})?
.await?
}
};

@ -42,26 +42,98 @@ pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()
// build our axum Router
let app = Router::new()
// routes should be ordered most to least common
// TODO: i think these routes could be done a lot better
//
// HTTP RPC (POST)
//
// public
.route("/", post(rpc_proxy_http::proxy_web3_rpc))
// authenticated with and without trailing slash
.route(
"/rpc/:rpc_key/",
post(rpc_proxy_http::proxy_web3_rpc_with_key),
)
.route(
"/rpc/:rpc_key",
post(rpc_proxy_http::proxy_web3_rpc_with_key),
)
// public fastest with and without trailing slash
.route("/fastest/", post(rpc_proxy_http::fastest_proxy_web3_rpc))
.route("/fastest", post(rpc_proxy_http::fastest_proxy_web3_rpc))
// authenticated fastest with and without trailing slash
.route(
"/fastest/:rpc_key/",
post(rpc_proxy_http::fastest_proxy_web3_rpc_with_key),
)
.route(
"/fastest/:rpc_key",
post(rpc_proxy_http::fastest_proxy_web3_rpc_with_key),
)
// public versus
.route("/versus/", post(rpc_proxy_http::versus_proxy_web3_rpc))
.route("/versus", post(rpc_proxy_http::versus_proxy_web3_rpc))
// authenticated versus with and without trailing slash
.route(
"/versus/:rpc_key/",
post(rpc_proxy_http::versus_proxy_web3_rpc_with_key),
)
.route(
"/versus/:rpc_key",
post(rpc_proxy_http::versus_proxy_web3_rpc_with_key),
)
//
// Websocket RPC (GET)
// If not an RPC, this will redirect to configurable urls
//
// public
.route("/", get(rpc_proxy_ws::websocket_handler))
.route(
"/rpc/:rpc_key",
post(rpc_proxy_http::proxy_web3_rpc_with_key),
)
// authenticated with and without trailing slash
.route(
"/rpc/:rpc_key/",
post(rpc_proxy_http::proxy_web3_rpc_with_key),
get(rpc_proxy_ws::websocket_handler_with_key),
)
.route(
"/rpc/:rpc_key",
get(rpc_proxy_ws::websocket_handler_with_key),
)
// public fastest with and without trailing slash
.route("/fastest/", get(rpc_proxy_ws::fastest_websocket_handler))
.route("/fastest", get(rpc_proxy_ws::fastest_websocket_handler))
// authenticated fastest with and without trailing slash
.route(
"/rpc/:rpc_key/",
get(rpc_proxy_ws::websocket_handler_with_key),
"/fastest/:rpc_key/",
get(rpc_proxy_ws::fastest_websocket_handler_with_key),
)
.route(
"/fastest/:rpc_key",
get(rpc_proxy_ws::fastest_websocket_handler_with_key),
)
// public versus
.route(
"/versus/",
get(rpc_proxy_ws::versus_websocket_handler_with_key),
)
.route(
"/versus",
get(rpc_proxy_ws::versus_websocket_handler_with_key),
)
// authenticated versus with and without trailing slash
.route(
"/versus/:rpc_key/",
get(rpc_proxy_ws::versus_websocket_handler_with_key),
)
.route(
"/versus/:rpc_key",
get(rpc_proxy_ws::versus_websocket_handler_with_key),
)
//
// System things
//
.route("/health", get(status::health))
.route("/status", get(status::status))
//
// User stuff
//
.route("/user/login/:user_address", get(users::user_login_get))
.route(
"/user/login/:user_address/:message_eip",
@ -86,9 +158,11 @@ pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()
)
.route("/user/stats/detailed", get(users::user_stats_detailed_get))
.route("/user/logout", post(users::user_logout_post))
.route("/status", get(status::status))
//
// Axum layers
// layers are ordered bottom up
// the last layer is first for requests and last for responses
//
// Mark the `Authorization` request header as sensitive so it doesn't show in logs
.layer(SetSensitiveRequestHeadersLayer::new(once(AUTHORIZATION)))
// handle cors

@ -2,6 +2,7 @@
use super::authorization::{ip_is_authorized, key_is_authorized};
use super::errors::FrontendResult;
use super::rpc_proxy_ws::ProxyMode;
use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum};
use axum::extract::Path;
use axum::headers::{Origin, Referer, UserAgent};
@ -18,9 +19,41 @@ use std::sync::Arc;
#[debug_handler]
pub async fn proxy_web3_rpc(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp,
ip: ClientIp,
origin: Option<TypedHeader<Origin>>,
Json(payload): Json<JsonRpcRequestEnum>,
) -> FrontendResult {
_proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Best).await
}
#[debug_handler]
pub async fn fastest_proxy_web3_rpc(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: ClientIp,
origin: Option<TypedHeader<Origin>>,
Json(payload): Json<JsonRpcRequestEnum>,
) -> FrontendResult {
// TODO: read the fastest number from params
// TODO: check that the app allows this without authentication
_proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Fastest(0)).await
}
#[debug_handler]
pub async fn versus_proxy_web3_rpc(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: ClientIp,
origin: Option<TypedHeader<Origin>>,
Json(payload): Json<JsonRpcRequestEnum>,
) -> FrontendResult {
_proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Versus).await
}
async fn _proxy_web3_rpc(
app: Arc<Web3ProxyApp>,
ClientIp(ip): ClientIp,
origin: Option<TypedHeader<Origin>>,
payload: JsonRpcRequestEnum,
proxy_mode: ProxyMode,
) -> FrontendResult {
// TODO: benchmark spawning this
// TODO: do we care about keeping the TypedHeader wrapper?
@ -31,7 +64,7 @@ pub async fn proxy_web3_rpc(
let authorization = Arc::new(authorization);
let (response, rpcs, _semaphore) = app
.proxy_web3_rpc(authorization, payload)
.proxy_web3_rpc(authorization, payload, proxy_mode)
.await
.map(|(x, y)| (x, y, semaphore))?;
@ -58,12 +91,82 @@ pub async fn proxy_web3_rpc(
#[debug_handler]
pub async fn proxy_web3_rpc_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp,
ip: ClientIp,
origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
Path(rpc_key): Path<String>,
Json(payload): Json<JsonRpcRequestEnum>,
) -> FrontendResult {
_proxy_web3_rpc_with_key(
app,
ip,
origin,
referer,
user_agent,
rpc_key,
payload,
ProxyMode::Best,
)
.await
}
#[debug_handler]
pub async fn fastest_proxy_web3_rpc_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: ClientIp,
origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
Path(rpc_key): Path<String>,
Json(payload): Json<JsonRpcRequestEnum>,
) -> FrontendResult {
_proxy_web3_rpc_with_key(
app,
ip,
origin,
referer,
user_agent,
rpc_key,
payload,
ProxyMode::Fastest(0),
)
.await
}
#[debug_handler]
pub async fn versus_proxy_web3_rpc_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: ClientIp,
origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
Path(rpc_key): Path<String>,
Json(payload): Json<JsonRpcRequestEnum>,
) -> FrontendResult {
_proxy_web3_rpc_with_key(
app,
ip,
origin,
referer,
user_agent,
rpc_key,
payload,
ProxyMode::Versus,
)
.await
}
#[allow(clippy::too_many_arguments)]
async fn _proxy_web3_rpc_with_key(
app: Arc<Web3ProxyApp>,
ClientIp(ip): ClientIp,
origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
rpc_key: String,
payload: JsonRpcRequestEnum,
proxy_mode: ProxyMode,
) -> FrontendResult {
// TODO: DRY w/ proxy_web3_rpc
// the request can take a while, so we spawn so that we can start serving another request
@ -82,7 +185,7 @@ pub async fn proxy_web3_rpc_with_key(
let authorization = Arc::new(authorization);
let (response, rpcs, _semaphore) = app
.proxy_web3_rpc(authorization, payload)
.proxy_web3_rpc(authorization, payload, proxy_mode)
.await
.map(|(x, y)| (x, y, semaphore))?;

@ -33,10 +33,58 @@ use serde_json::value::to_raw_value;
use std::sync::Arc;
use std::{str::from_utf8_mut, sync::atomic::AtomicUsize};
#[derive(Copy, Clone)]
pub enum ProxyMode {
/// send to the "best" synced server
Best,
/// send to all synced servers and return the fastest non-error response (reverts do not count as errors here)
Fastest(usize),
/// send to all servers for benchmarking. return the fastest non-error response
Versus,
}
/// Public entrypoint for WebSocket JSON-RPC requests.
/// Queries a single server at a time
#[debug_handler]
pub async fn websocket_handler(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: ClientIp,
origin: Option<TypedHeader<Origin>>,
ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult {
_websocket_handler(ProxyMode::Fastest(1), app, ip, origin, ws_upgrade).await
}
/// Public entrypoint for WebSocket JSON-RPC requests that uses all synced servers.
/// Queries all synced backends with every request! This might get expensive!
#[debug_handler]
pub async fn fastest_websocket_handler(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: ClientIp,
origin: Option<TypedHeader<Origin>>,
ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult {
// TODO: get the fastest number from the url params (default to 0/all)
// TODO: config to disable this
_websocket_handler(ProxyMode::Fastest(0), app, ip, origin, ws_upgrade).await
}
/// Public entrypoint for WebSocket JSON-RPC requests that uses all synced servers.
/// Queries **all** backends with every request! This might get expensive!
#[debug_handler]
pub async fn versus_websocket_handler(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: ClientIp,
origin: Option<TypedHeader<Origin>>,
ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult {
// TODO: config to disable this
_websocket_handler(ProxyMode::Versus, app, ip, origin, ws_upgrade).await
}
async fn _websocket_handler(
proxy_mode: ProxyMode,
app: Arc<Web3ProxyApp>,
ClientIp(ip): ClientIp,
origin: Option<TypedHeader<Origin>>,
ws_upgrade: Option<WebSocketUpgrade>,
@ -49,7 +97,7 @@ pub async fn websocket_handler(
match ws_upgrade {
Some(ws) => Ok(ws
.on_upgrade(|socket| proxy_web3_socket(app, authorization, socket))
.on_upgrade(move |socket| proxy_web3_socket(app, authorization, socket, proxy_mode))
.into_response()),
None => {
if let Some(redirect) = &app.config.redirect_public_url {
@ -72,12 +120,84 @@ pub async fn websocket_handler(
#[debug_handler]
pub async fn websocket_handler_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp,
ip: ClientIp,
Path(rpc_key): Path<String>,
origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult {
// TODO: config instead of defaulting to fastest(1)?
_websocket_handler_with_key(
ProxyMode::Fastest(1),
app,
ip,
rpc_key,
origin,
referer,
user_agent,
ws_upgrade,
)
.await
}
#[debug_handler]
pub async fn fastest_websocket_handler_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: ClientIp,
Path(rpc_key): Path<String>,
origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult {
// TODO: get the fastest number from the url params (default to 0/all)
_websocket_handler_with_key(
ProxyMode::Fastest(0),
app,
ip,
rpc_key,
origin,
referer,
user_agent,
ws_upgrade,
)
.await
}
#[debug_handler]
pub async fn versus_websocket_handler_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: ClientIp,
Path(rpc_key): Path<String>,
origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult {
_websocket_handler_with_key(
ProxyMode::Versus,
app,
ip,
rpc_key,
origin,
referer,
user_agent,
ws_upgrade,
)
.await
}
#[allow(clippy::too_many_arguments)]
async fn _websocket_handler_with_key(
proxy_mode: ProxyMode,
app: Arc<Web3ProxyApp>,
ClientIp(ip): ClientIp,
rpc_key: String,
origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult {
let rpc_key = rpc_key.parse()?;
@ -96,9 +216,8 @@ pub async fn websocket_handler_with_key(
let authorization = Arc::new(authorization);
match ws_upgrade {
Some(ws_upgrade) => {
Ok(ws_upgrade.on_upgrade(move |socket| proxy_web3_socket(app, authorization, socket)))
}
Some(ws_upgrade) => Ok(ws_upgrade
.on_upgrade(move |socket| proxy_web3_socket(app, authorization, socket, proxy_mode))),
None => {
// if no websocket upgrade, this is probably a user loading the url with their browser
@ -154,6 +273,7 @@ async fn proxy_web3_socket(
app: Arc<Web3ProxyApp>,
authorization: Arc<Authorization>,
socket: WebSocket,
proxy_mode: ProxyMode,
) {
// split the websocket so we can read and write concurrently
let (ws_tx, ws_rx) = socket.split();
@ -162,7 +282,13 @@ async fn proxy_web3_socket(
let (response_sender, response_receiver) = flume::unbounded::<Message>();
tokio::spawn(write_web3_socket(response_receiver, ws_tx));
tokio::spawn(read_web3_socket(app, authorization, ws_rx, response_sender));
tokio::spawn(read_web3_socket(
app,
authorization,
ws_rx,
response_sender,
proxy_mode,
));
}
/// websockets support a few more methods than http clients
@ -173,6 +299,7 @@ async fn handle_socket_payload(
response_sender: &flume::Sender<Message>,
subscription_count: &AtomicUsize,
subscriptions: &mut HashMap<String, AbortHandle>,
proxy_mode: ProxyMode,
) -> Message {
// TODO: do any clients send batches over websockets?
let (id, response) = match serde_json::from_str::<JsonRpcRequest>(payload) {
@ -183,6 +310,7 @@ async fn handle_socket_payload(
[..]
{
"eth_subscribe" => {
// TODO: how can we subscribe with proxy_mode?
match app
.eth_subscribe(
authorization.clone(),
@ -247,7 +375,7 @@ async fn handle_socket_payload(
Ok(response.into())
}
_ => app
.proxy_web3_rpc(authorization.clone(), json_request.into())
.proxy_web3_rpc(authorization.clone(), json_request.into(), proxy_mode)
.await
.map_or_else(
|err| match err {
@ -291,6 +419,7 @@ async fn read_web3_socket(
authorization: Arc<Authorization>,
mut ws_rx: SplitStream<WebSocket>,
response_sender: flume::Sender<Message>,
proxy_mode: ProxyMode,
) {
let mut subscriptions = HashMap::new();
let subscription_count = AtomicUsize::new(1);
@ -307,6 +436,7 @@ async fn read_web3_socket(
&response_sender,
&subscription_count,
&mut subscriptions,
proxy_mode,
)
.await
}
@ -333,6 +463,7 @@ async fn read_web3_socket(
&response_sender,
&subscription_count,
&mut subscriptions,
proxy_mode,
)
.await
}

@ -165,7 +165,7 @@ impl Web3Connections {
// TODO: request_metadata? maybe we should put it in the authorization?
// TODO: don't hard code allowed lag
let response = self
.try_send_best_upstream_server(60, authorization, request, None, None)
.try_send_best_consensus_head_connection(60, authorization, request, None, None)
.await?;
let block = response.result.context("failed fetching block")?;
@ -241,7 +241,7 @@ impl Web3Connections {
// TODO: if error, retry?
// TODO: request_metadata or authorization?
let response = self
.try_send_best_upstream_server(60, authorization, request, None, Some(num))
.try_send_best_consensus_head_connection(60, authorization, request, None, Some(num))
.await?;
let raw_block = response.result.context("no block result")?;

@ -2,12 +2,13 @@
use super::blockchain::{ArcBlock, BlockHashesCache};
use super::connection::Web3Connection;
use super::request::{
OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult, RequestErrorHandler,
OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult, RequestRevertHandler,
};
use super::synced_connections::SyncedConnections;
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::{BlockAndRpc, TxHashAndRpc, Web3ConnectionConfig};
use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
use crate::rpcs::transactions::TxStatus;
use arc_swap::ArcSwap;
@ -406,8 +407,8 @@ impl Web3Connections {
unimplemented!("this shouldn't be possible")
}
/// get the best available rpc server
pub async fn best_synced_backend_connection(
/// get the best available rpc server with the consensus head block. it might have blocks after the consensus head
pub async fn best_consensus_head_connection(
&self,
allowed_lag: u64,
authorization: &Arc<Authorization>,
@ -662,7 +663,7 @@ impl Web3Connections {
/// be sure there is a timeout on this or it might loop forever
/// TODO: do not take allowed_lag here. have it be on the connections struct instead
pub async fn try_send_best_upstream_server(
pub async fn try_send_best_consensus_head_connection(
&self,
allowed_lag: u64,
authorization: &Arc<Authorization>,
@ -679,7 +680,7 @@ impl Web3Connections {
break;
}
match self
.best_synced_backend_connection(
.best_consensus_head_connection(
allowed_lag,
authorization,
request_metadata,
@ -705,7 +706,7 @@ impl Web3Connections {
.request(
&request.method,
&json!(request.params),
RequestErrorHandler::SaveReverts,
RequestRevertHandler::Save,
)
.await;
@ -818,7 +819,7 @@ impl Web3Connections {
}
/// be sure there is a timeout on this or it might loop forever
pub async fn try_send_all_upstream_servers(
pub async fn try_send_all_synced_connections(
&self,
authorization: &Arc<Authorization>,
request: &JsonRpcRequest,
@ -887,6 +888,31 @@ impl Web3Connections {
}
}
}
pub async fn try_proxy_connection(
&self,
proxy_mode: ProxyMode,
allowed_lag: u64,
authorization: &Arc<Authorization>,
request: JsonRpcRequest,
request_metadata: Option<&Arc<RequestMetadata>>,
min_block_needed: Option<&U64>,
) -> anyhow::Result<JsonRpcForwardedResponse> {
match proxy_mode {
ProxyMode::Best => {
self.try_send_best_consensus_head_connection(
allowed_lag,
authorization,
request,
request_metadata,
min_block_needed,
)
.await
}
ProxyMode::Fastest(x) => todo!("Fastest"),
ProxyMode::Versus => todo!("Versus"),
}
}
}
impl fmt::Debug for Web3Connections {
@ -1088,7 +1114,7 @@ mod tests {
// best_synced_backend_connection requires servers to be synced with the head block
// TODO: don't hard code allowed_lag
let x = conns
.best_synced_backend_connection(60, &authorization, None, &[], None)
.best_consensus_head_connection(60, &authorization, None, &[], None)
.await
.unwrap();
@ -1143,21 +1169,21 @@ mod tests {
assert!(matches!(
conns
.best_synced_backend_connection(60, &authorization, None, &[], None)
.best_consensus_head_connection(60, &authorization, None, &[], None)
.await,
Ok(OpenRequestResult::Handle(_))
));
assert!(matches!(
conns
.best_synced_backend_connection(60, &authorization, None, &[], Some(&0.into()))
.best_consensus_head_connection(60, &authorization, None, &[], Some(&0.into()))
.await,
Ok(OpenRequestResult::Handle(_))
));
assert!(matches!(
conns
.best_synced_backend_connection(60, &authorization, None, &[], Some(&1.into()))
.best_consensus_head_connection(60, &authorization, None, &[], Some(&1.into()))
.await,
Ok(OpenRequestResult::Handle(_))
));
@ -1165,7 +1191,7 @@ mod tests {
// future block should not get a handle
assert!(matches!(
conns
.best_synced_backend_connection(60, &authorization, None, &[], Some(&2.into()))
.best_consensus_head_connection(60, &authorization, None, &[], Some(&2.into()))
.await,
Ok(OpenRequestResult::NotReady)
));
@ -1298,7 +1324,7 @@ mod tests {
// best_synced_backend_connection requires servers to be synced with the head block
let best_head_server = conns
.best_synced_backend_connection(
.best_consensus_head_connection(
60,
&authorization,
None,
@ -1313,7 +1339,7 @@ mod tests {
));
let best_archive_server = conns
.best_synced_backend_connection(60, &authorization, None, &[], Some(&1.into()))
.best_consensus_head_connection(60, &authorization, None, &[], Some(&1.into()))
.await;
match best_archive_server {

@ -42,7 +42,7 @@ pub struct OpenRequestHandle {
}
/// Depending on the context, RPC errors can require different handling.
pub enum RequestErrorHandler {
pub enum RequestRevertHandler {
/// Log at the trace level. Use when errors are expected.
TraceLevel,
/// Log at the debug level. Use when errors are expected.
@ -52,7 +52,7 @@ pub enum RequestErrorHandler {
/// Log at the warn level. Use when errors do not cause problems.
WarnLevel,
/// Potentially save the revert. Users can tune how often this happens
SaveReverts,
Save,
}
// TODO: second param could be skipped since we don't need it here
@ -65,13 +65,13 @@ struct EthCallFirstParams {
data: Option<Bytes>,
}
impl From<Level> for RequestErrorHandler {
impl From<Level> for RequestRevertHandler {
fn from(level: Level) -> Self {
match level {
Level::Trace => RequestErrorHandler::TraceLevel,
Level::Debug => RequestErrorHandler::DebugLevel,
Level::Error => RequestErrorHandler::ErrorLevel,
Level::Warn => RequestErrorHandler::WarnLevel,
Level::Trace => RequestRevertHandler::TraceLevel,
Level::Debug => RequestRevertHandler::DebugLevel,
Level::Error => RequestRevertHandler::ErrorLevel,
Level::Warn => RequestRevertHandler::WarnLevel,
_ => unimplemented!("unexpected tracing Level"),
}
}
@ -213,7 +213,7 @@ impl OpenRequestHandle {
&self,
method: &str,
params: &P,
error_handler: RequestErrorHandler,
revert_handler: RequestRevertHandler,
) -> Result<R, ProviderError>
where
// TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it
@ -252,36 +252,36 @@ impl OpenRequestHandle {
if let Err(err) = &response {
// only save reverts for some types of calls
// TODO: do something special for eth_sendRawTransaction too
let error_handler = if let RequestErrorHandler::SaveReverts = error_handler {
let revert_handler = if let RequestRevertHandler::Save = revert_handler {
// TODO: should all these be Trace or Debug or a mix?
if !["eth_call", "eth_estimateGas"].contains(&method) {
// trace!(%method, "skipping save on revert");
RequestErrorHandler::TraceLevel
RequestRevertHandler::TraceLevel
} else if self.authorization.db_conn.is_some() {
let log_revert_chance = self.authorization.checks.log_revert_chance;
if log_revert_chance == 0.0 {
// trace!(%method, "no chance. skipping save on revert");
RequestErrorHandler::TraceLevel
RequestRevertHandler::TraceLevel
} else if log_revert_chance == 1.0 {
// trace!(%method, "gaurenteed chance. SAVING on revert");
error_handler
revert_handler
} else if thread_fast_rng::thread_fast_rng().gen_range(0.0f64..=1.0)
< log_revert_chance
{
// trace!(%method, "missed chance. skipping save on revert");
RequestErrorHandler::TraceLevel
RequestRevertHandler::TraceLevel
} else {
// trace!("Saving on revert");
// TODO: is always logging at debug level fine?
error_handler
revert_handler
}
} else {
// trace!(%method, "no database. skipping save on revert");
RequestErrorHandler::TraceLevel
RequestRevertHandler::TraceLevel
}
} else {
error_handler
revert_handler
};
// check for "execution reverted" here
@ -323,8 +323,8 @@ impl OpenRequestHandle {
}
// TODO: think more about the method and param logs. those can be sensitive information
match error_handler {
RequestErrorHandler::DebugLevel => {
match revert_handler {
RequestRevertHandler::DebugLevel => {
// TODO: think about this revert check more. sometimes we might want reverts logged so this needs a flag
if !is_revert {
debug!(
@ -333,7 +333,7 @@ impl OpenRequestHandle {
);
}
}
RequestErrorHandler::TraceLevel => {
RequestRevertHandler::TraceLevel => {
trace!(
"bad response from {}! method={} params={:?} err={:?}",
self.conn,
@ -342,21 +342,21 @@ impl OpenRequestHandle {
err
);
}
RequestErrorHandler::ErrorLevel => {
RequestRevertHandler::ErrorLevel => {
// TODO: include params if not running in release mode
error!(
"bad response from {}! method={} err={:?}",
self.conn, method, err
);
}
RequestErrorHandler::WarnLevel => {
RequestRevertHandler::WarnLevel => {
// TODO: include params if not running in release mode
warn!(
"bad response from {}! method={} err={:?}",
self.conn, method, err
);
}
RequestErrorHandler::SaveReverts => {
RequestRevertHandler::Save => {
trace!(
"bad response from {}! method={} params={:?} err={:?}",
self.conn,