bug fixes

This commit is contained in:
Bryan Stitt 2022-08-10 21:29:50 +00:00
parent 94c33f2ca0
commit 6d92f33dc4
5 changed files with 60 additions and 40 deletions

@ -81,6 +81,7 @@
- [x] send getTransaction rpc requests to the private rpc tier
- [x] I'm hitting infura rate limits very quickly. I feel like that means something is very inefficient
- whenever blocks were slow, we started checking as fast as possible
- [ ] send logs to sentry
- [ ] cli tool for resetting api keys
- [ ] nice output when cargo doc is run
- [ ] if we request an old block, more servers can handle it than we currently use.

@ -221,6 +221,7 @@ impl Web3ProxyApp {
}
};
// TODO: this should be a broadcast channel
let (head_block_sender, head_block_receiver) = watch::channel(Arc::new(Block::default()));
// TODO: will one receiver lagging be okay? how big should this be?
let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(256);
@ -260,10 +261,10 @@ impl Web3ProxyApp {
private_rpcs,
http_client.clone(),
redis_pool.clone(),
// subscribing to new heads here won't work well
// subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs
None,
// TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits
None,
// TODO: subscribe to pending transactions on the private rpcs?
Some(pending_tx_sender.clone()),
pending_transactions.clone(),
)
.await
@ -353,11 +354,15 @@ impl Web3ProxyApp {
"method":"eth_subscription",
"params": {
"subscription": subscription_id,
// TODO: option to include full transaction objects instead of just the hashes?
"result": new_head.as_ref(),
},
});
let msg = Message::Text(serde_json::to_string(&msg).unwrap());
// TODO: do clients support binary messages?
let msg = Message::Text(
serde_json::to_string(&msg).expect("this should always be valid json"),
);
if response_sender.send_async(msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?

@ -303,8 +303,11 @@ impl Web3Connection {
match block {
Ok(block) => {
{
let hash = block.hash.unwrap();
let num = block.number.unwrap();
// TODO: is this true? Block::default probably doesn't
let hash = block.hash.expect("blocks here should always have hashes");
let num = block
.number
.expect("blocks here should always have numbers");
let mut head_block = self.head_block.write();
@ -443,11 +446,12 @@ impl Web3Connection {
continue;
}
Ok(HandleResult::None) => {
// TODO: what should we do?
warn!("No handle for latest block from {}", self);
// TODO: what should we do?
}
Err(err) => {
warn!(?err, "Rate limited on latest block from {}", self);
warn!(?err, "Internal error on latest block from {}", self);
// TODO: what should we do? sleep? extra time?
}
}
@ -548,6 +552,8 @@ impl Web3Connection {
.send_async((pending_tx_id, self.clone()))
.await
.context("tx_id_sender")?;
// TODO: periodically check for listeners. if no one is subscribed, unsubscribe and wait for a subscription
}
warn!("subscription ended");

@ -31,7 +31,7 @@ use crate::config::Web3ConnectionConfig;
use crate::connection::{ActiveRequestHandle, HandleResult, Web3Connection};
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
/// A collection of Web3Connections that are on the same block
/// A collection of Web3Connections that are on the same block.
/// Serialize is so we can print it on our debug endpoint
#[derive(Clone, Default, Serialize)]
struct SyncedConnections {
@ -385,8 +385,6 @@ impl Web3Connections {
});
futures.push(flatten_handle(handle));
} else {
unimplemented!();
}
// setup the block funnel
@ -406,7 +404,15 @@ impl Web3Connections {
if futures.is_empty() {
// no transaction or block subscriptions.
todo!("every second, check that the provider is still connected");
// todo!("every second, check that the provider is still connected");?
let handle = task::Builder::default().name("noop").spawn(async move {
loop {
sleep(Duration::from_secs(600)).await;
}
});
futures.push(flatten_handle(handle));
}
if let Err(e) = try_join_all(futures).await {
@ -457,6 +463,8 @@ impl Web3Connections {
// but before we do any queries, be sure the requested block num exists
let head_block_num = self.head_block_num();
if num > &head_block_num {
// TODO: i'm seeing this a lot when using ethspam. i dont know why though. i thought we delayed publishing
// TODO: instead of error, maybe just sleep and try again?
return Err(anyhow::anyhow!(
"Head block is #{}, but #{} was requested",
head_block_num,
@ -466,8 +474,7 @@ impl Web3Connections {
// TODO: helper for method+params => JsonRpcRequest
// TODO: get block with the transactions?
let request =
json!({ "id": "1", "method": "eth_getBlockByNumber", "params": (num, false) });
let request = json!({ "jsonrpc": "2.0", "id": "1", "method": "eth_getBlockByNumber", "params": (num, false) });
let request: JsonRpcRequest = serde_json::from_value(request)?;
// TODO: if error, retry?
@ -581,8 +588,7 @@ impl Web3Connections {
panic!("i don't think this is possible")
}
/// TODO: possible dead lock here. investigate more. probably refactor
/// TODO: move parts of this onto SyncedConnections?
/// TODO: move parts of this onto SyncedConnections? it needs to be simpler
// we don't instrument here because we put a span inside the while loop
async fn update_synced_rpcs(
&self,
@ -595,19 +601,14 @@ impl Web3Connections {
// TODO: rpc name instead of url (will do this with config reload revamp)
// TODO: indexmap or hashmap? what hasher? with_capacity?
// TODO: this will grow unbounded. prune old heads automatically
let mut connection_heads = IndexMap::<String, Arc<Block<TxHash>>>::new();
while let Ok((new_block, rpc)) = block_receiver.recv_async().await {
if let Some(current_block) = connection_heads.get(&rpc.url) {
if current_block.hash == new_block.hash {
// duplicate block
continue;
}
}
let new_block_hash = if let Some(hash) = new_block.hash {
hash
} else {
// TODO: i think this should just be debug, but maybe it is a warning
warn!(%rpc, ?new_block, "Block without hash!");
connection_heads.remove(&rpc.url);
@ -640,7 +641,6 @@ impl Web3Connections {
connection_heads.remove(&rpc.url);
} else {
// TODO: no clone? we end up with different blocks for every rpc
connection_heads.insert(rpc.url.clone(), new_block.clone());
self.chain.add_block(new_block.clone(), false);
@ -651,7 +651,9 @@ impl Web3Connections {
.values()
.min_by(|a, b| a.number.cmp(&b.number))
{
lowest_block.number.unwrap()
lowest_block
.number
.expect("all blocks here should have a number")
} else {
continue;
};
@ -723,7 +725,7 @@ impl Web3Connections {
// TODO: there are sortable traits, but this seems simpler
/// sort the blocks in descending height
fn sortable_values(&self) -> (&U64, &u32, &U256, &H256) {
trace!(?self.block, ?self.conns);
// trace!(?self.block, ?self.conns);
// first we care about the block number
let block_num = self.block.number.as_ref().unwrap();
@ -775,8 +777,8 @@ impl Web3Connections {
})
}
})
// sort b to a for descending order. sort a to b for ascending order
.max_by(|a, b| b.sortable_values().cmp(&a.sortable_values()))
// sort b to a for descending order. sort a to b for ascending order? maybe not "max_by" is smart
.max_by(|a, b| a.sortable_values().cmp(&b.sortable_values()))
{
let best_head_num = x.block.number.unwrap();
let best_head_hash = x.block.hash.unwrap();
@ -832,8 +834,8 @@ impl Web3Connections {
"{}/{} rpcs at {} ({}). head at {:?}",
pending_synced_connections.conns.len(),
self.conns.len(),
pending_synced_connections.head_block_hash,
pending_synced_connections.head_block_num,
pending_synced_connections.head_block_hash,
pending_synced_connections
.conns
.iter()
@ -859,8 +861,8 @@ impl Web3Connections {
trace!(
"all {} rpcs at {} ({})",
num_best_rpcs,
pending_synced_connections.head_block_hash,
pending_synced_connections.head_block_num,
pending_synced_connections.head_block_hash,
);
} else {
trace!(
@ -868,27 +870,26 @@ impl Web3Connections {
"{}/{} rpcs at {} ({})",
num_best_rpcs,
self.conns.len(),
pending_synced_connections.head_block_hash,
pending_synced_connections.head_block_num,
pending_synced_connections.head_block_hash,
);
}
// TODO: do this before or after processing all the transactions in this block?
// TODO: only swap if there is a change
trace!(?pending_synced_connections, "swapping");
// TODO: only swap if there is a change?
debug!(?pending_synced_connections, "swapping");
self.synced_connections
.swap(Arc::new(pending_synced_connections));
if new_head_block {
// TODO: this will need a refactor to only send once a minmum threshold has this block
// TODO: move this onto self.chain
// TODO: pending_synced_connections isn't published yet. which means fast queries using this block will fail
// TODO: is new_head_block accurate?
// TODO: move this onto self.chain?
head_block_sender
.send(new_block.clone())
.context("head_block_sender")?;
}
} else {
// TODO: this expected when we first start
// TODO: is this expected when we first start?
// TODO: make sure self.synced_connections is empty
warn!("not enough rpcs in sync");
}

@ -9,7 +9,7 @@ use tracing::warn;
#[derive(Clone, serde::Deserialize)]
pub struct JsonRpcRequest {
// TODO: skip jsonrpc entireley?
// pub jsonrpc: Box<RawValue>,
pub jsonrpc: Box<RawValue>,
/// id could be a stricter type, but many rpcs do things against the spec
pub id: Box<RawValue>,
pub method: String,
@ -78,6 +78,7 @@ impl<'de> Deserialize<'de> for JsonRpcRequestEnum {
A: MapAccess<'de>,
{
// TODO: i feel like this should be easier
let mut jsonrpc = None;
let mut id = None;
let mut method = None;
let mut params = None;
@ -88,7 +89,7 @@ impl<'de> Deserialize<'de> for JsonRpcRequestEnum {
// throw away the value
// TODO: should we check that it's 2.0?
// TODO: how do we skip over this value entirely?
let _: String = map.next_value()?;
jsonrpc = Some(map.next_value()?);
}
Field::Id => {
if id.is_some() {
@ -111,6 +112,7 @@ impl<'de> Deserialize<'de> for JsonRpcRequestEnum {
}
}
let jsonrpc = jsonrpc.ok_or_else(|| de::Error::missing_field("jsonrpc"))?;
let id = id.ok_or_else(|| de::Error::missing_field("id"))?;
let method = method.ok_or_else(|| de::Error::missing_field("method"))?;
@ -119,7 +121,12 @@ impl<'de> Deserialize<'de> for JsonRpcRequestEnum {
Some(x) => Some(x),
};
let single = JsonRpcRequest { id, method, params };
let single = JsonRpcRequest {
jsonrpc,
id,
method,
params,
};
Ok(JsonRpcRequestEnum::Single(single))
}