add ipc support

This commit is contained in:
Bryan Stitt 2023-12-05 10:54:12 -08:00
parent fa1e8d0aca
commit 979093c43d
3 changed files with 56 additions and 20 deletions

@ -12,6 +12,7 @@ use sentry::types::Dsn;
use serde::{de, Deserialize, Deserializer}; use serde::{de, Deserialize, Deserializer};
use serde_inline_default::serde_inline_default; use serde_inline_default::serde_inline_default;
use std::fmt; use std::fmt;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicU64;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -392,21 +393,17 @@ impl From<BlockDataLimit> for AtomicU64 {
#[serde_inline_default] #[serde_inline_default]
#[derive(Clone, Debug, Deserialize, PartialEq, Eq)] #[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
pub struct Web3RpcConfig { pub struct Web3RpcConfig {
/// only use this rpc if everything else is lagging too far. this allows us to ignore fast but very low limit rpcs
#[serde(default = "Default::default")]
pub backup: bool,
/// block data limit. If None, will be queried
#[serde(default = "Default::default")]
pub block_data_limit: BlockDataLimit,
/// simple way to disable a connection without deleting the row /// simple way to disable a connection without deleting the row
#[serde(default = "Default::default")] #[serde(default = "Default::default")]
pub disabled: bool, pub disabled: bool,
/// a name used in /status and other user facing messages /// a name used in /status and other user facing messages
pub display_name: Option<String>, pub display_name: Option<String>,
/// while not absolutely required, a ws:// or wss:// connection will be able to subscribe to head blocks
pub ws_url: Option<String>,
/// while not absolutely required, a http:// or https:// connection will allow erigon to stream JSON
pub http_url: Option<String>,
/// block data limit. If None, will be queried
#[serde(default = "Default::default")]
pub block_data_limit: BlockDataLimit,
/// the requests per second at which the server starts slowing down
#[serde_inline_default(1u32)]
pub soft_limit: u32,
/// the requests per period at which the server throws errors (rate limit or otherwise) /// the requests per period at which the server throws errors (rate limit or otherwise)
pub hard_limit: Option<u64>, pub hard_limit: Option<u64>,
/// the number of seconds in a rate limiting period /// the number of seconds in a rate limiting period
@ -416,13 +413,19 @@ pub struct Web3RpcConfig {
/// if hard limits are applied per server or per endpoint. default is per server /// if hard limits are applied per server or per endpoint. default is per server
#[serde(default = "Default::default")] #[serde(default = "Default::default")]
pub hard_limit_per_endpoint: bool, pub hard_limit_per_endpoint: bool,
/// only use this rpc if everything else is lagging too far. this allows us to ignore fast but very low limit rpcs /// while not absolutely required, a http:// or https:// connection will allow erigon to stream JSON
#[serde(default = "Default::default")] pub http_url: Option<String>,
pub backup: bool, /// while not absolutely required, a ipc connection should be fastest
pub ipc_path: Option<PathBuf>,
/// the requests per second at which the server starts slowing down
#[serde_inline_default(1u32)]
pub soft_limit: u32,
/// Subscribe to the firehose of pending transactions /// Subscribe to the firehose of pending transactions
/// Don't do this with free rpcs /// Don't do this with free rpcs
#[serde(default = "Default::default")] #[serde(default = "Default::default")]
pub subscribe_txs: bool, pub subscribe_txs: bool,
/// while not absolutely required, a ws:// or wss:// connection will be able to subscribe to head blocks
pub ws_url: Option<String>,
/// unknown config options get put here /// unknown config options get put here
#[serde(flatten, default = "HashMap::default")] #[serde(flatten, default = "HashMap::default")]
pub extra: HashMap<String, serde_json::Value>, pub extra: HashMap<String, serde_json::Value>,

@ -28,6 +28,7 @@ use std::borrow::Cow;
use std::cmp::Reverse; use std::cmp::Reverse;
use std::fmt; use std::fmt;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::path::PathBuf;
use std::sync::atomic::{self, AtomicBool, AtomicU32, AtomicU64, AtomicUsize}; use std::sync::atomic::{self, AtomicBool, AtomicU32, AtomicU64, AtomicUsize};
use std::{cmp::Ordering, sync::Arc}; use std::{cmp::Ordering, sync::Arc};
use tokio::select; use tokio::select;
@ -53,13 +54,16 @@ pub struct Web3Rpc {
pub(super) block_map: Option<BlocksByHashCache>, pub(super) block_map: Option<BlocksByHashCache>,
/// created_at is only inside an Option so that the "Default" derive works. it will always be set. /// created_at is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) created_at: Option<Instant>, pub(super) created_at: Option<Instant>,
/// most all requests prefer use the http_provider /// if no ipc_stream, most all requests prefer to use the http_provider
pub(super) http_client: Option<reqwest::Client>, pub(super) http_client: Option<reqwest::Client>,
pub(super) http_url: Option<Url>, pub(super) http_url: Option<Url>,
/// the websocket url is only used for subscriptions /// the websocket url is only used for subscriptions
pub(super) ws_url: Option<Url>, pub(super) ws_url: Option<Url>,
/// the websocket provider is only used for subscriptions /// the websocket provider is only used for subscriptions
pub(super) ws_provider: ArcSwapOption<EthersWsProvider>, pub(super) ws_provider: ArcSwapOption<EthersWsProvider>,
/// most all requests prefer the ipc provider.
/// TODO: ArcSwapOption?
pub(super) ipc_path: Option<PathBuf>,
/// keep track of hard limits /// keep track of hard limits
/// hard_limit_until is only inside an Option so that the "Default" derive works. it will always be set. /// hard_limit_until is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) hard_limit_until: Option<watch::Sender<Instant>>, pub(super) hard_limit_until: Option<watch::Sender<Instant>>,
@ -227,6 +231,7 @@ impl Web3Rpc {
head_block_sender: Some(head_block), head_block_sender: Some(head_block),
http_url, http_url,
http_client, http_client,
ipc_path: config.ipc_path,
max_head_block_age, max_head_block_age,
name, name,
peak_latency: Some(peak_latency), peak_latency: Some(peak_latency),

@ -3,7 +3,7 @@ use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::{Authorization, AuthorizationType}; use crate::frontend::authorization::{Authorization, AuthorizationType};
use crate::globals::{global_db_conn, DB_CONN}; use crate::globals::{global_db_conn, DB_CONN};
use crate::jsonrpc::{ use crate::jsonrpc::{
self, JsonRpcErrorData, JsonRpcResultData, ResponsePayload, ValidatedRequest, self, JsonRpcErrorData, JsonRpcResultData, ParsedResponse, ResponsePayload, ValidatedRequest,
}; };
use anyhow::Context; use anyhow::Context;
use chrono::Utc; use chrono::Utc;
@ -20,6 +20,8 @@ use serde_json::json;
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic; use std::sync::atomic;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;
use tokio::time::{Duration, Instant}; use tokio::time::{Duration, Instant};
use tracing::{debug, error, info, trace, warn, Level}; use tracing::{debug, error, info, trace, warn, Level};
@ -199,10 +201,36 @@ impl OpenRequestHandle {
async fn _request<R: JsonRpcResultData + serde::Serialize>( async fn _request<R: JsonRpcResultData + serde::Serialize>(
&self, &self,
) -> Web3ProxyResult<jsonrpc::SingleResponse<R>> { ) -> Web3ProxyResult<jsonrpc::SingleResponse<R>> {
// TODO: replace ethers-rs providers with our own that supports streaming the responses if let Some(ipc_path) = self.rpc.ipc_path.as_ref() {
// TODO: replace ethers-rs providers with our own that handles "id" being null // first, prefer the unix stream
if let (Some(url), Some(ref client)) = (self.rpc.http_url.clone(), &self.rpc.http_client) { let request = self
// prefer the http provider .web3_request
.inner
.jsonrpc_request()
.context("there should always be a request here")?;
// TODO: instead of connecting every time, use a connection pool
let mut ipc_stream = UnixStream::connect(ipc_path).await?;
ipc_stream.writable().await?;
let x = serde_json::to_vec(request)?;
let _ = ipc_stream.write(&x).await?;
ipc_stream.readable().await?;
let mut buf = Vec::new();
let n = ipc_stream.try_read(&mut buf)?;
let x: ParsedResponse<R> = serde_json::from_slice(&buf[..n])?;
Ok(x.into())
} else if let (Some(url), Some(ref client)) =
(self.rpc.http_url.clone(), &self.rpc.http_client)
{
// second, prefer the http provider
let request = self let request = self
.web3_request .web3_request
.inner .inner
@ -234,7 +262,7 @@ impl OpenRequestHandle {
// cache 128kb responses // cache 128kb responses
jsonrpc::SingleResponse::read_if_short(response, 131_072, &self.web3_request).await jsonrpc::SingleResponse::read_if_short(response, 131_072, &self.web3_request).await
} else if let Some(p) = self.rpc.ws_provider.load().as_ref() { } else if let Some(p) = self.rpc.ws_provider.load().as_ref() {
// use the websocket provider if no http provider is available // use the websocket provider if no other provider is available
let method = self.web3_request.inner.method(); let method = self.web3_request.inner.method();
let params = self.web3_request.inner.params(); let params = self.web3_request.inner.params();