diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 944768e7..be6981ad 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -12,6 +12,7 @@ use sentry::types::Dsn; use serde::{de, Deserialize, Deserializer}; use serde_inline_default::serde_inline_default; use std::fmt; +use std::path::PathBuf; use std::sync::atomic::AtomicU64; use std::sync::Arc; use std::time::Duration; @@ -392,21 +393,17 @@ impl From for AtomicU64 { #[serde_inline_default] #[derive(Clone, Debug, Deserialize, PartialEq, Eq)] pub struct Web3RpcConfig { + /// only use this rpc if everything else is lagging too far. this allows us to ignore fast but very low limit rpcs + #[serde(default = "Default::default")] + pub backup: bool, + /// block data limit. If None, will be queried + #[serde(default = "Default::default")] + pub block_data_limit: BlockDataLimit, /// simple way to disable a connection without deleting the row #[serde(default = "Default::default")] pub disabled: bool, /// a name used in /status and other user facing messages pub display_name: Option, - /// while not absolutely required, a ws:// or wss:// connection will be able to subscribe to head blocks - pub ws_url: Option, - /// while not absolutely required, a http:// or https:// connection will allow erigon to stream JSON - pub http_url: Option, - /// block data limit. If None, will be queried - #[serde(default = "Default::default")] - pub block_data_limit: BlockDataLimit, - /// the requests per second at which the server starts slowing down - #[serde_inline_default(1u32)] - pub soft_limit: u32, /// the requests per period at which the server throws errors (rate limit or otherwise) pub hard_limit: Option, /// the number of seconds in a rate limiting period @@ -416,13 +413,19 @@ pub struct Web3RpcConfig { /// if hard limits are applied per server or per endpoint. default is per server #[serde(default = "Default::default")] pub hard_limit_per_endpoint: bool, - /// only use this rpc if everything else is lagging too far. this allows us to ignore fast but very low limit rpcs - #[serde(default = "Default::default")] - pub backup: bool, + /// while not absolutely required, a http:// or https:// connection will allow erigon to stream JSON + pub http_url: Option, + /// while not absolutely required, a ipc connection should be fastest + pub ipc_path: Option, + /// the requests per second at which the server starts slowing down + #[serde_inline_default(1u32)] + pub soft_limit: u32, /// Subscribe to the firehose of pending transactions /// Don't do this with free rpcs #[serde(default = "Default::default")] pub subscribe_txs: bool, + /// while not absolutely required, a ws:// or wss:// connection will be able to subscribe to head blocks + pub ws_url: Option, /// unknown config options get put here #[serde(flatten, default = "HashMap::default")] pub extra: HashMap, diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 400299c4..bda5c9e5 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -28,6 +28,7 @@ use std::borrow::Cow; use std::cmp::Reverse; use std::fmt; use std::hash::{Hash, Hasher}; +use std::path::PathBuf; use std::sync::atomic::{self, AtomicBool, AtomicU32, AtomicU64, AtomicUsize}; use std::{cmp::Ordering, sync::Arc}; use tokio::select; @@ -53,13 +54,16 @@ pub struct Web3Rpc { pub(super) block_map: Option, /// created_at is only inside an Option so that the "Default" derive works. it will always be set. pub(super) created_at: Option, - /// most all requests prefer use the http_provider + /// if no ipc_stream, most all requests prefer to use the http_provider pub(super) http_client: Option, pub(super) http_url: Option, /// the websocket url is only used for subscriptions pub(super) ws_url: Option, /// the websocket provider is only used for subscriptions pub(super) ws_provider: ArcSwapOption, + /// most all requests prefer the ipc provider. + /// TODO: ArcSwapOption? + pub(super) ipc_path: Option, /// keep track of hard limits /// hard_limit_until is only inside an Option so that the "Default" derive works. it will always be set. pub(super) hard_limit_until: Option>, @@ -227,6 +231,7 @@ impl Web3Rpc { head_block_sender: Some(head_block), http_url, http_client, + ipc_path: config.ipc_path, max_head_block_age, name, peak_latency: Some(peak_latency), diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index d354cf30..89729ad2 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -3,7 +3,7 @@ use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::frontend::authorization::{Authorization, AuthorizationType}; use crate::globals::{global_db_conn, DB_CONN}; use crate::jsonrpc::{ - self, JsonRpcErrorData, JsonRpcResultData, ResponsePayload, ValidatedRequest, + self, JsonRpcErrorData, JsonRpcResultData, ParsedResponse, ResponsePayload, ValidatedRequest, }; use anyhow::Context; use chrono::Utc; @@ -20,6 +20,8 @@ use serde_json::json; use std::pin::Pin; use std::sync::atomic; use std::sync::Arc; +use tokio::io::AsyncWriteExt; +use tokio::net::UnixStream; use tokio::time::{Duration, Instant}; use tracing::{debug, error, info, trace, warn, Level}; @@ -199,10 +201,36 @@ impl OpenRequestHandle { async fn _request( &self, ) -> Web3ProxyResult> { - // TODO: replace ethers-rs providers with our own that supports streaming the responses - // TODO: replace ethers-rs providers with our own that handles "id" being null - if let (Some(url), Some(ref client)) = (self.rpc.http_url.clone(), &self.rpc.http_client) { - // prefer the http provider + if let Some(ipc_path) = self.rpc.ipc_path.as_ref() { + // first, prefer the unix stream + let request = self + .web3_request + .inner + .jsonrpc_request() + .context("there should always be a request here")?; + + // TODO: instead of connecting every time, use a connection pool + let mut ipc_stream = UnixStream::connect(ipc_path).await?; + + ipc_stream.writable().await?; + + let x = serde_json::to_vec(request)?; + + let _ = ipc_stream.write(&x).await?; + + ipc_stream.readable().await?; + + let mut buf = Vec::new(); + + let n = ipc_stream.try_read(&mut buf)?; + + let x: ParsedResponse = serde_json::from_slice(&buf[..n])?; + + Ok(x.into()) + } else if let (Some(url), Some(ref client)) = + (self.rpc.http_url.clone(), &self.rpc.http_client) + { + // second, prefer the http provider let request = self .web3_request .inner @@ -234,7 +262,7 @@ impl OpenRequestHandle { // cache 128kb responses jsonrpc::SingleResponse::read_if_short(response, 131_072, &self.web3_request).await } else if let Some(p) = self.rpc.ws_provider.load().as_ref() { - // use the websocket provider if no http provider is available + // use the websocket provider if no other provider is available let method = self.web3_request.inner.method(); let params = self.web3_request.inner.params();