diff --git a/config/example.toml b/config/example.toml index 8227635f..d72147ba 100644 --- a/config/example.toml +++ b/config/example.toml @@ -11,6 +11,8 @@ db_url = "mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy" # read-only replica useful when running the proxy in multiple regions db_replica_url = "mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy" +kafka_urls = "127.0.0.1:19092" + # thundering herd protection # only mark a block as the head block if the sum of their soft limits is greater than or equal to min_sum_soft_limit min_sum_soft_limit = 2_000 diff --git a/docker-compose.yml b/docker-compose.yml index 5d9360d4..beda587d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -39,6 +39,18 @@ services: - ./data/dev_influxdb/data:/var/lib/influxdb2 - ./data/dev_influxdb/config:/etc/influxdb2 + dev-kafka: + image: bitnami/kafka:3.4 + ports: + - "127.0.0.1:19092:9092" + volumes: + - "./data/dev_kafka:/bitnami" + environment: + - KAFKA_CFG_ZOOKEEPER_CONNECT=dev-zookeeper:2181 + - ALLOW_PLAINTEXT_LISTENER=yes + depends_on: + - dev-zookeeper + # volatile redis for storing rate limits dev-vredis: extends: @@ -46,3 +58,13 @@ services: service: volatile_redis ports: - 127.0.0.1:16379:6379 + + # TODO: kafka doesn't need zookeeper anymore, but all the docs still use it + dev-zookeeper: + image: bitnami/zookeeper:3.8 + ports: + - "127.0.0.1:12181:2181" + volumes: + - "./data/zookeeper:/bitnami" + environment: + - ALLOW_ANONYMOUS_LOGIN=yes diff --git a/scripts/manual-tests/16-change-user-tier.sh b/scripts/manual-tests/16-change-user-tier.sh index 2505935b..8012cb9b 100644 --- a/scripts/manual-tests/16-change-user-tier.sh +++ b/scripts/manual-tests/16-change-user-tier.sh @@ -13,7 +13,9 @@ RUSTFLAGS="--cfg tokio_unstable" cargo run create_user --address 0x077e43dcca20d RUSTFLAGS="--cfg tokio_unstable" cargo run --release -- proxyd # Check if the instance is running -curl -X POST -H "Content-Type: application/json" --data '{"jsonrpc":"2.0","method":"web3_clientVersion","id":1}' 127.0.0.1:8544 +curl --verbose -X POST -H "Content-Type: application/json" --data '{"jsonrpc":"2.0","method":"web3_clientVersion","id":1}' 127.0.0.1:8544 + +curl --verbose -X POST -H "Content-Type: application/json" --data '{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params": ["latest", false], "id":1}' 127.0.0.1:8544 # Open this website to get the nonce to log in curl -X GET "http://127.0.0.1:8544/user/login/0xeB3E928A2E54BE013EF8241d4C9EaF4DfAE94D5a" diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index a9cbaacd..6278b836 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -474,10 +474,6 @@ impl Web3ProxyApp { Err(err) => error!("Failed connecting to kafka. This will not retry. {:?}", err), } } - #[cfg(not(rdkafka))] - if top_config.app.kafka_urls.is_some() { - warn!("rdkafka rust feature is not enabled!"); - } // TODO: do this during apply_config so that we can change redis url while running // create a connection pool for redis @@ -1085,61 +1081,64 @@ impl Web3ProxyApp { let request_metadata = Arc::new(RequestMetadata::new(REQUEST_PERIOD, request.num_bytes())?); let mut kafka_stuff = None; - if let Some(kafka_producer) = self.kafka_producer.clone() { - let kafka_topic = "proxy_cached_request".to_string(); - let rpc_secret_key_id = authorization - .checks - .rpc_secret_key_id - .map(|x| x.get()) - .unwrap_or_default(); + if matches!(authorization.checks.proxy_mode, ProxyMode::Debug) { + if let Some(kafka_producer) = self.kafka_producer.clone() { + let kafka_topic = "proxy_cached_request".to_string(); - let kafka_key = rmp_serde::to_vec(&rpc_secret_key_id)?; + let rpc_secret_key_id = authorization + .checks + .rpc_secret_key_id + .map(|x| x.get()) + .unwrap_or_default(); - let request_bytes = rmp_serde::to_vec(&request)?; + let kafka_key = rmp_serde::to_vec(&rpc_secret_key_id)?; - let request_hash = Some(keccak256(&request_bytes)); + let request_bytes = rmp_serde::to_vec(&request)?; - let chain_id = self.config.chain_id; + let request_hash = Some(keccak256(&request_bytes)); - // another item is added with the response, so initial_capacity is +1 what is needed here - let kafka_headers = OwnedHeaders::new_with_capacity(4) - .insert(Header { - key: "request_hash", - value: request_hash.as_ref(), - }) - .insert(Header { - key: "head_block_num", - value: head_block_num.map(|x| x.to_string()).as_ref(), - }) - .insert(Header { - key: "chain_id", - value: Some(&chain_id.to_le_bytes()), - }); + let chain_id = self.config.chain_id; - // save the key and headers for when we log the response - kafka_stuff = Some(( - kafka_topic.clone(), - kafka_key.clone(), - kafka_headers.clone(), - )); + // another item is added with the response, so initial_capacity is +1 what is needed here + let kafka_headers = OwnedHeaders::new_with_capacity(4) + .insert(Header { + key: "request_hash", + value: request_hash.as_ref(), + }) + .insert(Header { + key: "head_block_num", + value: head_block_num.map(|x| x.to_string()).as_ref(), + }) + .insert(Header { + key: "chain_id", + value: Some(&chain_id.to_le_bytes()), + }); - let f = async move { - let produce_future = kafka_producer.send( - FutureRecord::to(&kafka_topic) - .key(&kafka_key) - .payload(&request_bytes) - .headers(kafka_headers), - Duration::from_secs(0), - ); + // save the key and headers for when we log the response + kafka_stuff = Some(( + kafka_topic.clone(), + kafka_key.clone(), + kafka_headers.clone(), + )); - if let Err((err, msg)) = produce_future.await { - error!("produce kafka request log: {}. {:#?}", err, msg); - // TODO: re-queue the msg? - } - }; + let f = async move { + let produce_future = kafka_producer.send( + FutureRecord::to(&kafka_topic) + .key(&kafka_key) + .payload(&request_bytes) + .headers(kafka_headers), + Duration::from_secs(0), + ); - tokio::spawn(f); + if let Err((err, _)) = produce_future.await { + error!("produce kafka request log: {}", err); + // TODO: re-queue the msg? + } + }; + + tokio::spawn(f); + } } // save the id so we can attach it to the response @@ -1767,8 +1766,8 @@ impl Web3ProxyApp { Duration::from_secs(0), ); - if let Err((err, msg)) = produce_future.await { - error!("produce kafka request log: {}. {:#?}", err, msg); + if let Err((err, _)) = produce_future.await { + error!("produce kafka response log: {}", err); } };