diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 440868d4..334c1775 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1080,18 +1080,143 @@ impl App { } /// try to send transactions to the best available rpcs with protected/private mempools - /// if no protected rpcs are configured, then some public rpcs are used instead + /// if no protected rpcs are configured (and protected_only is false), then public rpcs are used instead + /// TODO: should this return an H256 instead of an Arc? async fn try_send_protected( self: &Arc, web3_request: &Arc, - ) -> Web3ProxyResult>> { - if self.protected_rpcs.is_empty() { + protected_only: bool, + ) -> Web3ProxyResult>> { + // decode the transaction + let params = web3_request + .inner + .params() + .as_array() + .ok_or_else(|| Web3ProxyError::BadRequest("Unable to get array from params".into()))? + .get(0) + .ok_or_else(|| Web3ProxyError::BadRequest("Unable to get item 0 from params".into()))? + .as_str() + .ok_or_else(|| { + Web3ProxyError::BadRequest("Unable to get string from params item 0".into()) + })?; + + let bytes = Bytes::from_str(params) + .map_err(|_| Web3ProxyError::BadRequest("Unable to parse params as bytes".into()))?; + + let rlp = Rlp::new(bytes.as_ref()); + + let tx = Transaction::decode(&rlp).map_err(|_| { + Web3ProxyError::BadRequest("failed to parse rlp into transaction".into()) + })?; + + if let Some(chain_id) = tx.chain_id { + if self.config.chain_id != chain_id.as_u64() { + return Err(Web3ProxyError::BadRequest( + format!( + "unexpected chain_id. {} != {}", + chain_id, self.config.chain_id + ) + .into(), + )); + } + } + + // TODO: return now if already confirmed + // TODO: error if the nonce is way far in the future + + let mut response = if protected_only { + if self.protected_rpcs.is_empty() { + // TODO: different error? + return Err(Web3ProxyError::NoServersSynced); + } + self.protected_rpcs + .request_with_metadata(web3_request) + .await + } else if self.protected_rpcs.is_empty() { self.balanced_rpcs.request_with_metadata(web3_request).await } else { self.protected_rpcs .request_with_metadata(web3_request) .await + }; + + // TODO: helper for doing parsed() inside a response? + if let Ok(SingleResponse::Stream(x)) = response { + response = x + .read() + .await + .map(SingleResponse::Parsed) + .map_err(Into::into); } + + let mut response = response.try_into()?; + + let txid = tx.hash(); + + // sometimes we get an error that the transaction is already known by our nodes, + // that's not really an error. Return the hash like a successful response would. + // TODO: move this to a helper function. probably part of try_send_protected + if let ForwardedResponse::RpcError { error_data, .. } = &response { + let acceptable_error_messages = [ + "already known", + "ALREADY_EXISTS: already known", + "INTERNAL_ERROR: existing tx with same hash", + "", + ]; + if acceptable_error_messages.contains(&error_data.message.as_ref()) { + response = ForwardedResponse::from(json!(txid)); + } + } + + // if successful, send the txid to the pending transaction firehose + if let ForwardedResponse::Result { value, .. } = &response { + // no idea how we got an array here, but lets force this to just the txid + // TODO: think about this more + if value.get().starts_with('[') { + error!( + ?value, + ?txid, + "unexpected array response from sendRawTransaction" + ); + response = ForwardedResponse::from(json!(txid)); + } + + self.pending_txid_firehose.send(txid).await; + + // emit transaction count stats + // TODO: different salt for ips and transactions? + if let Some(ref salt) = self.config.public_recent_ips_salt { + let now = Utc::now().timestamp(); + let app = self.clone(); + + let salted_tx_hash = format!("{}:{:?}", salt, txid); + + let f = async move { + match app.redis_conn().await { + Ok(mut redis_conn) => { + let hashed_tx_hash = Bytes::from(keccak256(salted_tx_hash.as_bytes())); + + let recent_tx_hash_key = + format!("eth_sendRawTransaction:{}", app.config.chain_id); + + redis_conn + .zadd(recent_tx_hash_key, hashed_tx_hash.to_string(), now) + .await?; + } + Err(Web3ProxyError::NoDatabaseConfigured) => {} + Err(err) => { + warn!(?err, "unable to save stats for eth_sendRawTransaction",) + } + } + + Ok::<_, anyhow::Error>(()) + }; + + tokio::spawn(f); + } + } + + Ok(response) } /// proxy request with up to 3 tries. @@ -1404,116 +1529,14 @@ impl App { // TODO: eth_gasPrice that does awesome magic to predict the future "eth_hashrate" => jsonrpc::ParsedResponse::from_value(json!(U64::zero()), web3_request.id()).into(), "eth_mining" => jsonrpc::ParsedResponse::from_value(serde_json::Value::Bool(false), web3_request.id()).into(), - // TODO: eth_sendBundle (flashbots/eden command) - // broadcast transactions to all private rpcs at once "eth_sendRawTransaction" => { - // TODO: eth_sendPrivateTransaction - // TODO: decode the transaction - - // TODO: error if the chain_id is incorrect - // TODO: return now if already confirmed - // TODO: error if the nonce is way far in the future - - // TODO: self.pending_txid_firehose.send(txid).await; - - let response = self + // TODO: eth_sendPrivateTransaction that only sends private and never to balanced. it has different params though + let x = self .try_send_protected( - web3_request, - ).await; + web3_request,false, + ).await?; - let mut response = response.try_into()?; - - // sometimes we get an error that the transaction is already known by our nodes, - // that's not really an error. Return the hash like a successful response would. - // TODO: move this to a helper function. probably part of try_send_protected - if let ForwardedResponse::RpcError{ error_data, ..} = &response { - let acceptable_error_messages = [ - "already known", - "ALREADY_EXISTS: already known", - "INTERNAL_ERROR: existing tx with same hash", - "", - ]; - - if acceptable_error_messages.contains(&error_data.message.as_ref()) - { - let params = web3_request.inner.params() - .as_array() - .ok_or_else(|| { - Web3ProxyError::BadRequest( - "Unable to get array from params".into(), - ) - })? - .get(0) - .ok_or_else(|| { - Web3ProxyError::BadRequest( - "Unable to get item 0 from params".into(), - ) - })? - .as_str() - .ok_or_else(|| { - Web3ProxyError::BadRequest( - "Unable to get string from params item 0".into(), - ) - })?; - - let params = Bytes::from_str(params) - .expect("there must be Bytes if we got this far"); - - let rlp = Rlp::new(params.as_ref()); - - if let Ok(tx) = Transaction::decode(&rlp) { - // TODO: decode earlier and confirm that tx.chain_id (if set) matches self.config.chain_id - let tx_hash = json!(tx.hash()); - - trace!("tx_hash: {:#?}", tx_hash); - - response = ForwardedResponse::from(tx_hash); - } - } - } - - // TODO: if successful, send the txid to the pending transaction firehose - - // emit transaction count stats - // TODO: use this cache to avoid sending duplicate transactions? - // TODO: different salt for ips and transactions? - if let Some(ref salt) = self.config.public_recent_ips_salt { - if let ForwardedResponse::Result { value, .. } = &response { - let now = Utc::now().timestamp(); - let app = self.clone(); - - let salted_tx_hash = format!("{}:{}", salt, value.get()); - - let f = async move { - match app.redis_conn().await { - Ok(mut redis_conn) => { - let hashed_tx_hash = - Bytes::from(keccak256(salted_tx_hash.as_bytes())); - - let recent_tx_hash_key = - format!("eth_sendRawTransaction:{}", app.config.chain_id); - - redis_conn - .zadd(recent_tx_hash_key, hashed_tx_hash.to_string(), now) - .await?; - } - Err(Web3ProxyError::NoDatabaseConfigured) => {}, - Err(err) => { - warn!( - ?err, - "unable to save stats for eth_sendRawTransaction", - ) - } - } - - Ok::<_, anyhow::Error>(()) - }; - - tokio::spawn(f); - } - } - - jsonrpc::ParsedResponse::from_response_data(response, web3_request.id()).into() + jsonrpc::ParsedResponse::from_response_data(x, web3_request.id()).into() } "eth_syncing" => { // no stats on this. its cheap