2022-09-22 22:57:21 +03:00
|
|
|
use super::authorization::{ip_is_authorized, key_is_authorized, AuthorizedRequest};
|
2022-08-21 12:39:38 +03:00
|
|
|
use super::errors::FrontendResult;
|
2022-09-23 08:22:33 +03:00
|
|
|
use axum::headers::{Origin, Referer, UserAgent};
|
2022-05-20 08:30:54 +03:00
|
|
|
use axum::{
|
2022-05-29 20:28:41 +03:00
|
|
|
extract::ws::{Message, WebSocket, WebSocketUpgrade},
|
2022-08-06 04:17:25 +03:00
|
|
|
extract::Path,
|
2022-08-21 12:44:53 +03:00
|
|
|
response::{IntoResponse, Redirect},
|
2022-09-22 22:57:21 +03:00
|
|
|
Extension, TypedHeader,
|
2022-05-20 08:30:54 +03:00
|
|
|
};
|
2022-08-04 04:10:27 +03:00
|
|
|
use axum_client_ip::ClientIp;
|
2022-08-17 00:10:09 +03:00
|
|
|
use axum_macros::debug_handler;
|
2022-05-29 20:28:41 +03:00
|
|
|
use futures::SinkExt;
|
2022-05-31 04:55:04 +03:00
|
|
|
use futures::{
|
|
|
|
future::AbortHandle,
|
|
|
|
stream::{SplitSink, SplitStream, StreamExt},
|
|
|
|
};
|
2022-08-12 22:07:14 +03:00
|
|
|
use handlebars::Handlebars;
|
2022-05-29 22:33:10 +03:00
|
|
|
use hashbrown::HashMap;
|
2022-07-22 22:30:39 +03:00
|
|
|
use serde_json::{json, value::RawValue};
|
2022-08-27 05:13:36 +03:00
|
|
|
use std::sync::Arc;
|
2022-07-09 01:14:45 +03:00
|
|
|
use std::{str::from_utf8_mut, sync::atomic::AtomicUsize};
|
2022-08-16 07:56:01 +03:00
|
|
|
use tracing::{error, error_span, info, trace, Instrument};
|
2022-08-06 04:17:25 +03:00
|
|
|
use uuid::Uuid;
|
2022-06-16 05:53:37 +03:00
|
|
|
|
|
|
|
use crate::{
|
|
|
|
app::Web3ProxyApp,
|
|
|
|
jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest},
|
|
|
|
};
|
2022-05-20 08:30:54 +03:00
|
|
|
|
2022-08-17 00:10:09 +03:00
|
|
|
#[debug_handler]
|
2022-08-05 22:22:23 +03:00
|
|
|
pub async fn public_websocket_handler(
|
2022-07-07 06:22:09 +03:00
|
|
|
Extension(app): Extension<Arc<Web3ProxyApp>>,
|
2022-08-04 04:10:27 +03:00
|
|
|
ClientIp(ip): ClientIp,
|
2022-08-10 05:37:34 +03:00
|
|
|
ws_upgrade: Option<WebSocketUpgrade>,
|
2022-08-21 12:39:38 +03:00
|
|
|
) -> FrontendResult {
|
2022-09-23 00:51:52 +03:00
|
|
|
let authorization = ip_is_authorized(&app, ip).await?;
|
2022-08-04 04:10:27 +03:00
|
|
|
|
2022-09-23 00:51:52 +03:00
|
|
|
let request_span = error_span!("request", ?authorization);
|
2022-08-16 07:56:01 +03:00
|
|
|
|
2022-09-23 00:51:52 +03:00
|
|
|
let authorization = Arc::new(authorization);
|
2022-09-22 23:27:14 +03:00
|
|
|
|
2022-08-11 03:16:13 +03:00
|
|
|
match ws_upgrade {
|
2022-08-21 12:39:38 +03:00
|
|
|
Some(ws) => Ok(ws
|
2022-09-22 22:57:21 +03:00
|
|
|
.on_upgrade(|socket| {
|
2022-09-23 00:51:52 +03:00
|
|
|
proxy_web3_socket(app, authorization, socket).instrument(request_span)
|
2022-09-22 22:57:21 +03:00
|
|
|
})
|
2022-08-21 12:39:38 +03:00
|
|
|
.into_response()),
|
2022-08-07 22:33:16 +03:00
|
|
|
None => {
|
2022-08-12 22:07:14 +03:00
|
|
|
// this is not a websocket. redirect to a friendly page
|
2022-08-21 12:39:38 +03:00
|
|
|
Ok(Redirect::to(&app.config.redirect_public_url).into_response())
|
2022-08-07 22:33:16 +03:00
|
|
|
}
|
|
|
|
}
|
2022-08-04 04:10:27 +03:00
|
|
|
}
|
|
|
|
|
2022-08-17 00:10:09 +03:00
|
|
|
#[debug_handler]
|
2022-08-04 04:10:27 +03:00
|
|
|
pub async fn user_websocket_handler(
|
|
|
|
Extension(app): Extension<Arc<Web3ProxyApp>>,
|
2022-09-22 22:57:21 +03:00
|
|
|
ClientIp(ip): ClientIp,
|
2022-08-06 04:17:25 +03:00
|
|
|
Path(user_key): Path<Uuid>,
|
2022-09-23 08:22:33 +03:00
|
|
|
origin: Option<TypedHeader<Origin>>,
|
2022-09-22 22:57:21 +03:00
|
|
|
referer: Option<TypedHeader<Referer>>,
|
|
|
|
user_agent: Option<TypedHeader<UserAgent>>,
|
2022-08-11 03:16:13 +03:00
|
|
|
ws_upgrade: Option<WebSocketUpgrade>,
|
2022-08-21 12:39:38 +03:00
|
|
|
) -> FrontendResult {
|
2022-09-23 00:51:52 +03:00
|
|
|
let authorization = key_is_authorized(
|
2022-09-22 22:57:21 +03:00
|
|
|
&app,
|
|
|
|
user_key,
|
|
|
|
ip,
|
2022-09-23 08:22:33 +03:00
|
|
|
origin.map(|x| x.0),
|
2022-09-22 22:57:21 +03:00
|
|
|
referer.map(|x| x.0),
|
|
|
|
user_agent.map(|x| x.0),
|
|
|
|
)
|
|
|
|
.await?;
|
2022-08-04 04:10:27 +03:00
|
|
|
|
2022-09-22 23:27:14 +03:00
|
|
|
// TODO: type that wraps Address and have it censor? would protect us from accidently logging addresses or other user info
|
2022-09-23 00:51:52 +03:00
|
|
|
let request_span = error_span!("request", ?authorization);
|
2022-08-16 07:56:01 +03:00
|
|
|
|
2022-09-23 00:51:52 +03:00
|
|
|
let authorization = Arc::new(authorization);
|
2022-09-22 23:27:14 +03:00
|
|
|
|
2022-08-11 03:16:13 +03:00
|
|
|
match ws_upgrade {
|
2022-09-22 22:57:21 +03:00
|
|
|
Some(ws_upgrade) => Ok(ws_upgrade.on_upgrade(move |socket| {
|
2022-09-23 00:51:52 +03:00
|
|
|
proxy_web3_socket(app, authorization, socket).instrument(request_span)
|
2022-09-22 22:57:21 +03:00
|
|
|
})),
|
2022-08-11 03:16:13 +03:00
|
|
|
None => {
|
2022-08-12 22:07:14 +03:00
|
|
|
// TODO: store this on the app and use register_template?
|
|
|
|
let reg = Handlebars::new();
|
|
|
|
|
2022-09-05 09:29:27 +03:00
|
|
|
// TODO: show the user's address, not their id (remember to update the checks for {{user_id}}} in app.rs)
|
2022-08-12 04:48:32 +03:00
|
|
|
// TODO: query to get the user's address. expose that instead of user_id
|
2022-08-12 22:07:14 +03:00
|
|
|
let user_url = reg
|
|
|
|
.render_template(
|
|
|
|
&app.config.redirect_user_url,
|
2022-09-23 00:51:52 +03:00
|
|
|
&json!({ "authorization": authorization }),
|
2022-08-12 22:07:14 +03:00
|
|
|
)
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
// this is not a websocket. redirect to a page for this user
|
2022-08-21 12:39:38 +03:00
|
|
|
Ok(Redirect::to(&user_url).into_response())
|
2022-08-11 03:16:13 +03:00
|
|
|
}
|
|
|
|
}
|
2022-05-29 20:28:41 +03:00
|
|
|
}
|
|
|
|
|
2022-09-22 22:57:21 +03:00
|
|
|
async fn proxy_web3_socket(
|
|
|
|
app: Arc<Web3ProxyApp>,
|
2022-09-23 00:51:52 +03:00
|
|
|
authorization: Arc<AuthorizedRequest>,
|
2022-09-22 22:57:21 +03:00
|
|
|
socket: WebSocket,
|
|
|
|
) {
|
2022-05-29 20:28:41 +03:00
|
|
|
// split the websocket so we can read and write concurrently
|
|
|
|
let (ws_tx, ws_rx) = socket.split();
|
|
|
|
|
|
|
|
// create a channel for our reader and writer can communicate. todo: benchmark different channels
|
2022-08-13 01:12:46 +03:00
|
|
|
let (response_sender, response_receiver) = flume::unbounded::<Message>();
|
2022-05-29 20:28:41 +03:00
|
|
|
|
2022-08-16 07:56:01 +03:00
|
|
|
tokio::spawn(write_web3_socket(response_receiver, ws_tx));
|
2022-09-23 00:51:52 +03:00
|
|
|
tokio::spawn(read_web3_socket(app, authorization, ws_rx, response_sender));
|
2022-05-29 20:28:41 +03:00
|
|
|
}
|
|
|
|
|
2022-07-25 03:27:00 +03:00
|
|
|
/// websockets support a few more methods than http clients
|
2022-05-31 04:55:04 +03:00
|
|
|
async fn handle_socket_payload(
|
2022-06-14 10:13:42 +03:00
|
|
|
app: Arc<Web3ProxyApp>,
|
2022-09-23 00:51:52 +03:00
|
|
|
authorization: Arc<AuthorizedRequest>,
|
2022-05-31 04:55:04 +03:00
|
|
|
payload: &str,
|
2022-07-09 01:14:45 +03:00
|
|
|
response_sender: &flume::Sender<Message>,
|
|
|
|
subscription_count: &AtomicUsize,
|
2022-05-31 04:55:04 +03:00
|
|
|
subscriptions: &mut HashMap<String, AbortHandle>,
|
|
|
|
) -> Message {
|
2022-07-09 05:23:26 +03:00
|
|
|
// TODO: do any clients send batches over websockets?
|
2022-05-31 04:55:04 +03:00
|
|
|
let (id, response) = match serde_json::from_str::<JsonRpcRequest>(payload) {
|
|
|
|
Ok(payload) => {
|
2022-08-16 07:56:01 +03:00
|
|
|
// TODO: should we use this id for the subscription id? it should be unique and means we dont need an atomic
|
2022-05-31 04:55:04 +03:00
|
|
|
let id = payload.id.clone();
|
|
|
|
|
2022-06-14 07:04:14 +03:00
|
|
|
let response: anyhow::Result<JsonRpcForwardedResponseEnum> = match &payload.method[..] {
|
|
|
|
"eth_subscribe" => {
|
2022-08-16 07:56:01 +03:00
|
|
|
// TODO: what should go in this span?
|
|
|
|
let span = error_span!("eth_subscribe");
|
|
|
|
|
2022-06-14 10:13:42 +03:00
|
|
|
let response = app
|
2022-09-22 23:27:14 +03:00
|
|
|
.eth_subscribe(
|
2022-09-23 00:51:52 +03:00
|
|
|
authorization.clone(),
|
2022-09-22 23:27:14 +03:00
|
|
|
payload,
|
|
|
|
subscription_count,
|
|
|
|
response_sender.clone(),
|
|
|
|
)
|
2022-08-16 07:56:01 +03:00
|
|
|
.instrument(span)
|
2022-06-14 10:13:42 +03:00
|
|
|
.await;
|
2022-05-31 04:55:04 +03:00
|
|
|
|
2022-06-14 07:04:14 +03:00
|
|
|
match response {
|
|
|
|
Ok((handle, response)) => {
|
|
|
|
// TODO: better key
|
|
|
|
subscriptions
|
|
|
|
.insert(response.result.as_ref().unwrap().to_string(), handle);
|
2022-05-31 04:55:04 +03:00
|
|
|
|
2022-06-14 07:04:14 +03:00
|
|
|
Ok(response.into())
|
|
|
|
}
|
|
|
|
Err(err) => Err(err),
|
2022-05-31 04:55:04 +03:00
|
|
|
}
|
|
|
|
}
|
2022-06-14 07:04:14 +03:00
|
|
|
"eth_unsubscribe" => {
|
2022-08-13 01:12:46 +03:00
|
|
|
// TODO: how should handle rate limits and stats on this?
|
|
|
|
|
2022-06-14 07:04:14 +03:00
|
|
|
let subscription_id = payload.params.unwrap().to_string();
|
2022-05-31 04:55:04 +03:00
|
|
|
|
2022-06-14 07:04:14 +03:00
|
|
|
let partial_response = match subscriptions.remove(&subscription_id) {
|
2022-08-13 01:12:46 +03:00
|
|
|
None => false,
|
2022-06-14 07:04:14 +03:00
|
|
|
Some(handle) => {
|
|
|
|
handle.abort();
|
2022-08-13 01:12:46 +03:00
|
|
|
true
|
2022-06-14 07:04:14 +03:00
|
|
|
}
|
|
|
|
};
|
2022-05-31 04:55:04 +03:00
|
|
|
|
2022-07-22 22:30:39 +03:00
|
|
|
let response =
|
|
|
|
JsonRpcForwardedResponse::from_value(json!(partial_response), id.clone());
|
2022-05-31 04:55:04 +03:00
|
|
|
|
2022-06-14 07:04:14 +03:00
|
|
|
Ok(response.into())
|
|
|
|
}
|
2022-09-23 00:51:52 +03:00
|
|
|
_ => app.proxy_web3_rpc(&authorization, payload.into()).await,
|
2022-05-31 04:55:04 +03:00
|
|
|
};
|
|
|
|
|
|
|
|
(id, response)
|
|
|
|
}
|
|
|
|
Err(err) => {
|
2022-06-06 01:39:44 +03:00
|
|
|
let id = RawValue::from_string("null".to_string()).unwrap();
|
2022-05-31 04:55:04 +03:00
|
|
|
(id, Err(err.into()))
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
let response_str = match response {
|
|
|
|
Ok(x) => serde_json::to_string(&x),
|
|
|
|
Err(err) => {
|
|
|
|
// we have an anyhow error. turn it into
|
2022-09-10 03:12:14 +03:00
|
|
|
let response = JsonRpcForwardedResponse::from_anyhow_error(err, None, Some(id));
|
2022-05-31 04:55:04 +03:00
|
|
|
serde_json::to_string(&response)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
Message::Text(response_str)
|
|
|
|
}
|
|
|
|
|
2022-05-29 20:28:41 +03:00
|
|
|
async fn read_web3_socket(
|
2022-07-07 06:22:09 +03:00
|
|
|
app: Arc<Web3ProxyApp>,
|
2022-09-23 00:51:52 +03:00
|
|
|
authorization: Arc<AuthorizedRequest>,
|
2022-05-29 20:28:41 +03:00
|
|
|
mut ws_rx: SplitStream<WebSocket>,
|
2022-07-09 01:14:45 +03:00
|
|
|
response_sender: flume::Sender<Message>,
|
2022-05-29 20:28:41 +03:00
|
|
|
) {
|
2022-05-29 22:33:10 +03:00
|
|
|
let mut subscriptions = HashMap::new();
|
2022-07-09 01:14:45 +03:00
|
|
|
let subscription_count = AtomicUsize::new(1);
|
2022-05-29 22:33:10 +03:00
|
|
|
|
2022-05-29 20:28:41 +03:00
|
|
|
while let Some(Ok(msg)) = ws_rx.next().await {
|
|
|
|
// new message from our client. forward to a backend and then send it through response_tx
|
|
|
|
let response_msg = match msg {
|
|
|
|
Message::Text(payload) => {
|
2022-07-09 01:14:45 +03:00
|
|
|
handle_socket_payload(
|
|
|
|
app.clone(),
|
2022-09-23 00:51:52 +03:00
|
|
|
authorization.clone(),
|
2022-07-09 01:14:45 +03:00
|
|
|
&payload,
|
|
|
|
&response_sender,
|
|
|
|
&subscription_count,
|
|
|
|
&mut subscriptions,
|
|
|
|
)
|
|
|
|
.await
|
2022-05-29 20:28:41 +03:00
|
|
|
}
|
|
|
|
Message::Ping(x) => Message::Pong(x),
|
2022-05-31 04:55:04 +03:00
|
|
|
Message::Pong(x) => {
|
2022-07-07 06:22:09 +03:00
|
|
|
trace!("pong: {:?}", x);
|
2022-05-31 04:55:04 +03:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
Message::Close(_) => {
|
|
|
|
info!("closing websocket connection");
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
Message::Binary(mut payload) => {
|
2022-08-11 03:16:13 +03:00
|
|
|
// TODO: poke rate limit for the user/ip
|
2022-05-31 04:55:04 +03:00
|
|
|
let payload = from_utf8_mut(&mut payload).unwrap();
|
|
|
|
|
2022-07-09 01:14:45 +03:00
|
|
|
handle_socket_payload(
|
|
|
|
app.clone(),
|
2022-09-23 00:51:52 +03:00
|
|
|
authorization.clone(),
|
2022-07-09 01:14:45 +03:00
|
|
|
payload,
|
|
|
|
&response_sender,
|
|
|
|
&subscription_count,
|
|
|
|
&mut subscriptions,
|
|
|
|
)
|
|
|
|
.await
|
2022-05-31 04:55:04 +03:00
|
|
|
}
|
2022-05-29 20:28:41 +03:00
|
|
|
};
|
|
|
|
|
2022-07-09 01:14:45 +03:00
|
|
|
match response_sender.send_async(response_msg).await {
|
2022-05-30 04:28:22 +03:00
|
|
|
Ok(_) => {}
|
|
|
|
Err(err) => {
|
|
|
|
error!("{}", err);
|
|
|
|
break;
|
|
|
|
}
|
2022-05-29 20:28:41 +03:00
|
|
|
};
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn write_web3_socket(
|
|
|
|
response_rx: flume::Receiver<Message>,
|
|
|
|
mut ws_tx: SplitSink<WebSocket, Message>,
|
|
|
|
) {
|
2022-07-09 01:14:45 +03:00
|
|
|
// TODO: increment counter for open websockets
|
|
|
|
|
2022-05-29 20:28:41 +03:00
|
|
|
while let Ok(msg) = response_rx.recv_async().await {
|
2022-08-11 03:16:13 +03:00
|
|
|
// a response is ready
|
|
|
|
|
|
|
|
// TODO: poke rate limits for this user?
|
|
|
|
|
|
|
|
// forward the response to through the websocket
|
2022-06-16 05:53:37 +03:00
|
|
|
if let Err(err) = ws_tx.send(msg).await {
|
2022-07-09 01:14:45 +03:00
|
|
|
// this isn't a problem. this is common and happens whenever a client disconnects
|
|
|
|
trace!(?err, "unable to write to websocket");
|
2022-05-29 20:28:41 +03:00
|
|
|
break;
|
|
|
|
};
|
|
|
|
}
|
2022-07-09 01:14:45 +03:00
|
|
|
|
|
|
|
// TODO: decrement counter for open websockets
|
2022-05-29 20:28:41 +03:00
|
|
|
}
|