more context. don't use unix timestamps

This commit is contained in:
Bryan Stitt 2022-10-20 23:50:23 +00:00
parent 94f205900a
commit b7259192e8
5 changed files with 39 additions and 14 deletions

@ -190,6 +190,7 @@ These are roughly in order of completition
- https://github.com/tokio-rs/mini-redis/blob/master/src/shutdown.rs
- we need this because we need to be sure all the queries are saved in the db. maybe put stuff in Drop
- need an flume::watch on unflushed stats that we can subscribe to. wait for it to flip to true
- [x] don't use unix timestamps for response_millis since leap seconds will confuse it
- [-] ability to domain lock or ip lock said key
- the code to check the database and use these entries already exists, but users don't have a way to set them
- [-] new endpoints for users (not totally sure about the exact paths, but these features are all needed):
@ -216,7 +217,6 @@ These are roughly in order of completition
- [-] let users choose a % to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly
- this must be opt-in or spawned since it will slow things down and will make their calls less private
- [ ] we currently default to 0.0 and don't expose a way to edit it. we have a database row, but we don't use it
- [ ] don't use unix timestamps for response_millis since leap seconds will confuse it
- [ ] WARN http_request:request: web3_proxy::block_number: could not get block from params err=unexpected params length id=01GF4HTRKM4JV6NX52XSF9AYMW method=POST authorized_request=User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 })
- ERROR http_request:request:try_send_all_upstream_servers: web3_proxy::rpcs::request: bad response! err=JsonRpcClientError(JsonRpcError(JsonRpcError { code: -32000, message: "INTERNAL_ERROR: existing tx with same hash", data: None })) method=eth_sendRawTransaction rpc=local_erigon_alpha_archive id=01GF4HV03Y4ZNKQV8DW5NDQ5CG method=POST authorized_request=User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 }) self=Web3Connections { conns: {"local_erigon_alpha_archive_ws": Web3Connection { name: "local_erigon_alpha_archive_ws", blocks: "all", .. }, "local_geth_ws": Web3Connection { name: "local_geth_ws", blocks: 64, .. }, "local_erigon_alpha_archive": Web3Connection { name: "local_erigon_alpha_archive", blocks: "all", .. }}, .. } authorized_request=Some(User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 })) request=JsonRpcRequest { id: RawValue(39), method: "eth_sendRawTransaction", .. } request_metadata=Some(RequestMetadata { datetime: 2022-10-11T22:14:57.406829095Z, period_seconds: 60, request_bytes: 633, backend_requests: 0, no_servers: 0, error_response: false, response_bytes: 0, response_millis: 0 }) block_needed=None
- why is it failing to get the block from params when its set to None? That should be the simple case

@ -1071,7 +1071,10 @@ impl Web3ProxyApp {
&response,
);
stat_sender.send_async(response_stat.into()).await?;
stat_sender
.send_async(response_stat.into())
.await
.context("stat_sender sending response_stat")?;
}
return Ok(response);
@ -1087,7 +1090,10 @@ impl Web3ProxyApp {
let response_stat =
ProxyResponseStat::new(request.method, authorized_key, request_metadata, &response);
stat_sender.send_async(response_stat.into()).await?;
stat_sender
.send_async(response_stat.into())
.await
.context("stat_sender sending response stat")?;
}
Ok(response)

@ -16,7 +16,7 @@ use std::thread;
use tokio::runtime;
use tokio::sync::broadcast;
use tokio::time::Duration;
use tracing::{debug, info};
use tracing::{debug, info, warn};
use tracing_subscriber::EnvFilter;
use web3_proxy::app::{flatten_handle, flatten_handles, Web3ProxyApp};
use web3_proxy::config::{CliConfig, TopConfig};
@ -118,7 +118,9 @@ fn run(
};
// one of the handles stopped. send a value so the others know to shut down
shutdown_sender.send(())?;
if let Err(err) = shutdown_sender.send(()) {
warn!(?err, "shutdown sender");
};
// wait on all the important background tasks (like saving stats to the database) to complete
while let Some(x) = important_background_handles.next().await {

@ -54,9 +54,11 @@ pub struct AuthorizedKey {
pub log_revert_chance: Decimal,
}
#[derive(Debug, Default, Serialize)]
#[derive(Debug)]
pub struct RequestMetadata {
pub datetime: chrono::DateTime<Utc>,
pub start_datetime: chrono::DateTime<Utc>,
pub start_instant: tokio::time::Instant,
// TODO: better name for this
pub period_seconds: u64,
pub request_bytes: u64,
/// if this is 0, there was a cache_hit
@ -86,10 +88,15 @@ impl RequestMetadata {
.try_into()?;
let new = Self {
start_instant: Instant::now(),
start_datetime: Utc::now(),
period_seconds,
request_bytes,
datetime: Utc::now(),
..Default::default()
backend_requests: 0.into(),
no_servers: 0.into(),
error_response: false.into(),
response_bytes: 0.into(),
response_millis: 0.into(),
};
Ok(new)

@ -452,7 +452,9 @@ impl Web3Connections {
self.save_block(&consensus_head_block, true).await?;
head_block_sender.send(consensus_head_block)?;
head_block_sender
.send(consensus_head_block)
.context("head_block_sender sending consensus_head_block")?;
}
Some(old_block_id) => {
// TODO: do this log item better
@ -473,9 +475,13 @@ impl Web3Connections {
debug!(con_head=%consensus_head_block_id, old=%old_block_id, rpc_head=%rpc_head_str, %rpc, "unc {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
// todo!("handle equal by updating the cannonical chain");
self.save_block(&consensus_head_block, true).await?;
self.save_block(&consensus_head_block, true)
.await
.context("save consensus_head_block as heaviest chain")?;
head_block_sender.send(consensus_head_block)?;
head_block_sender.send(consensus_head_block).context(
"head_block_sender sending consensus_head_block",
)?;
}
}
Ordering::Less => {
@ -484,9 +490,13 @@ impl Web3Connections {
warn!(con_head=%consensus_head_block_id, old_head=%old_block_id, rpc_head=%rpc_head_str, %rpc, "chain rolled back {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
// TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems slike a good idea
self.save_block(&consensus_head_block, true).await?;
self.save_block(&consensus_head_block, true).await.context(
"save_block sending consensus_head_block as heaviest chain",
)?;
head_block_sender.send(consensus_head_block)?;
head_block_sender
.send(consensus_head_block)
.context("head_block_sender sending consensus_head_block")?;
}
Ordering::Greater => {
debug!(con_head=%consensus_head_block_id, rpc_head=%rpc_head_str, %rpc, "new {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);