fix eth_sendRawTransaction

Bryan Stitt 2023-10-13 19:41:59 -07:00
parent 7cd7257058
commit ce3fbca7b6
8 changed files with 43 additions and 27 deletions


@@ -1079,7 +1079,6 @@ impl App {
         if self.protected_rpcs.is_empty() {
             self.balanced_rpcs.request_with_metadata(web3_request).await
         } else {
-            todo!("its not empty")
             self.protected_rpcs
                 .request_with_metadata(web3_request)
                 .await
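The removed line is the crux of the fix: `todo!()` type-checks (it evaluates to the never type), so this branch compiled, but any request that actually reached the protected RPCs panicked before the call below it could run. A standalone sketch of the failure mode, with stand-in types:

```rust
// `todo!` compiles because it never returns, but it panics when executed,
// so the protected branch aborted every request that reached it
fn route(protected_is_empty: bool) -> &'static str {
    if protected_is_empty {
        "balanced_rpcs"
    } else {
        todo!("its not empty") // panicked here at runtime before the fix
    }
}

fn main() {
    assert_eq!(route(true), "balanced_rpcs");
    // route(false) panics: "not yet implemented: its not empty"
}
```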


@@ -466,6 +466,7 @@ impl CacheMode {
                 cache_errors: false,
             })
         }
+        "eth_sendRawTransaction" => Ok(CacheMode::Never),
         "net_listening" => Ok(CacheMode::SuccessForever),
         "net_version" => Ok(CacheMode::SuccessForever),
         method => match get_block_param_id(method) {
@@ -495,7 +496,7 @@ impl CacheMode {
         match self {
             Self::SuccessForever => None,
             Self::Never => None,
-            Self::Standard { block, .. } => Some(block),
+            Self::Standard { .. } => None,
             Self::Range { from_block, .. } => Some(from_block),
         }
     }
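The cache changes back the fix: `eth_sendRawTransaction` is now explicitly `CacheMode::Never`, because submitting a transaction has side effects and must always be forwarded; a cached response would hand back a hash without ever broadcasting anything. A toy sketch of the method-to-cache-mode dispatch (stand-in enum, not the real `CacheMode`):

```rust
// toy dispatch with a stand-in enum: write-path methods are never cached,
// values the cache treats as per-chain constants are cached forever,
// and everything else gets standard caching
enum Cache {
    Never,
    SuccessForever,
    Standard { cache_errors: bool },
}

fn cache_mode(method: &str) -> Cache {
    match method {
        // broadcasting a signed transaction is a side effect; always forward
        "eth_sendRawTransaction" => Cache::Never,
        "net_listening" | "net_version" => Cache::SuccessForever,
        _ => Cache::Standard { cache_errors: false },
    }
}
```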


@@ -269,16 +269,7 @@ impl Web3ProxyError {
                 )
             }
             Self::Arc(err) => {
-                // recurse somehow. Web3ProxyError isn't clone and we can't move out of it
-                (
-                    StatusCode::INTERNAL_SERVER_ERROR,
-                    JsonRpcErrorData {
-                        // TODO: is it safe to expose all of our anyhow strings?
-                        message: "INTERNAL SERVER ERROR".into(),
-                        code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(),
-                        data: Some(serde_json::Value::String(err.to_string())),
-                    },
-                )
+                return err.as_response_parts();
             }
             Self::BadRequest(err) => {
                 trace!(?err, "BAD_REQUEST");
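The old `Arc` arm flattened every wrapped error into a generic 500 because `Web3ProxyError` isn't `Clone` and can't be moved out of the `Arc`. Recursing through a shared reference needs neither: `Arc<T>` derefs to `&T`, so the inner error's own conversion can be reused. A self-contained sketch of the pattern with a stand-in error type:

```rust
use std::sync::Arc;

// stand-in error type; the real Web3ProxyError has many more variants
enum ProxyError {
    Arc(Arc<ProxyError>),
    BadRequest(String),
}

impl ProxyError {
    // taking &self means nothing is cloned or moved out of the Arc:
    // the Arc derefs to &ProxyError and the method simply recurses
    fn as_response_parts(&self) -> (u16, String) {
        match self {
            Self::Arc(inner) => inner.as_response_parts(),
            Self::BadRequest(msg) => (400, msg.clone()),
        }
    }
}
```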


@@ -309,7 +309,7 @@ impl ValidatedRequest {
         app: Option<&App>,
         authorization: Arc<Authorization>,
         chain_id: u64,
-        head_block: Option<Web3ProxyBlock>,
+        mut head_block: Option<Web3ProxyBlock>,
         kafka_debug_logger: Option<Arc<KafkaDebugLogger>>,
         max_wait: Option<Duration>,
         permit: Option<OwnedSemaphorePermit>,
@@ -331,6 +331,12 @@ impl ValidatedRequest {
             kafka_debug_logger.log_debug_request(&request);
         }

+        if head_block.is_none() {
+            if let Some(app) = app {
+                head_block = app.head_block_receiver().borrow().clone();
+            }
+        }
+
         // now that kafka has logged the user's original params, we can calculate the cache key
         // TODO: modify CacheMode::new to wait for a future block if one is requested! be sure to update head_block too!
@@ -589,7 +595,7 @@ impl Drop for ValidatedRequest {
             // turn `&mut self` into `self`
             let x = mem::take(self);

-            trace!(?x, "request metadata dropped without stat send");
+            // trace!(?x, "request metadata dropped without stat send");
             let _ = x.try_send_stat();
         }
     }
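The first two hunks give every request a head block: the parameter is now `mut`, and when the caller didn't supply one it is filled from the app's current head (the third hunk just silences a noisy trace in `Drop`). The `.borrow().clone()` call is the usual `tokio::sync::watch` receiver idiom, so `head_block_receiver()` presumably exposes such a channel. A hedged sketch of the fallback under that assumption:

```rust
use tokio::sync::watch;

// assumed shape: the app publishes its latest head block on a watch channel
fn resolve_head_block<B: Clone>(
    explicit: Option<B>,
    head_rx: Option<&watch::Receiver<Option<B>>>,
) -> Option<B> {
    // prefer the caller-supplied block, otherwise peek at the channel's
    // current value without consuming it
    explicit.or_else(|| head_rx.and_then(|rx| rx.borrow().clone()))
}
```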


@@ -23,7 +23,7 @@ use std::time::Duration;
 use tokio::select;
 use tokio::task::yield_now;
 use tokio::time::{sleep_until, Instant};
-use tracing::{debug, enabled, error, info, trace, warn, Level};
+use tracing::{debug, enabled, error, info, instrument, trace, warn, Level};

 #[derive(Constructor, Clone, Copy, Debug, Default, Eq, PartialEq, Serialize)]
 pub struct RpcRanking {
@@ -89,12 +89,13 @@ pub struct RankedRpcs {
     pub num_synced: usize,
     pub backups_needed: bool,

-    inner: Vec<Arc<Web3Rpc>>,
+    pub(crate) inner: Vec<Arc<Web3Rpc>>,

     sort_mode: SortMethod,
 }

 // TODO: could these be refs? The owning RankedRpcs lifetime might work. `stream!` might make it complicated
+#[derive(Debug)]
 pub struct RpcsForRequest {
     inner: Vec<Arc<Web3Rpc>>,
     outer: Vec<Arc<Web3Rpc>>,
@@ -106,7 +107,8 @@ impl RankedRpcs {
         // we don't need to sort the rpcs now. we will sort them when a request needs them
         // TODO: the shame about this is that we lose just being able to compare 2 random servers
-        let head_block = head_block?;
+        // TODO: why is head_block not set here?! it should always be set!
+        let head_block = head_block.unwrap_or_default();

         let backups_needed = rpcs.iter().any(|x| x.backup);
@@ -219,6 +221,7 @@ impl RankedRpcs {
         // TODO: max lag was already handled
         for rpc in self.inner.iter().cloned() {
+            // if web3_request.head_block.is_some() {
             if let Some(block_needed) = min_block_needed {
                 if !rpc.has_block_data(block_needed) {
                     outer_for_request.push(rpc);
@@ -231,6 +234,7 @@ impl RankedRpcs {
                     continue;
                 }
             }
+            // }

             inner_for_request.push(rpc);
         }
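These two hunks only stub in a commented-out condition, but they sit on the partition that decides routing: rpcs that can serve the needed block land in `inner` (tried first), the rest in `outer` (fallback). A stand-in sketch of that split:

```rust
// stand-in sketch of the inner/outer split: rpcs that already have the
// needed block are tried first; the rest are kept as a fallback list
struct Rpc {
    newest_block: u64,
}

fn partition(rpcs: Vec<Rpc>, min_block_needed: Option<u64>) -> (Vec<Rpc>, Vec<Rpc>) {
    let (mut inner, mut outer) = (Vec::new(), Vec::new());
    for rpc in rpcs {
        match min_block_needed {
            // this rpc hasn't seen the block the request needs yet
            Some(block) if rpc.newest_block < block => outer.push(rpc),
            _ => inner.push(rpc),
        }
    }
    (inner, outer)
}
```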
@@ -421,7 +425,7 @@ impl ConsensusFinder {
         match old_ranked_rpcs.as_ref() {
             None => {
                 info!(
-                    "first {}/{} {}{}/{}/{} block={}, rpc={}",
+                    "first {}/{} {}{}/{}/{} block={:?}, rpc={}",
                     best_tier,
                     worst_tier,
                     backups_voted_str,
@@ -870,6 +874,7 @@ fn best_rpc<'a>(rpc_a: &'a Arc<Web3Rpc>, rpc_b: &'a Arc<Web3Rpc>) -> &'a Arc<Web3Rpc> {
 */

 impl RpcsForRequest {
+    #[instrument]
     pub fn to_stream(self) -> impl Stream<Item = OpenRequestHandle> {
         stream! {
             trace!("entered stream");
@@ -919,9 +924,8 @@ impl RpcsForRequest {
                         trace!("No request handle for {}. err={:?}", best_rpc, err);
                     }
                 }
+                yield_now().await;
             }
-            yield_now().await;
         }

         // if we got this far, no inner or outer rpcs are ready. that's surprising since an inner should have been ready. maybe it got rate limited
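Moving `yield_now().await` inside the loop changes when the stream cooperates with the scheduler: it now yields after each attempt rather than only after the whole retry loop, so a tight polling loop can't starve other tasks on the runtime. A standalone illustration of the idiom:

```rust
use tokio::task::yield_now;

// yield after each failed attempt so a hot retry loop gives other tasks
// on the runtime a chance to run between attempts
async fn first_ready(checks: &[bool]) -> Option<usize> {
    for (i, ready) in checks.iter().enumerate() {
        if *ready {
            return Some(i);
        }
        yield_now().await;
    }
    None
}
```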


@@ -403,17 +403,18 @@ impl Web3Rpcs {
             ranked_rpcs
         } else if self.watch_head_block.is_some() {
             // if we are here, this set of rpcs is subscribed to newHeads. But we didn't get a RankedRpcs. that means something is wrong
+            trace!("watch_head_block is some");
             return Err(Web3ProxyError::NoServersSynced);
         } else {
+            trace!("watch_head_block is none");
             // no RankedRpcs, but also no newHeads subscription. This is probably a set of "protected" rpcs or similar
-            // TODO: return a future that resolves once we do have something?
             let rpcs = self.by_name.read().values().cloned().collect();

             if let Some(x) = RankedRpcs::from_rpcs(rpcs, web3_request.head_block.clone()) {
                 Arc::new(x)
             } else {
                 // i doubt we will ever get here
+                // TODO: return a future that resolves once we do have something?
                 return Err(Web3ProxyError::NoServersSynced);
             }
         };
@@ -611,7 +612,13 @@ impl Serialize for Web3Rpcs {
         // TODO: rename synced_connections to consensus_rpcs
         if let Some(consensus_rpcs) = consensus_rpcs.as_ref() {
-            state.serialize_field("synced_connections", consensus_rpcs)?;
+            let names: Vec<_> = consensus_rpcs
+                .inner
+                .iter()
+                .map(|x| x.name.as_str())
+                .collect();
+
+            state.serialize_field("synced_connections", &names)?;
         } else {
             state.serialize_field("synced_connections", &None::<()>)?;
         }
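The status endpoint now serializes just the names of the synced rpcs instead of the whole `RankedRpcs` value, which is what the `pub(crate) inner` change earlier in this commit enables. A self-contained sketch of the same pattern with stand-in types:

```rust
use serde::ser::{Serialize, SerializeStruct, Serializer};

// stand-in types: serialize only the rpc names for the status endpoint
// instead of the full nested consensus structure
struct Rpc {
    name: String,
}

struct Status {
    consensus_rpcs: Option<Vec<Rpc>>,
}

impl Serialize for Status {
    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        let mut state = serializer.serialize_struct("Status", 1)?;
        match &self.consensus_rpcs {
            Some(rpcs) => {
                let names: Vec<&str> = rpcs.iter().map(|x| x.name.as_str()).collect();
                state.serialize_field("synced_connections", &names)?;
            }
            None => state.serialize_field("synced_connections", &None::<()>)?,
        }
        state.end()
    }
}
```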


@@ -106,7 +106,14 @@ impl TestApp {
                 },
             )]),
             // influxdb_client: influx.map(|x| x.client),
-            private_rpcs: Default::default(),
+            private_rpcs: HashMap::from([(
+                "anvil_private".to_string(),
+                Web3RpcConfig {
+                    http_url: Some(anvil.instance.endpoint()),
+                    ws_url: Some(anvil.instance.ws_endpoint()),
+                    ..Default::default()
+                },
+            )]),
             bundler_4337_rpcs: Default::default(),
             extra: Default::default(),
         };
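Registering anvil as a private rpc is what finally exercises the protected path: with `private_rpcs` left at `Default::default()`, `protected_rpcs` was always empty, the proxy always took the balanced branch, and the `todo!` panic fixed above never fired in tests. A stand-in sketch of the wiring:

```rust
use std::collections::HashMap;

// stand-in config type; the real Web3RpcConfig has many more fields
#[derive(Default)]
struct RpcConfig {
    http_url: Option<String>,
    ws_url: Option<String>,
    backup: bool,
}

// point the "private" tier at the same local anvil node the test already runs
fn wire_private_rpcs(http: &str, ws: &str) -> HashMap<String, RpcConfig> {
    HashMap::from([(
        "anvil_private".to_string(),
        RpcConfig {
            http_url: Some(http.to_string()),
            ws_url: Some(ws.to_string()),
            ..Default::default()
        },
    )])
}
```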


@@ -120,6 +120,8 @@ async fn it_matches_anvil() {
     let x = TestApp::spawn(&a, None, None, None).await;

+    let proxy_provider = Http::from_str(x.proxy_provider.url().as_str()).unwrap();
+
     let weighted_anvil_provider =
         WeightedProvider::new(Http::from_str(&a.instance.endpoint()).unwrap());
     let weighted_proxy_provider =
@@ -201,8 +203,7 @@ async fn it_matches_anvil() {
     // fund singleton deployer
     // TODO: send through the quorum provider. it should detect that it's already confirmed
-    let fund_tx_hash: H256 = a
-        .provider
+    let fund_tx_hash: H256 = proxy_provider
         .request("eth_sendRawTransaction", [raw_tx])
         .await
         .unwrap();
@@ -210,7 +211,7 @@ async fn it_matches_anvil() {
     // deploy singleton deployer
     // TODO: send through the quorum provider. it should detect that it's already confirmed
-    let deploy_tx: H256 = a.provider.request("eth_sendRawTransaction", ["0xf9016c8085174876e8008303c4d88080b90154608060405234801561001057600080fd5b50610134806100206000396000f3fe6080604052348015600f57600080fd5b506004361060285760003560e01c80634af63f0214602d575b600080fd5b60cf60048036036040811015604157600080fd5b810190602081018135640100000000811115605b57600080fd5b820183602082011115606c57600080fd5b80359060200191846001830284011164010000000083111715608d57600080fd5b91908080601f016020809104026020016040519081016040528093929190818152602001838380828437600092019190915250929550509135925060eb915050565b604080516001600160a01b039092168252519081900360200190f35b6000818351602085016000f5939250505056fea26469706673582212206b44f8a82cb6b156bfcc3dc6aadd6df4eefd204bc928a4397fd15dacf6d5320564736f6c634300060200331b83247000822470"]).await.unwrap();
+    let deploy_tx: H256 = proxy_provider.request("eth_sendRawTransaction", ["0xf9016c8085174876e8008303c4d88080b90154608060405234801561001057600080fd5b50610134806100206000396000f3fe6080604052348015600f57600080fd5b506004361060285760003560e01c80634af63f0214602d575b600080fd5b60cf60048036036040811015604157600080fd5b810190602081018135640100000000811115605b57600080fd5b820183602082011115606c57600080fd5b80359060200191846001830284011164010000000083111715608d57600080fd5b91908080601f016020809104026020016040519081016040528093929190818152602001838380828437600092019190915250929550509135925060eb915050565b604080516001600160a01b039092168252519081900360200190f35b6000818351602085016000f5939250505056fea26469706673582212206b44f8a82cb6b156bfcc3dc6aadd6df4eefd204bc928a4397fd15dacf6d5320564736f6c634300060200331b83247000822470"]).await.unwrap();
     assert_eq!(
         deploy_tx,
         "0x803351deb6d745e91545a6a3e1c0ea3e9a6a02a1a4193b70edfcd2f40f71a01c"