improve eth_sendRawTransaction

This commit is contained in:
Bryan Stitt 2023-10-20 12:02:03 -07:00
parent ff3ff5d72f
commit 2019888957

View File

@ -1080,18 +1080,143 @@ impl App {
}
/// try to send transactions to the best available rpcs with protected/private mempools
/// if no protected rpcs are configured, then some public rpcs are used instead
/// if no protected rpcs are configured (and protected_only is false), then public rpcs are used instead
/// TODO: should this return an H256 instead of an Arc<RawValue>?
async fn try_send_protected(
self: &Arc<Self>,
web3_request: &Arc<ValidatedRequest>,
) -> Web3ProxyResult<SingleResponse<Arc<RawValue>>> {
if self.protected_rpcs.is_empty() {
protected_only: bool,
) -> Web3ProxyResult<ForwardedResponse<Arc<RawValue>>> {
// decode the transaction
let params = web3_request
.inner
.params()
.as_array()
.ok_or_else(|| Web3ProxyError::BadRequest("Unable to get array from params".into()))?
.get(0)
.ok_or_else(|| Web3ProxyError::BadRequest("Unable to get item 0 from params".into()))?
.as_str()
.ok_or_else(|| {
Web3ProxyError::BadRequest("Unable to get string from params item 0".into())
})?;
let bytes = Bytes::from_str(params)
.map_err(|_| Web3ProxyError::BadRequest("Unable to parse params as bytes".into()))?;
let rlp = Rlp::new(bytes.as_ref());
let tx = Transaction::decode(&rlp).map_err(|_| {
Web3ProxyError::BadRequest("failed to parse rlp into transaction".into())
})?;
if let Some(chain_id) = tx.chain_id {
if self.config.chain_id != chain_id.as_u64() {
return Err(Web3ProxyError::BadRequest(
format!(
"unexpected chain_id. {} != {}",
chain_id, self.config.chain_id
)
.into(),
));
}
}
// TODO: return now if already confirmed
// TODO: error if the nonce is way far in the future
let mut response = if protected_only {
if self.protected_rpcs.is_empty() {
// TODO: different error?
return Err(Web3ProxyError::NoServersSynced);
}
self.protected_rpcs
.request_with_metadata(web3_request)
.await
} else if self.protected_rpcs.is_empty() {
self.balanced_rpcs.request_with_metadata(web3_request).await
} else {
self.protected_rpcs
.request_with_metadata(web3_request)
.await
};
// TODO: helper for doing parsed() inside a response?
if let Ok(SingleResponse::Stream(x)) = response {
response = x
.read()
.await
.map(SingleResponse::Parsed)
.map_err(Into::into);
}
let mut response = response.try_into()?;
let txid = tx.hash();
// sometimes we get an error that the transaction is already known by our nodes,
// that's not really an error. Return the hash like a successful response would.
// TODO: move this to a helper function. probably part of try_send_protected
if let ForwardedResponse::RpcError { error_data, .. } = &response {
let acceptable_error_messages = [
"already known",
"ALREADY_EXISTS: already known",
"INTERNAL_ERROR: existing tx with same hash",
"",
];
if acceptable_error_messages.contains(&error_data.message.as_ref()) {
response = ForwardedResponse::from(json!(txid));
}
}
// if successful, send the txid to the pending transaction firehose
if let ForwardedResponse::Result { value, .. } = &response {
// no idea how we got an array here, but lets force this to just the txid
// TODO: think about this more
if value.get().starts_with('[') {
error!(
?value,
?txid,
"unexpected array response from sendRawTransaction"
);
response = ForwardedResponse::from(json!(txid));
}
self.pending_txid_firehose.send(txid).await;
// emit transaction count stats
// TODO: different salt for ips and transactions?
if let Some(ref salt) = self.config.public_recent_ips_salt {
let now = Utc::now().timestamp();
let app = self.clone();
let salted_tx_hash = format!("{}:{:?}", salt, txid);
let f = async move {
match app.redis_conn().await {
Ok(mut redis_conn) => {
let hashed_tx_hash = Bytes::from(keccak256(salted_tx_hash.as_bytes()));
let recent_tx_hash_key =
format!("eth_sendRawTransaction:{}", app.config.chain_id);
redis_conn
.zadd(recent_tx_hash_key, hashed_tx_hash.to_string(), now)
.await?;
}
Err(Web3ProxyError::NoDatabaseConfigured) => {}
Err(err) => {
warn!(?err, "unable to save stats for eth_sendRawTransaction",)
}
}
Ok::<_, anyhow::Error>(())
};
tokio::spawn(f);
}
}
Ok(response)
}
/// proxy request with up to 3 tries.
@ -1404,116 +1529,14 @@ impl App {
// TODO: eth_gasPrice that does awesome magic to predict the future
"eth_hashrate" => jsonrpc::ParsedResponse::from_value(json!(U64::zero()), web3_request.id()).into(),
"eth_mining" => jsonrpc::ParsedResponse::from_value(serde_json::Value::Bool(false), web3_request.id()).into(),
// TODO: eth_sendBundle (flashbots/eden command)
// broadcast transactions to all private rpcs at once
"eth_sendRawTransaction" => {
// TODO: eth_sendPrivateTransaction
// TODO: decode the transaction
// TODO: error if the chain_id is incorrect
// TODO: return now if already confirmed
// TODO: error if the nonce is way far in the future
// TODO: self.pending_txid_firehose.send(txid).await;
let response = self
// TODO: eth_sendPrivateTransaction that only sends private and never to balanced. it has different params though
let x = self
.try_send_protected(
web3_request,
).await;
web3_request,false,
).await?;
let mut response = response.try_into()?;
// sometimes we get an error that the transaction is already known by our nodes,
// that's not really an error. Return the hash like a successful response would.
// TODO: move this to a helper function. probably part of try_send_protected
if let ForwardedResponse::RpcError{ error_data, ..} = &response {
let acceptable_error_messages = [
"already known",
"ALREADY_EXISTS: already known",
"INTERNAL_ERROR: existing tx with same hash",
"",
];
if acceptable_error_messages.contains(&error_data.message.as_ref())
{
let params = web3_request.inner.params()
.as_array()
.ok_or_else(|| {
Web3ProxyError::BadRequest(
"Unable to get array from params".into(),
)
})?
.get(0)
.ok_or_else(|| {
Web3ProxyError::BadRequest(
"Unable to get item 0 from params".into(),
)
})?
.as_str()
.ok_or_else(|| {
Web3ProxyError::BadRequest(
"Unable to get string from params item 0".into(),
)
})?;
let params = Bytes::from_str(params)
.expect("there must be Bytes if we got this far");
let rlp = Rlp::new(params.as_ref());
if let Ok(tx) = Transaction::decode(&rlp) {
// TODO: decode earlier and confirm that tx.chain_id (if set) matches self.config.chain_id
let tx_hash = json!(tx.hash());
trace!("tx_hash: {:#?}", tx_hash);
response = ForwardedResponse::from(tx_hash);
}
}
}
// TODO: if successful, send the txid to the pending transaction firehose
// emit transaction count stats
// TODO: use this cache to avoid sending duplicate transactions?
// TODO: different salt for ips and transactions?
if let Some(ref salt) = self.config.public_recent_ips_salt {
if let ForwardedResponse::Result { value, .. } = &response {
let now = Utc::now().timestamp();
let app = self.clone();
let salted_tx_hash = format!("{}:{}", salt, value.get());
let f = async move {
match app.redis_conn().await {
Ok(mut redis_conn) => {
let hashed_tx_hash =
Bytes::from(keccak256(salted_tx_hash.as_bytes()));
let recent_tx_hash_key =
format!("eth_sendRawTransaction:{}", app.config.chain_id);
redis_conn
.zadd(recent_tx_hash_key, hashed_tx_hash.to_string(), now)
.await?;
}
Err(Web3ProxyError::NoDatabaseConfigured) => {},
Err(err) => {
warn!(
?err,
"unable to save stats for eth_sendRawTransaction",
)
}
}
Ok::<_, anyhow::Error>(())
};
tokio::spawn(f);
}
}
jsonrpc::ParsedResponse::from_response_data(response, web3_request.id()).into()
jsonrpc::ParsedResponse::from_response_data(x, web3_request.id()).into()
}
"eth_syncing" => {
// no stats on this. its cheap