2022-10-18 00:47:58 +03:00
|
|
|
//! Take a user's WebSocket JSON-RPC requests and either respond from local data or proxy the request to a backend rpc server.
|
|
|
|
//!
|
|
|
|
//! WebSockets are the preferred method of receiving requests, but not all clients have good support.
|
|
|
|
|
2022-11-20 01:05:51 +03:00
|
|
|
use super::authorization::{ip_is_authorized, key_is_authorized, Authorization, RequestMetadata};
|
2023-05-31 07:26:11 +03:00
|
|
|
use crate::errors::{Web3ProxyError, Web3ProxyResponse};
|
2023-04-11 08:29:02 +03:00
|
|
|
use crate::jsonrpc::JsonRpcId;
|
2022-11-20 01:05:51 +03:00
|
|
|
use crate::{
|
|
|
|
app::Web3ProxyApp,
|
2023-05-31 07:26:11 +03:00
|
|
|
errors::Web3ProxyResult,
|
2022-11-20 01:05:51 +03:00
|
|
|
jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest},
|
|
|
|
};
|
2023-05-13 01:15:32 +03:00
|
|
|
use anyhow::Context;
|
2022-10-20 23:26:14 +03:00
|
|
|
use axum::headers::{Origin, Referer, UserAgent};
|
2022-05-20 08:30:54 +03:00
|
|
|
use axum::{
|
2022-05-29 20:28:41 +03:00
|
|
|
extract::ws::{Message, WebSocket, WebSocketUpgrade},
|
2022-08-06 04:17:25 +03:00
|
|
|
extract::Path,
|
2022-08-21 12:44:53 +03:00
|
|
|
response::{IntoResponse, Redirect},
|
2022-09-22 22:57:21 +03:00
|
|
|
Extension, TypedHeader,
|
2022-05-20 08:30:54 +03:00
|
|
|
};
|
2023-02-06 20:55:27 +03:00
|
|
|
use axum_client_ip::InsecureClientIp;
|
2022-08-17 00:10:09 +03:00
|
|
|
use axum_macros::debug_handler;
|
2023-05-13 21:13:02 +03:00
|
|
|
use ethers::types::U64;
|
2023-05-13 01:15:32 +03:00
|
|
|
use fstrings::{f, format_args_f};
|
2022-05-29 20:28:41 +03:00
|
|
|
use futures::SinkExt;
|
2022-05-31 04:55:04 +03:00
|
|
|
use futures::{
|
|
|
|
future::AbortHandle,
|
|
|
|
stream::{SplitSink, SplitStream, StreamExt},
|
|
|
|
};
|
2022-08-12 22:07:14 +03:00
|
|
|
use handlebars::Handlebars;
|
2022-05-29 22:33:10 +03:00
|
|
|
use hashbrown::HashMap;
|
2022-11-08 22:58:11 +03:00
|
|
|
use http::StatusCode;
|
2023-05-13 01:15:32 +03:00
|
|
|
use log::{info, trace};
|
2022-12-24 04:32:58 +03:00
|
|
|
use serde_json::json;
|
2023-06-01 02:05:41 +03:00
|
|
|
use std::sync::atomic::AtomicU64;
|
2022-08-27 05:13:36 +03:00
|
|
|
use std::sync::Arc;
|
2022-07-09 01:14:45 +03:00
|
|
|
use std::{str::from_utf8_mut, sync::atomic::AtomicUsize};
|
2023-01-19 03:17:43 +03:00
|
|
|
use tokio::sync::{broadcast, OwnedSemaphorePermit, RwLock};
|
2022-06-16 05:53:37 +03:00
|
|
|
|
2023-05-13 01:15:32 +03:00
|
|
|
/// How to select backend servers for a request
|
2023-03-03 04:39:50 +03:00
|
|
|
#[derive(Copy, Clone, Debug)]
|
2023-01-17 09:54:40 +03:00
|
|
|
pub enum ProxyMode {
|
|
|
|
/// send to the "best" synced server
|
|
|
|
Best,
|
|
|
|
/// send to all synced servers and return the fastest non-error response (reverts do not count as errors here)
|
|
|
|
Fastest(usize),
|
|
|
|
/// send to all servers for benchmarking. return the fastest non-error response
|
|
|
|
Versus,
|
2023-03-03 04:39:50 +03:00
|
|
|
/// send all requests and responses to kafka
|
2023-05-13 01:15:32 +03:00
|
|
|
/// TODO: should this be seperate from best/fastest/versus?
|
2023-03-03 04:39:50 +03:00
|
|
|
Debug,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for ProxyMode {
|
|
|
|
fn default() -> Self {
|
|
|
|
Self::Best
|
|
|
|
}
|
2023-01-17 09:54:40 +03:00
|
|
|
}
|
|
|
|
|
2022-10-18 00:47:58 +03:00
|
|
|
/// Public entrypoint for WebSocket JSON-RPC requests.
|
2023-01-17 09:54:40 +03:00
|
|
|
/// Queries a single server at a time
|
2022-08-17 00:10:09 +03:00
|
|
|
#[debug_handler]
|
2022-09-24 08:53:45 +03:00
|
|
|
pub async fn websocket_handler(
|
2022-07-07 06:22:09 +03:00
|
|
|
Extension(app): Extension<Arc<Web3ProxyApp>>,
|
2023-02-06 20:55:27 +03:00
|
|
|
ip: InsecureClientIp,
|
2023-01-17 09:54:40 +03:00
|
|
|
origin: Option<TypedHeader<Origin>>,
|
|
|
|
ws_upgrade: Option<WebSocketUpgrade>,
|
2023-03-17 05:38:11 +03:00
|
|
|
) -> Web3ProxyResponse {
|
2023-01-19 03:17:43 +03:00
|
|
|
_websocket_handler(ProxyMode::Best, app, ip, origin, ws_upgrade).await
|
2023-01-17 09:54:40 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Public entrypoint for WebSocket JSON-RPC requests that uses all synced servers.
|
|
|
|
/// Queries all synced backends with every request! This might get expensive!
|
|
|
|
#[debug_handler]
|
|
|
|
pub async fn fastest_websocket_handler(
|
|
|
|
Extension(app): Extension<Arc<Web3ProxyApp>>,
|
2023-02-06 20:55:27 +03:00
|
|
|
ip: InsecureClientIp,
|
2023-01-17 09:54:40 +03:00
|
|
|
origin: Option<TypedHeader<Origin>>,
|
|
|
|
ws_upgrade: Option<WebSocketUpgrade>,
|
2023-03-17 05:38:11 +03:00
|
|
|
) -> Web3ProxyResponse {
|
2023-01-17 09:54:40 +03:00
|
|
|
// TODO: get the fastest number from the url params (default to 0/all)
|
|
|
|
// TODO: config to disable this
|
|
|
|
_websocket_handler(ProxyMode::Fastest(0), app, ip, origin, ws_upgrade).await
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Public entrypoint for WebSocket JSON-RPC requests that uses all synced servers.
|
|
|
|
/// Queries **all** backends with every request! This might get expensive!
|
|
|
|
#[debug_handler]
|
|
|
|
pub async fn versus_websocket_handler(
|
|
|
|
Extension(app): Extension<Arc<Web3ProxyApp>>,
|
2023-02-06 20:55:27 +03:00
|
|
|
ip: InsecureClientIp,
|
2023-01-17 09:54:40 +03:00
|
|
|
origin: Option<TypedHeader<Origin>>,
|
|
|
|
ws_upgrade: Option<WebSocketUpgrade>,
|
2023-03-17 05:38:11 +03:00
|
|
|
) -> Web3ProxyResponse {
|
2023-01-17 09:54:40 +03:00
|
|
|
// TODO: config to disable this
|
|
|
|
_websocket_handler(ProxyMode::Versus, app, ip, origin, ws_upgrade).await
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn _websocket_handler(
|
|
|
|
proxy_mode: ProxyMode,
|
|
|
|
app: Arc<Web3ProxyApp>,
|
2023-02-06 20:55:27 +03:00
|
|
|
InsecureClientIp(ip): InsecureClientIp,
|
2022-10-21 23:59:05 +03:00
|
|
|
origin: Option<TypedHeader<Origin>>,
|
2022-08-10 05:37:34 +03:00
|
|
|
ws_upgrade: Option<WebSocketUpgrade>,
|
2023-03-17 05:38:11 +03:00
|
|
|
) -> Web3ProxyResponse {
|
2022-11-08 22:58:11 +03:00
|
|
|
let origin = origin.map(|x| x.0);
|
|
|
|
|
2023-03-03 04:39:50 +03:00
|
|
|
let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin, proxy_mode).await?;
|
2022-08-16 07:56:01 +03:00
|
|
|
|
2022-11-08 22:58:11 +03:00
|
|
|
let authorization = Arc::new(authorization);
|
2022-09-22 23:27:14 +03:00
|
|
|
|
2022-08-11 03:16:13 +03:00
|
|
|
match ws_upgrade {
|
2022-08-21 12:39:38 +03:00
|
|
|
Some(ws) => Ok(ws
|
2023-03-03 04:39:50 +03:00
|
|
|
.on_upgrade(move |socket| proxy_web3_socket(app, authorization, socket))
|
2022-08-21 12:39:38 +03:00
|
|
|
.into_response()),
|
2022-08-07 22:33:16 +03:00
|
|
|
None => {
|
2022-10-18 00:47:58 +03:00
|
|
|
if let Some(redirect) = &app.config.redirect_public_url {
|
|
|
|
// this is not a websocket. redirect to a friendly page
|
2022-11-28 22:59:42 +03:00
|
|
|
Ok(Redirect::permanent(redirect).into_response())
|
2022-10-18 00:47:58 +03:00
|
|
|
} else {
|
2023-03-20 05:14:46 +03:00
|
|
|
Err(Web3ProxyError::WebsocketOnly)
|
2022-10-18 00:47:58 +03:00
|
|
|
}
|
2022-08-07 22:33:16 +03:00
|
|
|
}
|
|
|
|
}
|
2022-08-04 04:10:27 +03:00
|
|
|
}
|
|
|
|
|
2022-10-18 00:47:58 +03:00
|
|
|
/// Authenticated entrypoint for WebSocket JSON-RPC requests. Web3 wallets use this.
|
|
|
|
/// Rate limit and billing based on the api key in the url.
|
|
|
|
/// Can optionally authorized based on origin, referer, or user agent.
|
2022-08-17 00:10:09 +03:00
|
|
|
#[debug_handler]
|
2022-09-24 08:53:45 +03:00
|
|
|
pub async fn websocket_handler_with_key(
|
2022-08-04 04:10:27 +03:00
|
|
|
Extension(app): Extension<Arc<Web3ProxyApp>>,
|
2023-02-06 20:55:27 +03:00
|
|
|
ip: InsecureClientIp,
|
2023-01-17 09:54:40 +03:00
|
|
|
Path(rpc_key): Path<String>,
|
|
|
|
origin: Option<TypedHeader<Origin>>,
|
|
|
|
referer: Option<TypedHeader<Referer>>,
|
|
|
|
user_agent: Option<TypedHeader<UserAgent>>,
|
|
|
|
ws_upgrade: Option<WebSocketUpgrade>,
|
2023-03-17 05:38:11 +03:00
|
|
|
) -> Web3ProxyResponse {
|
2023-01-17 09:54:40 +03:00
|
|
|
_websocket_handler_with_key(
|
2023-01-20 05:32:31 +03:00
|
|
|
ProxyMode::Best,
|
2023-01-17 09:54:40 +03:00
|
|
|
app,
|
|
|
|
ip,
|
|
|
|
rpc_key,
|
|
|
|
origin,
|
|
|
|
referer,
|
|
|
|
user_agent,
|
|
|
|
ws_upgrade,
|
|
|
|
)
|
|
|
|
.await
|
|
|
|
}
|
|
|
|
|
2023-03-03 04:39:50 +03:00
|
|
|
#[debug_handler]
|
|
|
|
pub async fn debug_websocket_handler_with_key(
|
|
|
|
Extension(app): Extension<Arc<Web3ProxyApp>>,
|
|
|
|
ip: InsecureClientIp,
|
|
|
|
Path(rpc_key): Path<String>,
|
|
|
|
origin: Option<TypedHeader<Origin>>,
|
|
|
|
referer: Option<TypedHeader<Referer>>,
|
|
|
|
user_agent: Option<TypedHeader<UserAgent>>,
|
|
|
|
ws_upgrade: Option<WebSocketUpgrade>,
|
2023-03-17 05:38:11 +03:00
|
|
|
) -> Web3ProxyResponse {
|
2023-03-03 04:39:50 +03:00
|
|
|
_websocket_handler_with_key(
|
|
|
|
ProxyMode::Debug,
|
|
|
|
app,
|
|
|
|
ip,
|
|
|
|
rpc_key,
|
|
|
|
origin,
|
|
|
|
referer,
|
|
|
|
user_agent,
|
|
|
|
ws_upgrade,
|
|
|
|
)
|
|
|
|
.await
|
|
|
|
}
|
|
|
|
|
2023-01-17 09:54:40 +03:00
|
|
|
#[debug_handler]
|
|
|
|
pub async fn fastest_websocket_handler_with_key(
|
|
|
|
Extension(app): Extension<Arc<Web3ProxyApp>>,
|
2023-02-06 20:55:27 +03:00
|
|
|
ip: InsecureClientIp,
|
2022-10-27 03:12:42 +03:00
|
|
|
Path(rpc_key): Path<String>,
|
2022-09-23 08:22:33 +03:00
|
|
|
origin: Option<TypedHeader<Origin>>,
|
2022-09-22 22:57:21 +03:00
|
|
|
referer: Option<TypedHeader<Referer>>,
|
|
|
|
user_agent: Option<TypedHeader<UserAgent>>,
|
2022-08-11 03:16:13 +03:00
|
|
|
ws_upgrade: Option<WebSocketUpgrade>,
|
2023-03-17 05:38:11 +03:00
|
|
|
) -> Web3ProxyResponse {
|
2023-01-17 09:54:40 +03:00
|
|
|
// TODO: get the fastest number from the url params (default to 0/all)
|
|
|
|
_websocket_handler_with_key(
|
|
|
|
ProxyMode::Fastest(0),
|
|
|
|
app,
|
|
|
|
ip,
|
|
|
|
rpc_key,
|
|
|
|
origin,
|
|
|
|
referer,
|
|
|
|
user_agent,
|
|
|
|
ws_upgrade,
|
|
|
|
)
|
|
|
|
.await
|
|
|
|
}
|
|
|
|
|
|
|
|
#[debug_handler]
|
|
|
|
pub async fn versus_websocket_handler_with_key(
|
|
|
|
Extension(app): Extension<Arc<Web3ProxyApp>>,
|
2023-02-06 20:55:27 +03:00
|
|
|
ip: InsecureClientIp,
|
2023-01-17 09:54:40 +03:00
|
|
|
Path(rpc_key): Path<String>,
|
|
|
|
origin: Option<TypedHeader<Origin>>,
|
|
|
|
referer: Option<TypedHeader<Referer>>,
|
|
|
|
user_agent: Option<TypedHeader<UserAgent>>,
|
|
|
|
ws_upgrade: Option<WebSocketUpgrade>,
|
2023-03-17 05:38:11 +03:00
|
|
|
) -> Web3ProxyResponse {
|
2023-01-17 09:54:40 +03:00
|
|
|
_websocket_handler_with_key(
|
|
|
|
ProxyMode::Versus,
|
|
|
|
app,
|
|
|
|
ip,
|
|
|
|
rpc_key,
|
|
|
|
origin,
|
|
|
|
referer,
|
|
|
|
user_agent,
|
|
|
|
ws_upgrade,
|
|
|
|
)
|
|
|
|
.await
|
|
|
|
}
|
|
|
|
|
|
|
|
#[allow(clippy::too_many_arguments)]
|
|
|
|
async fn _websocket_handler_with_key(
|
|
|
|
proxy_mode: ProxyMode,
|
|
|
|
app: Arc<Web3ProxyApp>,
|
2023-02-06 20:55:27 +03:00
|
|
|
InsecureClientIp(ip): InsecureClientIp,
|
2023-01-17 09:54:40 +03:00
|
|
|
rpc_key: String,
|
|
|
|
origin: Option<TypedHeader<Origin>>,
|
|
|
|
referer: Option<TypedHeader<Referer>>,
|
|
|
|
user_agent: Option<TypedHeader<UserAgent>>,
|
|
|
|
ws_upgrade: Option<WebSocketUpgrade>,
|
2023-03-17 05:38:11 +03:00
|
|
|
) -> Web3ProxyResponse {
|
2022-10-27 03:12:42 +03:00
|
|
|
let rpc_key = rpc_key.parse()?;
|
2022-09-24 08:53:45 +03:00
|
|
|
|
2022-11-08 22:58:11 +03:00
|
|
|
let (authorization, _semaphore) = key_is_authorized(
|
2022-09-22 22:57:21 +03:00
|
|
|
&app,
|
2022-10-27 03:12:42 +03:00
|
|
|
rpc_key,
|
2022-09-22 22:57:21 +03:00
|
|
|
ip,
|
2022-09-23 08:22:33 +03:00
|
|
|
origin.map(|x| x.0),
|
2023-03-03 04:39:50 +03:00
|
|
|
proxy_mode,
|
2022-09-22 22:57:21 +03:00
|
|
|
referer.map(|x| x.0),
|
|
|
|
user_agent.map(|x| x.0),
|
|
|
|
)
|
|
|
|
.await?;
|
2022-08-04 04:10:27 +03:00
|
|
|
|
2022-11-24 14:04:10 +03:00
|
|
|
trace!("websocket_handler_with_key {:?}", authorization);
|
2022-11-21 01:37:12 +03:00
|
|
|
|
2022-11-08 22:58:11 +03:00
|
|
|
let authorization = Arc::new(authorization);
|
2022-09-22 23:27:14 +03:00
|
|
|
|
2022-08-11 03:16:13 +03:00
|
|
|
match ws_upgrade {
|
2023-03-03 04:39:50 +03:00
|
|
|
Some(ws_upgrade) => {
|
|
|
|
Ok(ws_upgrade.on_upgrade(move |socket| proxy_web3_socket(app, authorization, socket)))
|
|
|
|
}
|
2022-08-11 03:16:13 +03:00
|
|
|
None => {
|
2022-11-08 01:10:19 +03:00
|
|
|
// if no websocket upgrade, this is probably a user loading the url with their browser
|
2022-11-21 01:37:12 +03:00
|
|
|
|
|
|
|
// TODO: rate limit here? key_is_authorized might be enough
|
|
|
|
|
2022-11-08 22:58:11 +03:00
|
|
|
match (
|
|
|
|
&app.config.redirect_public_url,
|
|
|
|
&app.config.redirect_rpc_key_url,
|
2023-01-19 03:17:43 +03:00
|
|
|
authorization.checks.rpc_secret_key_id,
|
2022-11-08 22:58:11 +03:00
|
|
|
) {
|
2023-03-17 05:38:11 +03:00
|
|
|
(None, None, _) => Err(Web3ProxyError::StatusCode(
|
2022-11-21 01:37:12 +03:00
|
|
|
StatusCode::BAD_REQUEST,
|
|
|
|
"this page is for rpcs".to_string(),
|
|
|
|
None,
|
|
|
|
)),
|
2022-11-10 02:58:07 +03:00
|
|
|
(Some(redirect_public_url), _, None) => {
|
2022-11-28 22:59:42 +03:00
|
|
|
Ok(Redirect::permanent(redirect_public_url).into_response())
|
2022-10-21 23:59:05 +03:00
|
|
|
}
|
2022-11-08 22:58:11 +03:00
|
|
|
(_, Some(redirect_rpc_key_url), rpc_key_id) => {
|
|
|
|
let reg = Handlebars::new();
|
|
|
|
|
2023-01-19 03:17:43 +03:00
|
|
|
if authorization.checks.rpc_secret_key_id.is_none() {
|
2022-11-21 01:37:12 +03:00
|
|
|
// i don't think this is possible
|
2023-03-17 05:38:11 +03:00
|
|
|
Err(Web3ProxyError::StatusCode(
|
2022-11-21 01:37:12 +03:00
|
|
|
StatusCode::UNAUTHORIZED,
|
|
|
|
"AUTHORIZATION header required".to_string(),
|
|
|
|
None,
|
|
|
|
))
|
2022-11-08 22:58:11 +03:00
|
|
|
} else {
|
|
|
|
let redirect_rpc_key_url = reg
|
|
|
|
.render_template(
|
|
|
|
redirect_rpc_key_url,
|
|
|
|
&json!({ "rpc_key_id": rpc_key_id }),
|
|
|
|
)
|
|
|
|
.expect("templating should always work");
|
|
|
|
|
|
|
|
// this is not a websocket. redirect to a page for this user
|
2022-11-28 22:59:42 +03:00
|
|
|
Ok(Redirect::permanent(&redirect_rpc_key_url).into_response())
|
2022-11-08 22:58:11 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
// any other combinations get a simple error
|
2023-03-17 05:38:11 +03:00
|
|
|
_ => Err(Web3ProxyError::StatusCode(
|
2022-11-08 22:58:11 +03:00
|
|
|
StatusCode::BAD_REQUEST,
|
|
|
|
"this page is for rpcs".to_string(),
|
|
|
|
None,
|
|
|
|
)),
|
2022-10-18 00:47:58 +03:00
|
|
|
}
|
2022-08-11 03:16:13 +03:00
|
|
|
}
|
|
|
|
}
|
2022-05-29 20:28:41 +03:00
|
|
|
}
|
|
|
|
|
2022-09-22 22:57:21 +03:00
|
|
|
async fn proxy_web3_socket(
|
|
|
|
app: Arc<Web3ProxyApp>,
|
2022-11-08 22:58:11 +03:00
|
|
|
authorization: Arc<Authorization>,
|
2022-09-22 22:57:21 +03:00
|
|
|
socket: WebSocket,
|
|
|
|
) {
|
2022-05-29 20:28:41 +03:00
|
|
|
// split the websocket so we can read and write concurrently
|
|
|
|
let (ws_tx, ws_rx) = socket.split();
|
|
|
|
|
|
|
|
// create a channel for our reader and writer can communicate. todo: benchmark different channels
|
2023-05-13 21:13:02 +03:00
|
|
|
let (response_sender, response_receiver) = flume::unbounded::<Message>();
|
2022-05-29 20:28:41 +03:00
|
|
|
|
2022-08-16 07:56:01 +03:00
|
|
|
tokio::spawn(write_web3_socket(response_receiver, ws_tx));
|
2023-03-03 04:39:50 +03:00
|
|
|
tokio::spawn(read_web3_socket(app, authorization, ws_rx, response_sender));
|
2022-05-29 20:28:41 +03:00
|
|
|
}
|
|
|
|
|
2022-07-25 03:27:00 +03:00
|
|
|
/// websockets support a few more methods than http clients
|
2022-05-31 04:55:04 +03:00
|
|
|
async fn handle_socket_payload(
|
2022-06-14 10:13:42 +03:00
|
|
|
app: Arc<Web3ProxyApp>,
|
2022-11-08 22:58:11 +03:00
|
|
|
authorization: &Arc<Authorization>,
|
2022-05-31 04:55:04 +03:00
|
|
|
payload: &str,
|
2023-05-13 21:13:02 +03:00
|
|
|
response_sender: &flume::Sender<Message>,
|
2023-06-01 02:05:41 +03:00
|
|
|
subscription_count: &AtomicU64,
|
2023-05-13 21:13:02 +03:00
|
|
|
subscriptions: Arc<RwLock<HashMap<U64, AbortHandle>>>,
|
2023-05-13 01:15:32 +03:00
|
|
|
) -> Web3ProxyResult<(Message, Option<OwnedSemaphorePermit>)> {
|
2023-01-19 03:17:43 +03:00
|
|
|
let (authorization, semaphore) = match authorization.check_again(&app).await {
|
|
|
|
Ok((a, s)) => (a, s),
|
|
|
|
Err(err) => {
|
|
|
|
let (_, err) = err.into_response_parts();
|
|
|
|
|
2023-05-13 21:13:02 +03:00
|
|
|
let err = JsonRpcForwardedResponse::from_response_data(err, Default::default());
|
|
|
|
|
|
|
|
let err = serde_json::to_string(&err)?;
|
2023-01-19 03:17:43 +03:00
|
|
|
|
2023-05-13 01:15:32 +03:00
|
|
|
return Ok((Message::Text(err), None));
|
2023-01-19 03:17:43 +03:00
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2022-07-09 05:23:26 +03:00
|
|
|
// TODO: do any clients send batches over websockets?
|
2023-05-13 21:13:02 +03:00
|
|
|
// TODO: change response into response_data
|
|
|
|
let (response_id, response) = match serde_json::from_str::<JsonRpcRequest>(payload) {
|
2022-11-08 22:58:11 +03:00
|
|
|
Ok(json_request) => {
|
2023-05-13 21:13:02 +03:00
|
|
|
let response_id = json_request.id.clone();
|
2022-05-31 04:55:04 +03:00
|
|
|
|
2023-05-13 01:15:32 +03:00
|
|
|
// TODO: move this to a seperate function so we can use the try operator
|
|
|
|
let response: Web3ProxyResult<JsonRpcForwardedResponseEnum> =
|
|
|
|
match &json_request.method[..] {
|
|
|
|
"eth_subscribe" => {
|
|
|
|
// TODO: how can we subscribe with proxy_mode?
|
|
|
|
match app
|
|
|
|
.eth_subscribe(
|
|
|
|
authorization.clone(),
|
|
|
|
json_request,
|
|
|
|
subscription_count,
|
|
|
|
response_sender.clone(),
|
|
|
|
)
|
|
|
|
.await
|
|
|
|
{
|
|
|
|
Ok((handle, response)) => {
|
|
|
|
{
|
|
|
|
let mut x = subscriptions.write().await;
|
2022-11-20 01:05:51 +03:00
|
|
|
|
2023-05-13 01:15:32 +03:00
|
|
|
let result: &serde_json::value::RawValue = response
|
|
|
|
.result
|
|
|
|
.as_ref()
|
|
|
|
.context("there should be a result here")?;
|
2022-11-20 01:05:51 +03:00
|
|
|
|
2023-05-13 21:13:02 +03:00
|
|
|
// TODO: there must be a better way to turn a RawValue
|
|
|
|
let k: U64 = serde_json::from_str(result.get())
|
|
|
|
.context("subscription ids must be U64s")?;
|
2022-05-31 04:55:04 +03:00
|
|
|
|
2023-05-13 01:15:32 +03:00
|
|
|
x.insert(k, handle);
|
|
|
|
};
|
2023-01-19 03:17:43 +03:00
|
|
|
|
2023-05-13 01:15:32 +03:00
|
|
|
Ok(response.into())
|
|
|
|
}
|
|
|
|
Err(err) => Err(err),
|
2022-06-14 07:04:14 +03:00
|
|
|
}
|
2023-05-13 01:15:32 +03:00
|
|
|
}
|
|
|
|
"eth_unsubscribe" => {
|
|
|
|
let request_metadata =
|
|
|
|
RequestMetadata::new(&app, authorization.clone(), &json_request, None)
|
|
|
|
.await;
|
2022-11-20 01:05:51 +03:00
|
|
|
|
2023-05-13 01:15:32 +03:00
|
|
|
#[derive(serde::Deserialize)]
|
2023-05-13 21:13:02 +03:00
|
|
|
struct EthUnsubscribeParams([U64; 1]);
|
2023-05-13 01:15:32 +03:00
|
|
|
|
2023-05-31 02:32:34 +03:00
|
|
|
match serde_json::from_value(json_request.params) {
|
|
|
|
Ok::<EthUnsubscribeParams, _>(params) => {
|
|
|
|
let subscription_id = ¶ms.0[0];
|
|
|
|
|
|
|
|
// TODO: is this the right response?
|
|
|
|
let partial_response = {
|
|
|
|
let mut x = subscriptions.write().await;
|
|
|
|
match x.remove(subscription_id) {
|
|
|
|
None => false,
|
|
|
|
Some(handle) => {
|
|
|
|
handle.abort();
|
|
|
|
true
|
2023-05-13 01:15:32 +03:00
|
|
|
}
|
2023-05-31 02:32:34 +03:00
|
|
|
}
|
|
|
|
};
|
2023-05-13 01:15:32 +03:00
|
|
|
|
2023-05-31 02:32:34 +03:00
|
|
|
// TODO: don't create the response here. use a JsonRpcResponseData instead
|
|
|
|
let response = JsonRpcForwardedResponse::from_value(
|
|
|
|
json!(partial_response),
|
|
|
|
response_id.clone(),
|
|
|
|
);
|
2023-05-13 01:15:32 +03:00
|
|
|
|
2023-05-31 02:32:34 +03:00
|
|
|
request_metadata.add_response(&response);
|
2023-05-13 01:15:32 +03:00
|
|
|
|
2023-05-31 02:32:34 +03:00
|
|
|
Ok(response.into())
|
2023-05-13 01:15:32 +03:00
|
|
|
}
|
2023-05-31 09:17:05 +03:00
|
|
|
Err(err) => Err(Web3ProxyError::BadRequest(
|
|
|
|
f!("incorrect params given for eth_unsubscribe. {err:?}").into(),
|
|
|
|
)),
|
2022-11-20 01:05:51 +03:00
|
|
|
}
|
|
|
|
}
|
2023-05-13 01:15:32 +03:00
|
|
|
_ => app
|
|
|
|
.proxy_web3_rpc(authorization.clone(), json_request.into())
|
|
|
|
.await
|
2023-05-31 02:32:34 +03:00
|
|
|
.map(|(_, response, _)| response),
|
2023-05-13 01:15:32 +03:00
|
|
|
};
|
2022-05-31 04:55:04 +03:00
|
|
|
|
2023-05-13 21:13:02 +03:00
|
|
|
(response_id, response)
|
2022-05-31 04:55:04 +03:00
|
|
|
}
|
|
|
|
Err(err) => {
|
2023-04-11 08:29:02 +03:00
|
|
|
let id = JsonRpcId::None.to_raw_value();
|
2022-05-31 04:55:04 +03:00
|
|
|
(id, Err(err.into()))
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
let response_str = match response {
|
2022-12-20 08:37:12 +03:00
|
|
|
Ok(x) => serde_json::to_string(&x).expect("to_string should always work here"),
|
2022-05-31 04:55:04 +03:00
|
|
|
Err(err) => {
|
2023-05-13 21:13:02 +03:00
|
|
|
let (_, response_data) = err.into_response_parts();
|
|
|
|
|
|
|
|
let response = JsonRpcForwardedResponse::from_response_data(response_data, response_id);
|
|
|
|
|
2022-12-20 08:37:12 +03:00
|
|
|
serde_json::to_string(&response).expect("to_string should always work here")
|
2022-05-31 04:55:04 +03:00
|
|
|
}
|
2022-12-20 08:37:12 +03:00
|
|
|
};
|
|
|
|
|
2023-05-13 01:15:32 +03:00
|
|
|
Ok((Message::Text(response_str), semaphore))
|
2022-05-31 04:55:04 +03:00
|
|
|
}
|
|
|
|
|
2022-05-29 20:28:41 +03:00
|
|
|
async fn read_web3_socket(
|
2022-07-07 06:22:09 +03:00
|
|
|
app: Arc<Web3ProxyApp>,
|
2022-11-08 22:58:11 +03:00
|
|
|
authorization: Arc<Authorization>,
|
2022-05-29 20:28:41 +03:00
|
|
|
mut ws_rx: SplitStream<WebSocket>,
|
2023-05-13 21:13:02 +03:00
|
|
|
response_sender: flume::Sender<Message>,
|
2022-05-29 20:28:41 +03:00
|
|
|
) {
|
2023-05-13 01:15:32 +03:00
|
|
|
// RwLock should be fine here. a user isn't going to be opening tons of subscriptions
|
2023-01-19 03:17:43 +03:00
|
|
|
let subscriptions = Arc::new(RwLock::new(HashMap::new()));
|
2023-06-01 02:05:41 +03:00
|
|
|
let subscription_count = Arc::new(AtomicU64::new(1));
|
2023-01-19 03:17:43 +03:00
|
|
|
|
|
|
|
let (close_sender, mut close_receiver) = broadcast::channel(1);
|
|
|
|
|
|
|
|
loop {
|
|
|
|
tokio::select! {
|
|
|
|
msg = ws_rx.next() => {
|
|
|
|
if let Some(Ok(msg)) = msg {
|
2023-06-01 02:05:41 +03:00
|
|
|
// clone things so we can handle multiple messages in parallel
|
2023-01-19 03:17:43 +03:00
|
|
|
let close_sender = close_sender.clone();
|
|
|
|
let app = app.clone();
|
|
|
|
let authorization = authorization.clone();
|
|
|
|
let response_sender = response_sender.clone();
|
|
|
|
let subscriptions = subscriptions.clone();
|
|
|
|
let subscription_count = subscription_count.clone();
|
|
|
|
|
|
|
|
let f = async move {
|
2023-06-01 02:05:41 +03:00
|
|
|
// new message from our client. forward to a backend and then send it through response_sender
|
|
|
|
let (response_msg, _semaphore) = match msg {
|
2023-05-13 01:15:32 +03:00
|
|
|
Message::Text(ref payload) => {
|
2023-06-01 02:05:41 +03:00
|
|
|
// TODO: do not unwrap! turn errors into a jsonrpc response and send that instead
|
|
|
|
// TODO: some providers close the connection on error. i don't like that
|
|
|
|
let (m, s) = handle_socket_payload(
|
2023-01-19 03:17:43 +03:00
|
|
|
app.clone(),
|
|
|
|
&authorization,
|
2023-05-13 01:15:32 +03:00
|
|
|
payload,
|
2023-01-19 03:17:43 +03:00
|
|
|
&response_sender,
|
|
|
|
&subscription_count,
|
|
|
|
subscriptions,
|
|
|
|
)
|
2023-05-13 01:15:32 +03:00
|
|
|
.await.unwrap();
|
2023-01-19 03:17:43 +03:00
|
|
|
|
2023-06-01 02:05:41 +03:00
|
|
|
(m, Some(s))
|
2023-01-19 03:17:43 +03:00
|
|
|
}
|
|
|
|
Message::Ping(x) => {
|
|
|
|
trace!("ping: {:?}", x);
|
2023-06-01 02:05:41 +03:00
|
|
|
(Message::Pong(x), None)
|
2023-01-19 03:17:43 +03:00
|
|
|
}
|
|
|
|
Message::Pong(x) => {
|
|
|
|
trace!("pong: {:?}", x);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
Message::Close(_) => {
|
|
|
|
info!("closing websocket connection");
|
|
|
|
// TODO: do something to close subscriptions?
|
|
|
|
let _ = close_sender.send(true);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
Message::Binary(mut payload) => {
|
|
|
|
let payload = from_utf8_mut(&mut payload).unwrap();
|
|
|
|
|
2023-06-01 02:05:41 +03:00
|
|
|
// TODO: do not unwrap! turn errors into a jsonrpc response and send that instead
|
|
|
|
let (m, s) = handle_socket_payload(
|
2023-01-19 03:17:43 +03:00
|
|
|
app.clone(),
|
|
|
|
&authorization,
|
|
|
|
payload,
|
|
|
|
&response_sender,
|
|
|
|
&subscription_count,
|
|
|
|
subscriptions,
|
|
|
|
)
|
2023-05-13 01:15:32 +03:00
|
|
|
.await.unwrap();
|
2023-01-19 03:17:43 +03:00
|
|
|
|
2023-06-01 02:05:41 +03:00
|
|
|
(m, Some(s))
|
2023-01-19 03:17:43 +03:00
|
|
|
}
|
|
|
|
};
|
2022-05-29 20:28:41 +03:00
|
|
|
|
2023-05-13 21:13:02 +03:00
|
|
|
if response_sender.send_async(response_msg).await.is_err() {
|
2023-01-19 03:17:43 +03:00
|
|
|
let _ = close_sender.send(true);
|
|
|
|
};
|
|
|
|
};
|
|
|
|
|
|
|
|
tokio::spawn(f);
|
|
|
|
} else {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
_ = close_receiver.recv() => {
|
2022-05-30 04:28:22 +03:00
|
|
|
break;
|
|
|
|
}
|
2023-01-19 03:17:43 +03:00
|
|
|
}
|
2022-05-29 20:28:41 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn write_web3_socket(
|
2023-05-13 21:13:02 +03:00
|
|
|
response_rx: flume::Receiver<Message>,
|
2022-05-29 20:28:41 +03:00
|
|
|
mut ws_tx: SplitSink<WebSocket, Message>,
|
|
|
|
) {
|
2022-07-09 01:14:45 +03:00
|
|
|
// TODO: increment counter for open websockets
|
|
|
|
|
2023-05-13 21:13:02 +03:00
|
|
|
while let Ok(msg) = response_rx.recv_async().await {
|
2022-08-11 03:16:13 +03:00
|
|
|
// a response is ready
|
|
|
|
|
2023-06-01 02:05:41 +03:00
|
|
|
// we do not check rate limits here. they are checked before putting things into response_sender;
|
2022-08-11 03:16:13 +03:00
|
|
|
|
|
|
|
// forward the response to through the websocket
|
2022-06-16 05:53:37 +03:00
|
|
|
if let Err(err) = ws_tx.send(msg).await {
|
2022-11-16 10:19:56 +03:00
|
|
|
// this is common. it happens whenever a client disconnects
|
|
|
|
trace!("unable to write to websocket: {:?}", err);
|
2022-05-29 20:28:41 +03:00
|
|
|
break;
|
|
|
|
};
|
|
|
|
}
|
2022-07-09 01:14:45 +03:00
|
|
|
|
|
|
|
// TODO: decrement counter for open websockets
|
2022-05-29 20:28:41 +03:00
|
|
|
}
|