Compare commits

...

11 Commits
patch ... main

Author SHA1 Message Date
9555e15352
Settings 2024-08-09 19:48:43 +00:00
75ef474167
Listen websocket on same port, allow direct connections and more helpful logging 2024-08-09 19:48:33 +00:00
cody-wang-cb
30560d3d8c
making heavy logs to debug mode (#40) 2024-08-06 10:23:25 -05:00
cody-wang-cb
ba221ab80f
Add healthcheck ordering to simple failover mode (#38)
* add healthcheck to simple failover

* update comment

* update comment

* add another test
2024-08-01 16:17:25 -07:00
cody-wang-cb
0fb094feb4
Move base rate rate limit check inside handleBatchRPC() (#37)
* move base rate rate limit check

* fix comment
2024-07-31 13:25:53 -07:00
Jacob Elias
ec84826c4c
fix: allow multicall channel to handle multiple items to avoid blocking after inital response is returned. Improve metrics reporting (#35) 2024-07-30 13:28:59 -05:00
Jacob Elias
ef202f9167
feat: add docker registry image tagging to release workflow (#34) 2024-07-24 12:55:35 -05:00
Jacob Elias
d4382bfa19
feat: add multi call routing strategy (#26)
* feat: add multicall routing strategy
    * Updated proxyd config to accept a routing strategy parameter 
    * Added multicall routing strategies
    * Refactored backendGroup.Forward to handle multiple types of routing strategies
    * Background.ForwardToBackend now will return results via a channel
2024-07-22 16:44:27 -05:00
Jacob Elias
ec496f559b
fix: circle ci improvements to run release only on tag filtering, and builds based on path filtering (#23) 2024-06-28 16:37:10 -07:00
Jacob Elias
d7aab1df42
feat: add logging improvements to ci pipeline (#19) 2024-06-27 12:44:06 -05:00
Jacob Elias
1babca422e
feat: Improve updateBackend error handling and node banning logic (#15)
- Return early on network errors or invalid responses within updateBackend (e.g., zero block height)
- Prevent premature node banning
- Use existing sliding windows for node banning for unexpected tags or excessive network errors
2024-06-26 15:54:53 -05:00
29 changed files with 2430 additions and 195 deletions

@ -4,7 +4,7 @@ version: 2.1
setup: true setup: true
orbs: orbs:
path-filtering: circleci/path-filtering@0.1.1 path-filtering: circleci/path-filtering@1.0.0
workflows: workflows:
check-updated-files: check-updated-files:
@ -20,6 +20,9 @@ workflows:
proxyd/.* run-build-proxyd true proxyd/.* run-build-proxyd true
.circleci/.* run-all true .circleci/.* run-all true
.github/.* run-all true .github/.* run-all true
filters:
tags:
only: /.*/
base-revision: main base-revision: main
# this is the path of the configuration we should trigger once # this is the path of the configuration we should trigger once

@ -4,7 +4,7 @@ orbs:
go: circleci/go@1.9.0 go: circleci/go@1.9.0
gcp-cli: circleci/gcp-cli@2.4.1 gcp-cli: circleci/gcp-cli@2.4.1
shellcheck: circleci/shellcheck@3.1.2 shellcheck: circleci/shellcheck@3.1.2
path-filtering: circleci/path-filtering@0.1.1 path-filtering: circleci/path-filtering@1.0.0
parameters: parameters:
run-build-op-conductor-mon: run-build-op-conductor-mon:
@ -64,6 +64,47 @@ commands:
jobs: jobs:
log-config-results:
docker:
- image: us-docker.pkg.dev/oplabs-tools-artifacts/images/ci-builder:latest # only used to enable codecov.
environment:
CURRENT_TAG: << pipeline.git.tag >>
steps:
- checkout
- run:
name: Log Configuration Results
command: |
echo "Configuration Results:"
echo "run-build-op-conductor-mon: << pipeline.parameters.run-build-op-conductor-mon >>"
echo "run-build-op-ufm: << pipeline.parameters.run-build-op-ufm >>"
echo "run-build-proxyd: << pipeline.parameters.run-build-proxyd >>"
echo "run-all: << pipeline.parameters.run-all >>"
echo ""
echo "Pipeline Trigger Information:"
echo "pipeline.trigger_source: << pipeline.trigger_source >>"
echo "Is not a scheduled pipeline: $([ "<< pipeline.trigger_source >>" != "scheduled_pipeline" ] && echo "true" || echo "false")"
echo ""
echo "Tag Information:"
echo "Current tag: $CURRENT_TAG"
# Use the same regex patterns as defined in the YAML anchors
if [[ $CURRENT_TAG =~ ^proxyd/v.* ]]; then
echo "proxyd tag regex match: true"
else
echo "proxyd tag regex match: false"
fi
if [[ $CURRENT_TAG =~ ^op-conductor-mon/v.* ]]; then
echo "op-conductor-mon tag regex match: true"
else
echo "op-conductor-mon tag regex match: false"
fi
if [[ $CURRENT_TAG =~ ^op-ufm/v.* ]]; then
echo "op-ufm tag regex match: true"
else
echo "op-ufm tag regex match: false"
fi
docker-build: docker-build:
environment: environment:
DOCKER_BUILDKIT: 1 DOCKER_BUILDKIT: 1
@ -206,7 +247,7 @@ jobs:
name: Tag name: Tag
command: | command: |
gcloud auth configure-docker <<parameters.registry>> gcloud auth configure-docker <<parameters.registry>>
./ops/scripts/ci-docker-tag-op-stack-release.sh <<parameters.registry>>/<<parameters.repo>> $CIRCLE_TAG $CIRCLE_SHA1 ./ops/ci-tag-docker-release/ci-docker-tag-op-stack-release.sh <<parameters.registry>>/<<parameters.repo>> $CIRCLE_TAG $CIRCLE_SHA1
go-lint: go-lint:
parameters: parameters:
@ -343,6 +384,14 @@ jobs:
ghr -t "$GITHUB_TOKEN" -u "$CIRCLE_PROJECT_USERNAME" -r "$CIRCLE_PROJECT_REPONAME" -c "$CIRCLE_SHA1" -delete "$CIRCLE_TAG" <<parameters.package_name>>/<<parameters.artifact_path>> ghr -t "$GITHUB_TOKEN" -u "$CIRCLE_PROJECT_USERNAME" -r "$CIRCLE_PROJECT_REPONAME" -c "$CIRCLE_SHA1" -delete "$CIRCLE_TAG" <<parameters.package_name>>/<<parameters.artifact_path>>
workflows: workflows:
logging:
jobs:
- log-config-results:
filters:
tags:
only: /.*/
branches:
ignore: /.*/
op-conductor-mon: op-conductor-mon:
when: when:
or: [<< pipeline.parameters.run-build-op-conductor-mon >>, << pipeline.parameters.run-all >>] or: [<< pipeline.parameters.run-build-op-conductor-mon >>, << pipeline.parameters.run-all >>]
@ -392,10 +441,13 @@ workflows:
docker_tags: <<pipeline.git.revision>>,<<pipeline.git.branch>> docker_tags: <<pipeline.git.revision>>,<<pipeline.git.branch>>
docker_context: . docker_context: .
release: release:
when:
not:
equal: [ scheduled_pipeline, << pipeline.trigger_source >> ]
jobs: jobs:
- log-config-results:
filters:
tags:
only: /^(proxyd|ufm-[a-z0-9\-]*|op-[a-z0-9\-]*)\/v.*/
branches:
ignore: /.*/
- hold: - hold:
type: approval type: approval
filters: filters:
@ -408,8 +460,6 @@ workflows:
filters: filters:
tags: tags:
only: /^op-ufm\/v.*/ only: /^op-ufm\/v.*/
branches:
ignore: /.*/
docker_name: op-ufm docker_name: op-ufm
docker_tags: <<pipeline.git.revision>> docker_tags: <<pipeline.git.revision>>
docker_context: . docker_context: .
@ -423,17 +473,25 @@ workflows:
docker_name: op-ufm docker_name: op-ufm
docker_tags: <<pipeline.git.revision>> docker_tags: <<pipeline.git.revision>>
context: context:
- oplabs-gcr - oplabs-gcr-release
requires: requires:
- op-ufm-docker-build - op-ufm-docker-build
- docker-tag-op-stack-release:
name: docker-tag-op-ufm-release
filters:
tags:
only: /^op-ufm\/v.*/
branches:
ignore: /.*/
context:
- oplabs-gcr-release
requires:
- op-ufm-docker-publish
- docker-build: - docker-build:
name: proxyd-docker-build name: proxyd-docker-build
filters: filters:
tags: tags:
only: /^proxyd\/v.*/ only: /^proxyd\/v.*/
branches:
ignore: /.*/
docker_name: proxyd docker_name: proxyd
docker_tags: <<pipeline.git.revision>> docker_tags: <<pipeline.git.revision>>
docker_context: . docker_context: .
@ -447,26 +505,52 @@ workflows:
filters: filters:
tags: tags:
only: /^proxyd\/v.*/ only: /^proxyd\/v.*/
branches:
ignore: /.*/
docker_name: proxyd docker_name: proxyd
docker_tags: <<pipeline.git.revision>> docker_tags: <<pipeline.git.revision>>
context: context:
- oplabs-gcr-release - oplabs-gcr-release
requires: requires:
- proxyd-docker-build - proxyd-docker-build
- docker-tag-op-stack-release:
name: docker-tag-op-stack-release
filters:
tags:
only: /^proxyd\/v.*/
branches:
ignore: /.*/
context:
- oplabs-gcr-release
requires:
- proxyd-docker-release
- docker-build: - docker-build:
name: op-conductor-mon-docker-build name: op-conductor-mon-docker-build
filters:
tags:
only: /^op-conductor-mon\/v.*/
docker_file: op-conductor-mon/Dockerfile docker_file: op-conductor-mon/Dockerfile
docker_name: op-conductor-mon docker_name: op-conductor-mon
docker_tags: <<pipeline.git.revision>>,<<pipeline.git.branch>> docker_tags: <<pipeline.git.revision>>,<<pipeline.git.branch>>
docker_context: . docker_context: .
context:
- oplabs-gcr-release
requires:
- hold
- docker-publish: - docker-publish:
name: op-conductor-mon-docker-publish name: op-conductor-mon-docker-publish
docker_name: op-conductor-mon docker_name: op-conductor-mon
docker_tags: <<pipeline.git.revision>>,<<pipeline.git.branch>> docker_tags: <<pipeline.git.revision>>,<<pipeline.git.branch>>
context: context:
- oplabs-gcr - oplabs-gcr-release
requires: requires:
- op-conductor-mon-docker-build - op-conductor-mon-docker-build
- docker-tag-op-stack-release:
name: docker-tag-op-stack-release
filters:
tags:
only: /^op-conductor-mon\/v.*/
branches:
ignore: /.*/
context:
- oplabs-gcr-release
requires:
- op-conductor-mon-docker-publish

@ -0,0 +1,45 @@
#!/usr/bin/env bash
set -euo pipefail
DOCKER_REPO=$1
GIT_TAG=$2
GIT_SHA=$3
IMAGE_NAME=$(echo "$GIT_TAG" | grep -Eow '^(proxyd|ufm-[a-z0-9\-]*|op-[a-z0-9\-]*)' || true)
if [ -z "$IMAGE_NAME" ]; then
echo "image name could not be parsed from git tag '$GIT_TAG'"
exit 1
fi
IMAGE_TAG=$(echo "$GIT_TAG" | grep -Eow 'v.*' || true)
if [ -z "$IMAGE_TAG" ]; then
echo "image tag could not be parsed from git tag '$GIT_TAG'"
exit 1
fi
SOURCE_IMAGE_TAG="$DOCKER_REPO/$IMAGE_NAME:$GIT_SHA"
TARGET_IMAGE_TAG="$DOCKER_REPO/$IMAGE_NAME:$IMAGE_TAG"
TARGET_IMAGE_TAG_LATEST="$DOCKER_REPO/$IMAGE_NAME:latest"
echo "Checking if docker images exist for '$IMAGE_NAME'"
echo ""
tags=$(gcloud container images list-tags "$DOCKER_REPO/$IMAGE_NAME" --limit 1 --format json)
if [ "$tags" = "[]" ]; then
echo "No existing docker images were found for '$IMAGE_NAME'. The code tagged with '$GIT_TAG' may not have an associated dockerfile or docker build job."
echo "If this service has a dockerfile, add a docker-publish job for it in the circleci config."
echo ""
echo "Exiting"
exit 0
fi
echo "Tagging $SOURCE_IMAGE_TAG with '$IMAGE_TAG'"
gcloud container images add-tag -q "$SOURCE_IMAGE_TAG" "$TARGET_IMAGE_TAG"
# Do not tag with latest if the release is a release candidate.
if [[ "$IMAGE_TAG" == *"rc"* ]]; then
echo "Not tagging with 'latest' because the release is a release candidate."
exit 0
fi
echo "Tagging $SOURCE_IMAGE_TAG with 'latest'"
gcloud container images add-tag -q "$SOURCE_IMAGE_TAG" "$TARGET_IMAGE_TAG_LATEST"

@ -6,7 +6,7 @@ ARG GITVERSION=docker
RUN apk add make jq git gcc musl-dev linux-headers RUN apk add make jq git gcc musl-dev linux-headers
COPY ./proxyd /app COPY . /app
WORKDIR /app WORKDIR /app
@ -16,7 +16,7 @@ FROM alpine:3.18
RUN apk add bind-tools jq curl bash git redis RUN apk add bind-tools jq curl bash git redis
COPY ./proxyd/entrypoint.sh /bin/entrypoint.sh COPY ./entrypoint.sh /bin/entrypoint.sh
RUN apk update && \ RUN apk update && \
apk add ca-certificates && \ apk add ca-certificates && \
@ -24,9 +24,11 @@ RUN apk update && \
EXPOSE 8080 EXPOSE 8080
VOLUME /etc/proxyd # VOLUME /etc/proxyd
ADD ./docker.toml /proxyd.toml
COPY --from=builder /app/bin/proxyd /bin/proxyd COPY --from=builder /app/bin/proxyd /bin/proxyd
ENTRYPOINT ["/bin/entrypoint.sh"] ENTRYPOINT ["/bin/entrypoint.sh"]
CMD ["/bin/proxyd", "/etc/proxyd/proxyd.toml"] CMD ["/bin/proxyd", "/proxyd.toml"]

@ -20,6 +20,6 @@ lint:
go vet ./... go vet ./...
.PHONY: test .PHONY: test
test-fallback: test-%:
go test -v ./... -test.run ^TestFallback$ go test -v ./... -test.run ^Test$*$$
.PHONY: test-fallback .PHONY: test-%

@ -160,7 +160,7 @@ type Backend struct {
latencySlidingWindow *sw.AvgSlidingWindow latencySlidingWindow *sw.AvgSlidingWindow
networkRequestsSlidingWindow *sw.AvgSlidingWindow networkRequestsSlidingWindow *sw.AvgSlidingWindow
networkErrorsSlidingWindow *sw.AvgSlidingWindow intermittentErrorsSlidingWindow *sw.AvgSlidingWindow
weight int weight int
} }
@ -279,6 +279,12 @@ func WithConsensusReceiptTarget(receiptsTarget string) BackendOpt {
} }
} }
func WithIntermittentNetworkErrorSlidingWindow(sw *sw.AvgSlidingWindow) BackendOpt {
return func(b *Backend) {
b.intermittentErrorsSlidingWindow = sw
}
}
type indexedReqRes struct { type indexedReqRes struct {
index int index int
req *RPCReq req *RPCReq
@ -330,7 +336,7 @@ func NewBackend(
latencySlidingWindow: sw.NewSlidingWindow(), latencySlidingWindow: sw.NewSlidingWindow(),
networkRequestsSlidingWindow: sw.NewSlidingWindow(), networkRequestsSlidingWindow: sw.NewSlidingWindow(),
networkErrorsSlidingWindow: sw.NewSlidingWindow(), intermittentErrorsSlidingWindow: sw.NewSlidingWindow(),
} }
backend.Override(opts...) backend.Override(opts...)
@ -366,6 +372,14 @@ func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]
), ),
) )
log.Trace(
"forwarding request to backend",
"name", b.Name,
"req_id", GetReqID(ctx),
"attempt_count", i+1,
"max_attempts", b.maxRetries+1,
"method", metricLabelMethod,
)
res, err := b.doForward(ctx, reqs, isBatch) res, err := b.doForward(ctx, reqs, isBatch)
switch err { switch err {
case nil: // do nothing case nil: // do nothing
@ -375,6 +389,7 @@ func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]
"name", b.Name, "name", b.Name,
"req_id", GetReqID(ctx), "req_id", GetReqID(ctx),
"max", b.maxResponseSize, "max", b.maxResponseSize,
"method", metricLabelMethod,
) )
RecordBatchRPCError(ctx, b.Name, reqs, err) RecordBatchRPCError(ctx, b.Name, reqs, err)
case ErrConsensusGetReceiptsCantBeBatched: case ErrConsensusGetReceiptsCantBeBatched:
@ -409,6 +424,9 @@ func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]
"name", b.Name, "name", b.Name,
"req_id", GetReqID(ctx), "req_id", GetReqID(ctx),
"err", err, "err", err,
"method", metricLabelMethod,
"attempt_count", i+1,
"max_retries", b.maxRetries+1,
) )
timer.ObserveDuration() timer.ObserveDuration()
RecordBatchRPCError(ctx, b.Name, reqs, err) RecordBatchRPCError(ctx, b.Name, reqs, err)
@ -534,7 +552,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
httpReq, err := http.NewRequestWithContext(ctx, "POST", b.rpcURL, bytes.NewReader(body)) httpReq, err := http.NewRequestWithContext(ctx, "POST", b.rpcURL, bytes.NewReader(body))
if err != nil { if err != nil {
b.networkErrorsSlidingWindow.Incr() b.intermittentErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, wrapErr(err, "error creating backend request") return nil, wrapErr(err, "error creating backend request")
} }
@ -560,7 +578,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
start := time.Now() start := time.Now()
httpRes, err := b.client.DoLimited(httpReq) httpRes, err := b.client.DoLimited(httpReq)
if err != nil { if err != nil {
b.networkErrorsSlidingWindow.Incr() b.intermittentErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, wrapErr(err, "error in backend request") return nil, wrapErr(err, "error in backend request")
} }
@ -579,7 +597,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
// Alchemy returns a 400 on bad JSONs, so handle that case // Alchemy returns a 400 on bad JSONs, so handle that case
if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 { if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 {
b.networkErrorsSlidingWindow.Incr() b.intermittentErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, fmt.Errorf("response code %d", httpRes.StatusCode) return nil, fmt.Errorf("response code %d", httpRes.StatusCode)
} }
@ -590,7 +608,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
return nil, ErrBackendResponseTooLarge return nil, ErrBackendResponseTooLarge
} }
if err != nil { if err != nil {
b.networkErrorsSlidingWindow.Incr() b.intermittentErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, wrapErr(err, "error reading response body") return nil, wrapErr(err, "error reading response body")
} }
@ -608,18 +626,18 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
if err := json.Unmarshal(resB, &rpcRes); err != nil { if err := json.Unmarshal(resB, &rpcRes); err != nil {
// Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method // Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method
if responseIsNotBatched(resB) { if responseIsNotBatched(resB) {
b.networkErrorsSlidingWindow.Incr() b.intermittentErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, ErrBackendUnexpectedJSONRPC return nil, ErrBackendUnexpectedJSONRPC
} }
b.networkErrorsSlidingWindow.Incr() b.intermittentErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, ErrBackendBadResponse return nil, ErrBackendBadResponse
} }
} }
if len(rpcReqs) != len(rpcRes) { if len(rpcReqs) != len(rpcRes) {
b.networkErrorsSlidingWindow.Incr() b.intermittentErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, ErrBackendUnexpectedJSONRPC return nil, ErrBackendUnexpectedJSONRPC
} }
@ -670,7 +688,7 @@ func (b *Backend) ErrorRate() (errorRate float64) {
// we only really start counting the error rate after a minimum of 10 requests // we only really start counting the error rate after a minimum of 10 requests
// this is to avoid false positives when the backend is just starting up // this is to avoid false positives when the backend is just starting up
if b.networkRequestsSlidingWindow.Sum() >= 10 { if b.networkRequestsSlidingWindow.Sum() >= 10 {
errorRate = b.networkErrorsSlidingWindow.Sum() / b.networkRequestsSlidingWindow.Sum() errorRate = b.intermittentErrorsSlidingWindow.Sum() / b.networkRequestsSlidingWindow.Sum()
} }
return errorRate return errorRate
} }
@ -710,6 +728,11 @@ type BackendGroup struct {
WeightedRouting bool WeightedRouting bool
Consensus *ConsensusPoller Consensus *ConsensusPoller
FallbackBackends map[string]bool FallbackBackends map[string]bool
routingStrategy RoutingStrategy
}
func (bg *BackendGroup) GetRoutingStrategy() RoutingStrategy {
return bg.routingStrategy
} }
func (bg *BackendGroup) Fallbacks() []*Backend { func (bg *BackendGroup) Fallbacks() []*Backend {
@ -744,113 +767,178 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
overriddenResponses := make([]*indexedReqRes, 0) overriddenResponses := make([]*indexedReqRes, 0)
rewrittenReqs := make([]*RPCReq, 0, len(rpcReqs)) rewrittenReqs := make([]*RPCReq, 0, len(rpcReqs))
if bg.Consensus != nil { // When routing_strategy is set to `consensus_aware` the backend group acts as a load balancer
// When `consensus_aware` is set to `true`, the backend group acts as a load balancer
// serving traffic from any backend that agrees in the consensus group // serving traffic from any backend that agrees in the consensus group
// We also rewrite block tags to enforce compliance with consensus // We also rewrite block tags to enforce compliance with consensus
rctx := RewriteContext{ if bg.Consensus != nil {
latest: bg.Consensus.GetLatestBlockNumber(), rpcReqs, overriddenResponses = bg.OverwriteConsensusResponses(rpcReqs, overriddenResponses, rewrittenReqs)
safe: bg.Consensus.GetSafeBlockNumber(),
finalized: bg.Consensus.GetFinalizedBlockNumber(),
maxBlockRange: bg.Consensus.maxBlockRange,
} }
for i, req := range rpcReqs { // When routing_strategy is set to 'multicall' the request will be forward to all backends
res := RPCRes{JSONRPC: JSONRPCVersion, ID: req.ID} // and return the first successful response
result, err := RewriteTags(rctx, req, &res) if bg.GetRoutingStrategy() == MulticallRoutingStrategy && isValidMulticallTx(rpcReqs) && !isBatch {
switch result { backendResp := bg.ExecuteMulticall(ctx, rpcReqs)
case RewriteOverrideError: return backendResp.RPCRes, backendResp.ServedBy, backendResp.error
overriddenResponses = append(overriddenResponses, &indexedReqRes{
index: i,
req: req,
res: &res,
})
if errors.Is(err, ErrRewriteBlockOutOfRange) {
res.Error = ErrBlockOutOfRange
} else if errors.Is(err, ErrRewriteRangeTooLarge) {
res.Error = ErrInvalidParams(
fmt.Sprintf("block range greater than %d max", rctx.maxBlockRange),
)
} else {
res.Error = ErrParseErr
}
case RewriteOverrideResponse:
overriddenResponses = append(overriddenResponses, &indexedReqRes{
index: i,
req: req,
res: &res,
})
case RewriteOverrideRequest, RewriteNone:
rewrittenReqs = append(rewrittenReqs, req)
}
}
rpcReqs = rewrittenReqs
} }
rpcRequestsTotal.Inc() rpcRequestsTotal.Inc()
for _, back := range backends { ch := make(chan BackendGroupRPCResponse)
res := make([]*RPCRes, 0) go func() {
var err error defer close(ch)
backendResp := bg.ForwardRequestToBackendGroup(rpcReqs, backends, ctx, isBatch)
ch <- *backendResp
}()
backendResp := <-ch
servedBy := fmt.Sprintf("%s/%s", bg.Name, back.Name) if backendResp.error != nil {
log.Error("error serving requests",
if len(rpcReqs) > 0 {
res, err = back.Forward(ctx, rpcReqs, isBatch)
if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) ||
errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) ||
errors.Is(err, ErrMethodNotWhitelisted) {
return nil, "", err
}
if errors.Is(err, ErrBackendResponseTooLarge) {
return nil, servedBy, err
}
if errors.Is(err, ErrBackendOffline) {
log.Warn(
"skipping offline backend",
"name", back.Name,
"auth", GetAuthCtx(ctx),
"req_id", GetReqID(ctx),
)
continue
}
if errors.Is(err, ErrBackendOverCapacity) {
log.Warn(
"skipping over-capacity backend",
"name", back.Name,
"auth", GetAuthCtx(ctx),
"req_id", GetReqID(ctx),
)
continue
}
if err != nil {
log.Error(
"error forwarding request to backend",
"name", back.Name,
"req_id", GetReqID(ctx), "req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx), "auth", GetAuthCtx(ctx),
"err", err, "err", backendResp.error,
) )
continue return backendResp.RPCRes, backendResp.ServedBy, backendResp.error
}
} }
// re-apply overridden responses // re-apply overridden responses
for _, ov := range overriddenResponses { log.Trace("successfully served request overriding responses",
if len(res) > 0 { "req_id", GetReqID(ctx),
// insert ov.res at position ov.index "auth", GetAuthCtx(ctx),
res = append(res[:ov.index], append([]*RPCRes{ov.res}, res[ov.index:]...)...) )
res := OverrideResponses(backendResp.RPCRes, overriddenResponses)
return res, backendResp.ServedBy, backendResp.error
}
func isValidMulticallTx(rpcReqs []*RPCReq) bool {
if len(rpcReqs) == 1 {
if rpcReqs[0].Method == "eth_sendRawTransaction" {
return true
}
}
return false
}
// Using special struct since servedBy may not be populated if error occurs
type multicallTuple struct {
response *BackendGroupRPCResponse
backendName string
}
func (bg *BackendGroup) ExecuteMulticall(ctx context.Context, rpcReqs []*RPCReq) *BackendGroupRPCResponse {
// Create ctx without cancel so background tasks process
// after original request returns
bgCtx := context.WithoutCancel(ctx)
log.Info("executing multicall routing strategy",
"req_id", GetReqID(bgCtx),
"auth", GetAuthCtx(bgCtx),
)
var wg sync.WaitGroup
ch := make(chan *multicallTuple, len(bg.Backends))
for _, backend := range bg.Backends {
wg.Add(1)
go bg.MulticallRequest(backend, rpcReqs, &wg, bgCtx, ch)
}
go func() {
wg.Wait()
log.Debug("closing multicall channel",
"req_id", GetReqID(bgCtx),
"auth", GetAuthCtx(bgCtx),
)
close(ch)
}()
return bg.ProcessMulticallResponses(ch, bgCtx)
}
func (bg *BackendGroup) MulticallRequest(backend *Backend, rpcReqs []*RPCReq, wg *sync.WaitGroup, ctx context.Context, ch chan *multicallTuple) {
defer wg.Done()
log.Debug("forwarding multicall request to backend",
"req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx),
"backend", backend.Name,
)
RecordBackendGroupMulticallRequest(bg, backend.Name)
backendResp := bg.ForwardRequestToBackendGroup(rpcReqs, []*Backend{backend}, ctx, false)
multicallResp := &multicallTuple{
response: backendResp,
backendName: backend.Name,
}
log.Debug("placing multicall response into channel",
"req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx),
"backend", backend.Name,
)
ch <- multicallResp
log.Trace("placed multicall response into channel",
"req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx),
"backend", backend.Name,
)
if backendResp.error != nil {
log.Error("received multicall error response from backend",
"req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx),
"backend", backend.Name,
"error", backendResp.error.Error(),
)
RecordBackendGroupMulticallCompletion(bg, backend.Name, backendResp.error.Error())
} else { } else {
res = append(res, ov.res) RecordBackendGroupMulticallCompletion(bg, backend.Name, "nil")
}
}
func (bg *BackendGroup) ProcessMulticallResponses(ch chan *multicallTuple, ctx context.Context) *BackendGroupRPCResponse {
var finalResp *BackendGroupRPCResponse
i := 0
for {
multicallResp, ok := <-ch
if !ok {
log.Trace("multicall response channel closed",
"req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx),
"response_count", i,
)
if i > 0 {
return finalResp
}
return &BackendGroupRPCResponse{
RPCRes: nil,
ServedBy: "",
error: errors.New("no multicall response received"),
} }
} }
return res, servedBy, nil i++
} resp := multicallResp.response
backendName := multicallResp.backendName
RecordUnserviceableRequest(ctx, RPCRequestSourceHTTP) if resp.error != nil {
return nil, "", ErrNoBackends log.Error("received error response from multicall channel",
"req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx),
"err", resp.error,
"backend", backendName,
)
finalResp = resp
continue
}
log.Info("received successful response from multicall channel",
"req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx),
"served_by", resp.ServedBy,
"backend", backendName,
)
return resp
}
} }
func (bg *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) { func (bg *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) {
@ -901,13 +989,21 @@ func weightedShuffle(backends []*Backend) {
func (bg *BackendGroup) orderedBackendsForRequest() []*Backend { func (bg *BackendGroup) orderedBackendsForRequest() []*Backend {
if bg.Consensus != nil { if bg.Consensus != nil {
return bg.loadBalancedConsensusGroup() return bg.loadBalancedConsensusGroup()
} else if bg.WeightedRouting {
result := make([]*Backend, len(bg.Backends))
copy(result, bg.Backends)
weightedShuffle(result)
return result
} else { } else {
return bg.Backends healthy := make([]*Backend, 0, len(bg.Backends))
unhealthy := make([]*Backend, 0, len(bg.Backends))
for _, be := range bg.Backends {
if be.IsHealthy() {
healthy = append(healthy, be)
} else {
unhealthy = append(unhealthy, be)
}
}
if bg.WeightedRouting {
weightedShuffle(healthy)
weightedShuffle(unhealthy)
}
return append(healthy, unhealthy...)
} }
} }
@ -1232,7 +1328,7 @@ func RecordBatchRPCError(ctx context.Context, backendName string, reqs []*RPCReq
} }
func MaybeRecordErrorsInRPCRes(ctx context.Context, backendName string, reqs []*RPCReq, resBatch []*RPCRes) { func MaybeRecordErrorsInRPCRes(ctx context.Context, backendName string, reqs []*RPCReq, resBatch []*RPCRes) {
log.Info("forwarded RPC request", log.Debug("forwarded RPC request",
"backend", backendName, "backend", backendName,
"auth", GetAuthCtx(ctx), "auth", GetAuthCtx(ctx),
"req_id", GetReqID(ctx), "req_id", GetReqID(ctx),
@ -1266,7 +1362,148 @@ func RecordBatchRPCForward(ctx context.Context, backendName string, reqs []*RPCR
} }
} }
func (b *Backend) ClearSlidingWindows() {
b.intermittentErrorsSlidingWindow.Clear()
b.networkRequestsSlidingWindow.Clear()
}
func stripXFF(xff string) string { func stripXFF(xff string) string {
ipList := strings.Split(xff, ",") ipList := strings.Split(xff, ",")
return strings.TrimSpace(ipList[0]) return strings.TrimSpace(ipList[0])
} }
type BackendGroupRPCResponse struct {
RPCRes []*RPCRes
ServedBy string
error error
}
func (bg *BackendGroup) ForwardRequestToBackendGroup(
rpcReqs []*RPCReq,
backends []*Backend,
ctx context.Context,
isBatch bool,
) *BackendGroupRPCResponse {
for _, back := range backends {
res := make([]*RPCRes, 0)
var err error
servedBy := fmt.Sprintf("%s/%s", bg.Name, back.Name)
if len(rpcReqs) > 0 {
res, err = back.Forward(ctx, rpcReqs, isBatch)
if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) ||
errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) ||
errors.Is(err, ErrMethodNotWhitelisted) {
return &BackendGroupRPCResponse{
RPCRes: nil,
ServedBy: "",
error: err,
}
}
if errors.Is(err, ErrBackendResponseTooLarge) {
return &BackendGroupRPCResponse{
RPCRes: nil,
ServedBy: "",
error: err,
}
}
if errors.Is(err, ErrBackendOffline) {
log.Warn(
"skipping offline backend",
"name", back.Name,
"auth", GetAuthCtx(ctx),
"req_id", GetReqID(ctx),
)
continue
}
if errors.Is(err, ErrBackendOverCapacity) {
log.Warn(
"skipping over-capacity backend",
"name", back.Name,
"auth", GetAuthCtx(ctx),
"req_id", GetReqID(ctx),
)
continue
}
if err != nil {
log.Error(
"error forwarding request to backend",
"name", back.Name,
"req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx),
"err", err,
)
continue
}
}
return &BackendGroupRPCResponse{
RPCRes: res,
ServedBy: servedBy,
error: nil,
}
}
RecordUnserviceableRequest(ctx, RPCRequestSourceHTTP)
return &BackendGroupRPCResponse{
RPCRes: nil,
ServedBy: "",
error: ErrNoBackends,
}
}
func OverrideResponses(res []*RPCRes, overriddenResponses []*indexedReqRes) []*RPCRes {
for _, ov := range overriddenResponses {
if len(res) > 0 {
// insert ov.res at position ov.index
res = append(res[:ov.index], append([]*RPCRes{ov.res}, res[ov.index:]...)...)
} else {
res = append(res, ov.res)
}
}
return res
}
func (bg *BackendGroup) OverwriteConsensusResponses(rpcReqs []*RPCReq, overriddenResponses []*indexedReqRes, rewrittenReqs []*RPCReq) ([]*RPCReq, []*indexedReqRes) {
rctx := RewriteContext{
latest: bg.Consensus.GetLatestBlockNumber(),
safe: bg.Consensus.GetSafeBlockNumber(),
finalized: bg.Consensus.GetFinalizedBlockNumber(),
maxBlockRange: bg.Consensus.maxBlockRange,
}
for i, req := range rpcReqs {
res := RPCRes{JSONRPC: JSONRPCVersion, ID: req.ID}
result, err := RewriteTags(rctx, req, &res)
switch result {
case RewriteOverrideError:
overriddenResponses = append(overriddenResponses, &indexedReqRes{
index: i,
req: req,
res: &res,
})
if errors.Is(err, ErrRewriteBlockOutOfRange) {
res.Error = ErrBlockOutOfRange
} else if errors.Is(err, ErrRewriteRangeTooLarge) {
res.Error = ErrInvalidParams(
fmt.Sprintf("block range greater than %d max", rctx.maxBlockRange),
)
} else {
res.Error = ErrParseErr
}
case RewriteOverrideResponse:
overriddenResponses = append(overriddenResponses, &indexedReqRes{
index: i,
req: req,
res: &res,
})
case RewriteOverrideRequest, RewriteNone:
rewrittenReqs = append(rewrittenReqs, req)
}
}
return rewrittenReqs, overriddenResponses
}

@ -6,17 +6,23 @@ import (
"os" "os"
"strings" "strings"
"time" "time"
"github.com/ethereum/go-ethereum/log"
) )
type ServerConfig struct { type ServerConfig struct {
RPCHost string `toml:"rpc_host"` RPCHost string `toml:"rpc_host"`
RPCPort int `toml:"rpc_port"` RPCPort int `toml:"rpc_port"`
EnableWS bool `toml:"enable_ws"`
WSHost string `toml:"ws_host"` WSHost string `toml:"ws_host"`
WSPort int `toml:"ws_port"` WSPort int `toml:"ws_port"`
MaxBodySizeBytes int64 `toml:"max_body_size_bytes"` MaxBodySizeBytes int64 `toml:"max_body_size_bytes"`
MaxConcurrentRPCs int64 `toml:"max_concurrent_rpcs"` MaxConcurrentRPCs int64 `toml:"max_concurrent_rpcs"`
LogLevel string `toml:"log_level"` LogLevel string `toml:"log_level"`
// Allow direct client connection without x_forwarded_for header for local tests
AllowDirect bool `toml:"allow_direct"`
// TimeoutSeconds specifies the maximum time spent serving an HTTP request. Note that isn't used for websocket connections // TimeoutSeconds specifies the maximum time spent serving an HTTP request. Note that isn't used for websocket connections
TimeoutSeconds int `toml:"timeout_seconds"` TimeoutSeconds int `toml:"timeout_seconds"`
@ -107,11 +113,54 @@ type BackendConfig struct {
type BackendsConfig map[string]*BackendConfig type BackendsConfig map[string]*BackendConfig
type RoutingStrategy string
func (b *BackendGroupConfig) ValidateRoutingStrategy(bgName string) bool {
// If Consensus Aware is Set and Routing RoutingStrategy is populated fail
if b.ConsensusAware && b.RoutingStrategy != "" {
log.Warn("consensus_aware is now deprecated, please use routing_strategy = consensus_aware")
log.Crit("Exiting consensus_aware and routing strategy are mutually exclusive, they cannot both be defined")
}
// If Consensus Aware is Set set RoutingStrategy to consensus_aware
if b.ConsensusAware {
b.RoutingStrategy = ConsensusAwareRoutingStrategy
log.Info("consensus_aware is now deprecated, please use routing_strategy = consenus_aware in the future")
}
switch b.RoutingStrategy {
case ConsensusAwareRoutingStrategy:
return true
case MulticallRoutingStrategy:
return true
case FallbackRoutingStrategy:
return true
case "":
log.Info("Empty routing strategy provided for backend_group, using fallback strategy ", "name", bgName)
b.RoutingStrategy = FallbackRoutingStrategy
return true
default:
return false
}
}
const (
ConsensusAwareRoutingStrategy RoutingStrategy = "consensus_aware"
MulticallRoutingStrategy RoutingStrategy = "multicall"
FallbackRoutingStrategy RoutingStrategy = "fallback"
)
type BackendGroupConfig struct { type BackendGroupConfig struct {
Backends []string `toml:"backends"` Backends []string `toml:"backends"`
WeightedRouting bool `toml:"weighted_routing"` WeightedRouting bool `toml:"weighted_routing"`
RoutingStrategy RoutingStrategy `toml:"routing_strategy"`
/*
Deprecated: Use routing_strategy config to create a consensus_aware proxyd instance
*/
ConsensusAware bool `toml:"consensus_aware"` ConsensusAware bool `toml:"consensus_aware"`
ConsensusAsyncHandler string `toml:"consensus_handler"` ConsensusAsyncHandler string `toml:"consensus_handler"`
ConsensusPollerInterval TOMLDuration `toml:"consensus_poller_interval"` ConsensusPollerInterval TOMLDuration `toml:"consensus_poller_interval"`

@ -9,7 +9,6 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
@ -63,6 +62,24 @@ func (bs *backendState) IsBanned() bool {
return time.Now().Before(bs.bannedUntil) return time.Now().Before(bs.bannedUntil)
} }
func (bs *backendState) GetLatestBlock() (hexutil.Uint64, string) {
bs.backendStateMux.Lock()
defer bs.backendStateMux.Unlock()
return bs.latestBlockNumber, bs.latestBlockHash
}
func (bs *backendState) GetSafeBlockNumber() hexutil.Uint64 {
bs.backendStateMux.Lock()
defer bs.backendStateMux.Unlock()
return bs.safeBlockNumber
}
func (bs *backendState) GetFinalizedBlockNumber() hexutil.Uint64 {
bs.backendStateMux.Lock()
defer bs.backendStateMux.Unlock()
return bs.finalizedBlockNumber
}
// GetConsensusGroup returns the backend members that are agreeing in a consensus // GetConsensusGroup returns the backend members that are agreeing in a consensus
func (cp *ConsensusPoller) GetConsensusGroup() []*Backend { func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
defer cp.consensusGroupMux.Unlock() defer cp.consensusGroupMux.Unlock()
@ -287,7 +304,7 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
// UpdateBackend refreshes the consensus state of a single backend // UpdateBackend refreshes the consensus state of a single backend
func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
bs := cp.getBackendState(be) bs := cp.GetBackendState(be)
RecordConsensusBackendBanned(be, bs.IsBanned()) RecordConsensusBackendBanned(be, bs.IsBanned())
if bs.IsBanned() { if bs.IsBanned() {
@ -306,6 +323,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
RecordConsensusBackendInSync(be, err == nil && inSync) RecordConsensusBackendInSync(be, err == nil && inSync)
if err != nil { if err != nil {
log.Warn("error updating backend sync state", "name", be.Name, "err", err) log.Warn("error updating backend sync state", "name", be.Name, "err", err)
return
} }
var peerCount uint64 var peerCount uint64
@ -313,23 +331,49 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
peerCount, err = cp.getPeerCount(ctx, be) peerCount, err = cp.getPeerCount(ctx, be)
if err != nil { if err != nil {
log.Warn("error updating backend peer count", "name", be.Name, "err", err) log.Warn("error updating backend peer count", "name", be.Name, "err", err)
return
}
if peerCount == 0 {
log.Warn("peer count responded with 200 and 0 peers", "name", be.Name)
be.intermittentErrorsSlidingWindow.Incr()
return
} }
RecordConsensusBackendPeerCount(be, peerCount) RecordConsensusBackendPeerCount(be, peerCount)
} }
latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest") latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest")
if err != nil { if err != nil {
log.Warn("error updating backend - latest block", "name", be.Name, "err", err) log.Warn("error updating backend - latest block will not be updated", "name", be.Name, "err", err)
return
}
if latestBlockNumber == 0 {
log.Warn("error backend responded a 200 with blockheight 0 for latest block", "name", be.Name)
be.intermittentErrorsSlidingWindow.Incr()
return
} }
safeBlockNumber, _, err := cp.fetchBlock(ctx, be, "safe") safeBlockNumber, _, err := cp.fetchBlock(ctx, be, "safe")
if err != nil { if err != nil {
log.Warn("error updating backend - safe block", "name", be.Name, "err", err) log.Warn("error updating backend - safe block will not be updated", "name", be.Name, "err", err)
return
}
if safeBlockNumber == 0 {
log.Warn("error backend responded a 200 with blockheight 0 for safe block", "name", be.Name)
be.intermittentErrorsSlidingWindow.Incr()
return
} }
finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized") finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized")
if err != nil { if err != nil {
log.Warn("error updating backend - finalized block", "name", be.Name, "err", err) log.Warn("error updating backend - finalized block will not be updated", "name", be.Name, "err", err)
return
}
if finalizedBlockNumber == 0 {
log.Warn("error backend responded a 200 with blockheight 0 for finalized block", "name", be.Name)
be.intermittentErrorsSlidingWindow.Incr()
return
} }
RecordConsensusBackendUpdateDelay(be, bs.lastUpdate) RecordConsensusBackendUpdateDelay(be, bs.lastUpdate)
@ -523,6 +567,14 @@ func (cp *ConsensusPoller) IsBanned(be *Backend) bool {
return bs.IsBanned() return bs.IsBanned()
} }
// IsBanned checks if a specific backend is banned
func (cp *ConsensusPoller) BannedUntil(be *Backend) time.Time {
bs := cp.backendState[be]
defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock()
return bs.bannedUntil
}
// Ban bans a specific backend // Ban bans a specific backend
func (cp *ConsensusPoller) Ban(be *Backend) { func (cp *ConsensusPoller) Ban(be *Backend) {
if be.forcedCandidate { if be.forcedCandidate {
@ -618,8 +670,8 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo
return res, nil return res, nil
} }
// getBackendState creates a copy of backend state so that the caller can use it without locking // GetBackendState creates a copy of backend state so that the caller can use it without locking
func (cp *ConsensusPoller) getBackendState(be *Backend) *backendState { func (cp *ConsensusPoller) GetBackendState(be *Backend) *backendState {
bs := cp.backendState[be] bs := cp.backendState[be]
defer bs.backendStateMux.Unlock() defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock() bs.backendStateMux.Lock()
@ -691,7 +743,7 @@ func (cp *ConsensusPoller) FilterCandidates(backends []*Backend) map[*Backend]*b
for _, be := range backends { for _, be := range backends {
bs := cp.getBackendState(be) bs := cp.GetBackendState(be)
if be.forcedCandidate { if be.forcedCandidate {
candidates[be] = bs candidates[be] = bs
continue continue

20
proxyd/docker-compose.yml Normal file

@ -0,0 +1,20 @@
services:
redis:
container_name: redis
image: redis:6.2-alpine
restart: always
#networks:
# - tornado_net
command: ["redis-server"]
proxyd:
container_name: proxyd
image: proxyd
restart: always
#networks:
# - tornado_net
environment:
- REDIS_URL=redis://redis:6379
- RPC_URL=
- WS_URL=
ports:
- '127.0.0.1:8544:8544'

171
proxyd/docker.toml Normal file

@ -0,0 +1,171 @@
ws_method_whitelist = [
"eth_subscribe",
"eth_unsubscribe",
"eth_blobBaseFee",
"eth_blockNumber",
"eth_call",
"eth_chainId",
"eth_estimateGas",
"eth_feeHistory",
"eth_gasPrice",
"eth_getAccount",
"eth_getBalance",
"eth_getBlockByHash",
"eth_getBlockByNumber",
"eth_getBlockReceipts",
"eth_getBlockTransactionCountByHash",
"eth_getBlockTransactionCountByNumber",
"eth_getCode",
"eth_getFilterChanges",
"eth_getFilterLogs",
"eth_getLogs",
"eth_getProof",
"eth_getStorageAt",
"eth_getTransactionByBlockHashAndIndex",
"eth_getTransactionByBlockNumberAndIndex",
"eth_getTransactionByHash",
"eth_getTransactionCount",
"eth_getTransactionReceipt",
"eth_getUncleCountByBlockHash",
"eth_getUncleCountByBlockNumber",
"eth_maxPriorityFeePerGas",
"eth_newBlockFilter",
"eth_newFilter",
"eth_newPendingTransactionFilter",
"eth_syncing",
"eth_uninstallFilter",
"eth_sendRawTransaction",
"net_version",
"web3_clientVersion",
"web3_sha3",
# tracers for archive nodes
"trace_block",
"trace_call",
"trace_callMany",
"trace_filter",
"trace_rawTransaction",
"trace_replayBlockTransactions",
"trace_replayTransaction",
"trace_transaction",
"debug_getBadBlocks",
"debug_storageRangeAt",
"debug_getTrieFlushInterval",
"debug_traceBlock",
"debug_traceBlockByHash",
"debug_traceBlockByNumber",
"debug_traceCall",
"debug_traceTransaction",
]
ws_backend_group = "main"
[rpc_method_mappings]
eth_blobBaseFee = "main"
eth_blockNumber = "main"
eth_call = "main"
eth_chainId = "main"
eth_estimateGas = "main"
eth_feeHistory = "main"
eth_gasPrice = "main"
eth_getAccount = "main"
eth_getBalance = "main"
eth_getBlockByHash = "main"
eth_getBlockByNumber = "main"
eth_getBlockReceipts = "main"
eth_getBlockTransactionCountByHash = "main"
eth_getBlockTransactionCountByNumber = "main"
eth_getCode = "main"
eth_getFilterChanges = "main"
eth_getFilterLogs = "main"
eth_getLogs = "main"
eth_getProof = "main"
eth_getStorageAt = "main"
eth_getTransactionByBlockHashAndIndex = "main"
eth_getTransactionByBlockNumberAndIndex = "main"
eth_getTransactionByHash = "main"
eth_getTransactionCount = "main"
eth_getTransactionReceipt = "main"
eth_getUncleCountByBlockHash = "main"
eth_getUncleCountByBlockNumber = "main"
eth_maxPriorityFeePerGas = "main"
eth_newBlockFilter = "main"
eth_newFilter = "main"
eth_newPendingTransactionFilter = "main"
eth_syncing = "main"
eth_uninstallFilter = "main"
eth_sendRawTransaction = "main"
net_version = "main"
web3_clientVersion = "main"
web3_sha3 = "main"
trace_block = "main"
trace_call = "main"
trace_callMany = "main"
trace_filter = "main"
trace_rawTransaction = "main"
trace_replayBlockTransactions = "main"
trace_replayTransaction = "main"
trace_transaction = "main"
debug_getBadBlocks = "main"
debug_storageRangeAt = "main"
debug_getTrieFlushInterval = "main"
debug_traceBlock = "main"
debug_traceBlockByHash = "main"
debug_traceBlockByNumber = "main"
debug_traceCall = "main"
debug_traceTransaction = "main"
[server]
rpc_host = "0.0.0.0"
rpc_port = 8544
enable_ws = true
max_body_size_bytes = 10485760
max_concurrent_rpcs = 1000
log_level = "info"
allow_direct = true
[cache]
enabled = true
ttl = "14s"
[redis]
url = "$REDIS_URL"
[metrics]
enabled = true
host = "0.0.0.0"
port = 9761
[rate_limit]
use_redis = true
base_rate = 2000
base_interval = "60s"
[backend]
response_timeout_seconds = 120
max_response_size_bytes = 5242880
max_retries = 3
out_of_service_seconds = 600
max_latency_threshold = "30s"
max_degraded_latency_threshold = "10s"
max_error_rate_threshold = 0.3
[backends]
[backends.main]
rpc_url = "$RPC_URL"
ws_url = "$WS_URL"
max_rps = 100
max_ws_conns = 100
consensus_skip_peer_count = true
consensus_receipts_target = "eth_getBlockReceipts"
[backend_groups]
[backend_groups.main]
backends = ["main"]
consensus_aware = false
# consensus_aware = true
consensus_ban_period = "1m"
consensus_max_update_threshold = "20s"
consensus_max_block_lag = 16
# Maximum block range (for eth_getLogs method), no default
# consensus_max_block_range = 20000
# Minimum peer count, default 3
# consensus_min_peer_count = 4

@ -0,0 +1,353 @@
package integration_tests
import (
"context"
"net/http"
"os"
"path"
"testing"
"time"
"github.com/ethereum-optimism/optimism/proxyd"
sw "github.com/ethereum-optimism/optimism/proxyd/pkg/avg-sliding-window"
ms "github.com/ethereum-optimism/optimism/proxyd/tools/mockserver/handler"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/stretchr/testify/require"
)
type bhZeroNodeContext struct {
backend *proxyd.Backend // this is the actual backend impl in proxyd
mockBackend *MockBackend // this is the fake backend that we can use to mock responses
handler *ms.MockedHandler // this is where we control the state of mocked responses
intermittentNetErrorWindow *sw.AvgSlidingWindow
clock *sw.AdjustableClock // this is where we control backend time
}
// ts is a convenient method that must parse a time.Time from a string in format `"2006-01-02 15:04:05"`
func ts(s string) time.Time {
t, err := time.Parse(time.DateTime, s)
if err != nil {
panic(err)
}
return t
}
func setupBlockHeightZero(t *testing.T) (map[string]*bhZeroNodeContext, *proxyd.BackendGroup, *ProxydHTTPClient, func()) {
// setup mock servers
node1 := NewMockBackend(nil)
node2 := NewMockBackend(nil)
dir, err := os.Getwd()
require.NoError(t, err)
responses := path.Join(dir, "testdata/block_height_zero_and_network_errors_responses.yaml")
h1 := ms.MockedHandler{
Overrides: []*ms.MethodTemplate{},
Autoload: true,
AutoloadFile: responses,
}
h2 := ms.MockedHandler{
Overrides: []*ms.MethodTemplate{},
Autoload: true,
AutoloadFile: responses,
}
require.NoError(t, os.Setenv("NODE1_URL", node1.URL()))
require.NoError(t, os.Setenv("NODE2_URL", node2.URL()))
node1.SetHandler(http.HandlerFunc(h1.Handler))
node2.SetHandler(http.HandlerFunc(h2.Handler))
// setup proxyd
config := ReadConfig("block_height_zero_and_network_errors")
svr, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
// expose the proxyd client
client := NewProxydClient("http://127.0.0.1:8545")
// expose the backend group
bg := svr.BackendGroups["node"]
require.NotNil(t, bg)
require.NotNil(t, bg.Consensus, "Expected Consenus Poller to be intialized")
require.Equal(t, 2, len(bg.Backends))
// convenient mapping to access the nodes
nodes := map[string]*bhZeroNodeContext{
"node1": {
mockBackend: node1,
backend: bg.Backends[0],
handler: &h1,
},
"node2": {
mockBackend: node2,
backend: bg.Backends[1],
handler: &h2,
},
}
return nodes, bg, client, shutdown
}
func TestBlockHeightZero(t *testing.T) {
nodes, bg, _, shutdown := setupBlockHeightZero(t)
defer nodes["node1"].mockBackend.Close()
defer nodes["node2"].mockBackend.Close()
defer shutdown()
ctx := context.Background()
addTimeToBackend := func(node string, ts time.Duration) {
mockBackend, ok := nodes[node]
require.True(t, ok, "Fatal error bad node key for override clock")
mockBackend.clock.Set(mockBackend.clock.Now().Add(ts))
}
// poll for updated consensus
update := func() {
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
addTimeToBackend("node1", 3*time.Second)
addTimeToBackend("node2", 3*time.Second)
}
// convenient methods to manipulate state and mock responses
reset := func() {
for _, node := range nodes {
node.handler.ResetOverrides()
node.mockBackend.Reset()
node.backend.ClearSlidingWindows()
}
bg.Consensus.ClearListeners()
bg.Consensus.Reset()
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend))
now := ts("2023-04-21 15:00:00")
clock := sw.NewAdjustableClock(now)
b1 := nodes["node1"]
b2 := nodes["node2"]
b1.intermittentNetErrorWindow = sw.NewSlidingWindow(
sw.WithWindowLength(5*time.Minute),
sw.WithBucketSize(time.Second),
sw.WithClock(clock))
b2.intermittentNetErrorWindow = sw.NewSlidingWindow(
sw.WithWindowLength(5*time.Minute),
sw.WithBucketSize(time.Second),
sw.WithClock(clock))
b1.clock = clock
b2.clock = clock
b1.backend.Override(proxyd.WithIntermittentNetworkErrorSlidingWindow(b1.intermittentNetErrorWindow))
b2.backend.Override(proxyd.WithIntermittentNetworkErrorSlidingWindow(b2.intermittentNetErrorWindow))
nodes["node1"] = b1
nodes["node2"] = b2
require.Zero(t, nodes["node1"].intermittentNetErrorWindow.Count())
require.Zero(t, nodes["node2"].intermittentNetErrorWindow.Count())
}
override := func(node string, method string, block string, response string, responseCode int) {
if _, ok := nodes[node]; !ok {
t.Fatalf("node %s does not exist in the nodes map", node)
}
nodes[node].handler.AddOverride(&ms.MethodTemplate{
Method: method,
Block: block,
Response: response,
ResponseCode: responseCode,
})
}
overrideBlock := func(node string, blockRequest string, blockResponse string, responseCode int) {
override(node,
"eth_getBlockByNumber",
blockRequest,
buildResponse(map[string]string{
"number": blockResponse,
"hash": "hash_" + blockResponse,
}),
responseCode,
)
}
overridePeerCount := func(node string, count int, responseCode int) {
override(node, "net_peerCount", "", buildResponse(hexutil.Uint64(count).String()), responseCode)
}
type blockHeights struct {
latestBlockNumber hexutil.Uint64
latestBlockHash string
safeBlockNumber hexutil.Uint64
finalizedBlockNumber hexutil.Uint64
}
getBlockHeights := func(node string) blockHeights {
bs := bg.Consensus.GetBackendState(nodes[node].backend)
lB, lHash := bs.GetLatestBlock()
sB := bs.GetSafeBlockNumber()
fB := bs.GetFinalizedBlockNumber()
return blockHeights{
latestBlockNumber: lB,
latestBlockHash: lHash,
safeBlockNumber: sB,
finalizedBlockNumber: fB,
}
}
for _, blockState := range []string{"latest", "finalized", "safe"} {
t.Run("Test that the backend will not be banned if "+blockState+" responds 429", func(t *testing.T) {
reset()
update()
bh1 := getBlockHeights("node1")
overrideBlock("node1", blockState, "0x101", 429)
update()
bh2 := getBlockHeights("node1")
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend))
require.Equal(t, bh1.latestBlockNumber.String(), bh2.latestBlockNumber.String())
require.Equal(t, bh1.safeBlockNumber.String(), bh2.safeBlockNumber.String())
require.Equal(t, bh1.finalizedBlockNumber.String(), bh2.finalizedBlockNumber.String())
require.Equal(t, nodes["node1"].intermittentNetErrorWindow.Count(), uint(1))
require.Equal(t, nodes["node2"].intermittentNetErrorWindow.Count(), uint(0))
})
// Write a test which will check the sliding window increments each time by one
t.Run("Test that the backend will not be banned and single increment of window if "+blockState+" responds 500", func(t *testing.T) {
reset()
update()
bh1 := getBlockHeights("node1")
overrideBlock("node1", blockState, "0x101", 500)
update()
bh2 := getBlockHeights("node1")
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend))
require.Equal(t, bh1.latestBlockNumber.String(), bh2.latestBlockNumber.String())
require.Equal(t, bh1.safeBlockNumber.String(), bh2.safeBlockNumber.String())
require.Equal(t, bh1.finalizedBlockNumber.String(), bh2.finalizedBlockNumber.String())
require.Equal(t, nodes["node1"].intermittentNetErrorWindow.Count(), uint(1))
require.Equal(t, nodes["node2"].intermittentNetErrorWindow.Count(), uint(0))
})
t.Run("Test that the backend will not be banned and single increment of window if "+blockState+" responds 0 and 200", func(t *testing.T) {
reset()
update()
bh1 := getBlockHeights("node2")
overrideBlock("node2", blockState, "0x0", 200)
update()
bh2 := getBlockHeights("node2")
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend))
require.Equal(t, bh1.latestBlockNumber.String(), bh2.latestBlockNumber.String())
require.Equal(t, bh1.safeBlockNumber.String(), bh2.safeBlockNumber.String())
require.Equal(t, bh1.finalizedBlockNumber.String(), bh2.finalizedBlockNumber.String())
require.Equal(t, nodes["node1"].intermittentNetErrorWindow.Count(), uint(0))
require.Equal(t, nodes["node2"].intermittentNetErrorWindow.Count(), uint(1))
})
}
t.Run("Test that the backend will not be banned and single increment of window if latest responds 200 with block height zero", func(t *testing.T) {
reset()
update()
overrideBlock("node1", "latest", "0x0", 200)
bh1 := getBlockHeights("node1")
update()
bh2 := getBlockHeights("node1")
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend))
require.Equal(t, bh1.latestBlockNumber.String(), bh2.latestBlockNumber.String())
require.Equal(t, bh1.safeBlockNumber.String(), bh2.safeBlockNumber.String())
require.Equal(t, bh1.finalizedBlockNumber.String(), bh2.finalizedBlockNumber.String())
require.Equal(t, nodes["node1"].intermittentNetErrorWindow.Count(), uint(1))
require.Equal(t, nodes["node2"].intermittentNetErrorWindow.Count(), uint(0))
})
t.Run("Test that the backend will not be banned if latest responds 5xx for peer count", func(t *testing.T) {
reset()
update()
overridePeerCount("node2", 59, 500)
bh1 := getBlockHeights("node2")
update()
bh2 := getBlockHeights("node2")
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend))
require.Equal(t, bh1.latestBlockNumber.String(), bh2.latestBlockNumber.String())
require.Equal(t, bh1.safeBlockNumber.String(), bh2.safeBlockNumber.String())
require.Equal(t, bh1.finalizedBlockNumber.String(), bh2.finalizedBlockNumber.String())
require.Equal(t, nodes["node1"].intermittentNetErrorWindow.Count(), uint(0))
require.Equal(t, nodes["node2"].intermittentNetErrorWindow.Count(), uint(1))
})
t.Run("Test that the backend will not be banned if latest responds 4xx for peer count", func(t *testing.T) {
reset()
update()
overridePeerCount("node1", 59, 429)
bh1 := getBlockHeights("node1")
update()
bh2 := getBlockHeights("node1")
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend))
require.Equal(t, bh1.latestBlockNumber.String(), bh2.latestBlockNumber.String())
require.Equal(t, bh1.safeBlockNumber.String(), bh2.safeBlockNumber.String())
require.Equal(t, bh1.finalizedBlockNumber.String(), bh2.finalizedBlockNumber.String())
require.Equal(t, nodes["node1"].intermittentNetErrorWindow.Count(), uint(1))
require.Equal(t, nodes["node2"].intermittentNetErrorWindow.Count(), uint(0))
})
t.Run("Test that the backend will not be banned if latest responds 200 and 0 for peer count", func(t *testing.T) {
reset()
update()
bh1 := getBlockHeights("node1")
overridePeerCount("node1", 0, 200)
update()
bh2 := getBlockHeights("node1")
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend))
require.Equal(t, bh1.latestBlockNumber.String(), bh2.latestBlockNumber.String())
require.Equal(t, bh1.safeBlockNumber.String(), bh2.safeBlockNumber.String())
require.Equal(t, bh1.finalizedBlockNumber.String(), bh2.finalizedBlockNumber.String())
require.Equal(t, nodes["node1"].intermittentNetErrorWindow.Count(), uint(1))
require.Equal(t, nodes["node2"].intermittentNetErrorWindow.Count(), uint(0))
})
t.Run("Test that if it breaches the network error threshold the node will be banned", func(t *testing.T) {
reset()
update()
overrideBlock("node1", "latest", "0x0", 500)
overrideBlock("node1", "safe", "0x0", 429)
overrideBlock("node1", "finalized", "0x0", 403)
overridePeerCount("node1", 0, 500)
for i := 1; i < 7; i++ {
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend), "Execpted node 1 to be not banned on iteration ", i)
require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend), "Execpted node 2 to be not banned on iteration ", i)
update()
// On the 5th update (i=6), node 1 will be banned due to error rate and not increment window
if i < 6 {
require.Equal(t, nodes["node1"].intermittentNetErrorWindow.Count(), uint(i))
}
require.Equal(t, nodes["node2"].intermittentNetErrorWindow.Count(), uint(0))
}
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
require.False(t, bg.Consensus.IsBanned(nodes["node2"].backend))
})
}

@ -102,9 +102,11 @@ func TestConsensus(t *testing.T) {
for _, node := range nodes { for _, node := range nodes {
node.handler.ResetOverrides() node.handler.ResetOverrides()
node.mockBackend.Reset() node.mockBackend.Reset()
node.backend.ClearSlidingWindows()
} }
bg.Consensus.ClearListeners() bg.Consensus.ClearListeners()
bg.Consensus.Reset() bg.Consensus.Reset()
} }
override := func(node string, method string, block string, response string) { override := func(node string, method string, block string, response string) {
@ -311,7 +313,11 @@ func TestConsensus(t *testing.T) {
consensusGroup := bg.Consensus.GetConsensusGroup() consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, nodes["node1"].backend) require.NotContains(t, consensusGroup, nodes["node1"].backend)
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend)) require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend),
fmt.Sprintf("Expected Node to be banned. \n\tCurrent Time: %s \n\tBanned Until: %s",
time.Now().Format("01-02-2006 15:04:05"),
bg.Consensus.BannedUntil(nodes["node1"].backend).Format("01-02-2006 15:04:05")),
)
require.Equal(t, 0, len(consensusGroup)) require.Equal(t, 0, len(consensusGroup))
}) })

@ -87,6 +87,52 @@ func TestFailover(t *testing.T) {
}) })
} }
// the bad endpoint has had 10 requests with 8 error (3xx/4xx/5xx) responses, it should be marked as unhealthy and deprioritized
t.Run("bad endpoint marked as unhealthy", func(t *testing.T) {
res, statusCode, err := client.SendRPC("eth_chainId", nil)
require.NoError(t, err)
require.Equal(t, 200, statusCode)
RequireEqualJSON(t, []byte(goodResponse), res)
require.Equal(t, 1, len(goodBackend.Requests()))
require.Equal(t, 0, len(badBackend.Requests())) // bad backend is not called anymore
goodBackend.Reset()
badBackend.Reset()
})
t.Run("bad endpoint is still called if good endpoint also went bad", func(t *testing.T) {
goodBackend.SetHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(2 * time.Second)
_, _ = w.Write([]byte("[{}]"))
}))
badBackend.SetHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(503)
_, _ = w.Write([]byte(unexpectedResponse))
}))
res, statusCode, _ := client.SendRPC("eth_chainId", nil)
require.Equal(t, 503, statusCode)
RequireEqualJSON(t, []byte(noBackendsResponse), res) // return no backend available since both failed
require.Equal(t, 1, len(goodBackend.Requests()))
require.Equal(t, 1, len(badBackend.Requests())) // bad backend is still called
goodBackend.Reset()
badBackend.Reset()
})
}
func TestFailoverMore(t *testing.T) {
goodBackend := NewMockBackend(BatchedResponseHandler(200, goodResponse))
defer goodBackend.Close()
badBackend := NewMockBackend(nil)
defer badBackend.Close()
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", goodBackend.URL()))
require.NoError(t, os.Setenv("BAD_BACKEND_RPC_URL", badBackend.URL()))
config := ReadConfig("failover")
client := NewProxydClient("http://127.0.0.1:8545")
_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
t.Run("backend times out and falls back to another", func(t *testing.T) { t.Run("backend times out and falls back to another", func(t *testing.T) {
badBackend.SetHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { badBackend.SetHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)

@ -182,7 +182,7 @@ func TestFallback(t *testing.T) {
Fallback will be returned subsequent update Fallback will be returned subsequent update
*/ */
triggerFirstNormalFailure := func() { triggerFirstNormalFailure := func() {
overridePeerCount("normal", 0) overridePeerCount("normal", 1)
update() update()
require.True(t, containsNode(bg.Consensus.GetConsensusGroup(), "fallback")) require.True(t, containsNode(bg.Consensus.GetConsensusGroup(), "fallback"))
require.False(t, containsNode(bg.Consensus.GetConsensusGroup(), "normal")) require.False(t, containsNode(bg.Consensus.GetConsensusGroup(), "normal"))

@ -0,0 +1,500 @@
package integration_tests
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"os"
"path"
"sync"
"testing"
"time"
"github.com/ethereum-optimism/optimism/proxyd"
ms "github.com/ethereum-optimism/optimism/proxyd/tools/mockserver/handler"
"github.com/stretchr/testify/require"
)
const nonceErrorResponse = `{"jsonrpc": "2.0","error": {"code": -32000, "message": "nonce too low"},"id": 1}`
const txAccepted = `{"jsonrpc": "2.0","result": "0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef","id": 1}`
func setupMulticall(t *testing.T) (map[string]nodeContext, *proxyd.BackendGroup, *ProxydHTTPClient, func(), *proxyd.Server, []*ms.MockedHandler) {
// setup mock servers
node1 := NewMockBackend(nil)
node2 := NewMockBackend(nil)
node3 := NewMockBackend(nil)
dir, err := os.Getwd()
require.NoError(t, err)
responses := path.Join(dir, "testdata/multicall_responses.yml")
emptyResponses := path.Join(dir, "testdata/empty_responses.yml")
h1 := ms.MockedHandler{
Overrides: []*ms.MethodTemplate{},
Autoload: true,
AutoloadFile: responses,
}
h2 := ms.MockedHandler{
Overrides: []*ms.MethodTemplate{},
Autoload: true,
AutoloadFile: "",
}
h3 := ms.MockedHandler{
Overrides: []*ms.MethodTemplate{},
Autoload: true,
AutoloadFile: emptyResponses,
}
require.NoError(t, os.Setenv("NODE1_URL", node1.URL()))
require.NoError(t, os.Setenv("NODE2_URL", node2.URL()))
require.NoError(t, os.Setenv("NODE3_URL", node3.URL()))
node1.SetHandler(http.HandlerFunc(h1.Handler))
node2.SetHandler(SingleResponseHandler(200, txAccepted))
node3.SetHandler(SingleResponseHandler(429, dummyRes))
// setup proxyd
config := ReadConfig("multicall")
fmt.Printf("[SetupMulticall] Using Timeout of %d \n", config.Server.TimeoutSeconds)
svr, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
// expose the proxyd client
client := NewProxydClient("http://127.0.0.1:8545")
// expose the backend group
bg := svr.BackendGroups["node"]
require.NotNil(t, bg)
require.Nil(t, bg.Consensus, "Expeceted consensus not to be initialized")
require.Equal(t, 3, len(bg.Backends))
require.Equal(t, bg.GetRoutingStrategy(), proxyd.MulticallRoutingStrategy)
// convenient mapping to access the nodes by name
nodes := map[string]nodeContext{
"node1": {
mockBackend: node1,
backend: bg.Backends[0],
handler: &h1,
},
"node2": {
mockBackend: node2,
backend: bg.Backends[1],
handler: &h2,
},
"node3": {
mockBackend: node3,
backend: bg.Backends[2],
handler: &h3,
},
}
handlers := []*ms.MockedHandler{&h1, &h2, &h3}
nodes["node1"].mockBackend.SetHandler(SingleResponseHandler(200, txAccepted))
nodes["node2"].mockBackend.SetHandler(http.HandlerFunc(handlers[1].Handler))
//Node 3 has no handler empty handler never respondes should always context timeout
nodes["node3"].mockBackend.SetHandler(http.HandlerFunc(handlers[2].Handler))
require.Equal(t, 0, nodeBackendRequestCount(nodes, "node1"))
require.Equal(t, 0, nodeBackendRequestCount(nodes, "node2"))
require.Equal(t, 0, nodeBackendRequestCount(nodes, "node3"))
return nodes, bg, client, shutdown, svr, handlers
}
func setServerBackend(s *proxyd.Server, nm map[string]nodeContext) *proxyd.Server {
bg := s.BackendGroups
bg["node"].Backends = []*proxyd.Backend{
nm["node1"].backend,
nm["node2"].backend,
nm["node3"].backend,
}
s.BackendGroups = bg
return s
}
func nodeBackendRequestCount(nodes map[string]nodeContext, node string) int {
return len(nodes[node].mockBackend.requests)
}
func TestMulticall(t *testing.T) {
t.Run("Multicall will request all backends", func(t *testing.T) {
nodes, _, _, shutdown, svr, _ := setupMulticall(t)
defer nodes["node1"].mockBackend.Close()
defer nodes["node2"].mockBackend.Close()
defer nodes["node3"].mockBackend.Close()
defer shutdown()
nodes["node1"].mockBackend.SetHandler(SingleResponseHandler(401, dummyRes))
nodes["node2"].mockBackend.SetHandler(SingleResponseHandler(500, dummyRes))
nodes["node3"].mockBackend.SetHandler(SingleResponseHandler(200, txAccepted))
body := makeSendRawTransaction(txHex1)
req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body))
req.Header.Set("X-Forwarded-For", "203.0.113.1")
rr := httptest.NewRecorder()
svr.HandleRPC(rr, req)
resp := rr.Result()
defer resp.Body.Close()
require.NotNil(t, resp.Body)
require.Equal(t, 200, resp.StatusCode)
require.Equal(t, resp.Header["X-Served-By"], []string{"node/node3"})
rpcRes := &proxyd.RPCRes{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes))
require.False(t, rpcRes.IsError())
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node1"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node2"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node3"))
})
t.Run("When all of the backends return non 200, multicall should return 503", func(t *testing.T) {
nodes, _, _, shutdown, svr, _ := setupMulticall(t)
defer nodes["node1"].mockBackend.Close()
defer nodes["node2"].mockBackend.Close()
defer nodes["node3"].mockBackend.Close()
defer shutdown()
nodes["node1"].mockBackend.SetHandler(SingleResponseHandler(429, dummyRes))
nodes["node2"].mockBackend.SetHandler(SingleResponseHandler(429, dummyRes))
localSvr := setServerBackend(svr, nodes)
body := makeSendRawTransaction(txHex1)
req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body))
req.Header.Set("X-Forwarded-For", "203.0.113.1")
rr := httptest.NewRecorder()
localSvr.HandleRPC(rr, req)
resp := rr.Result()
defer resp.Body.Close()
require.NotNil(t, resp.Body)
require.Equal(t, 503, resp.StatusCode)
rpcRes := &proxyd.RPCRes{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes))
require.True(t, rpcRes.IsError())
require.Equal(t, proxyd.ErrNoBackends.Code, rpcRes.Error.Code)
require.Equal(t, proxyd.ErrNoBackends.Message, rpcRes.Error.Message)
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node1"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node2"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node3"))
})
t.Run("It should return the first 200 response", func(t *testing.T) {
nodes, _, _, shutdown, svr, _ := setupMulticall(t)
defer nodes["node1"].mockBackend.Close()
defer nodes["node2"].mockBackend.Close()
defer nodes["node3"].mockBackend.Close()
defer shutdown()
nodes["node1"].mockBackend.SetHandler(SingleResponseHandlerWithSleep(200, txAccepted, 3*time.Second))
nodes["node2"].mockBackend.SetHandler(SingleResponseHandler(200, txAccepted))
localSvr := setServerBackend(svr, nodes)
body := makeSendRawTransaction(txHex1)
req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body))
req.Header.Set("X-Forwarded-For", "203.0.113.1")
rr := httptest.NewRecorder()
localSvr.HandleRPC(rr, req)
resp := rr.Result()
defer resp.Body.Close()
require.NotNil(t, resp.Body)
require.Equal(t, 200, resp.StatusCode)
rpcRes := &proxyd.RPCRes{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes))
require.False(t, rpcRes.IsError())
require.Equal(t, "2.0", rpcRes.JSONRPC)
require.Equal(t, resp.Header["X-Served-By"], []string{"node/node2"})
require.False(t, rpcRes.IsError())
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node1"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node2"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node3"))
})
t.Run("Ensure application level error is returned to caller if its first", func(t *testing.T) {
nodes, _, _, shutdown, svr, _ := setupMulticall(t)
defer nodes["node1"].mockBackend.Close()
defer nodes["node2"].mockBackend.Close()
defer nodes["node3"].mockBackend.Close()
defer shutdown()
shutdownChan1 := make(chan struct{})
shutdownChan2 := make(chan struct{})
nodes["node1"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, nonceErrorResponse, shutdownChan1, 4*time.Second))
nodes["node2"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, nonceErrorResponse, shutdownChan2, 1*time.Second))
nodes["node3"].mockBackend.SetHandler(SingleResponseHandler(403, dummyRes))
var wg sync.WaitGroup
wg.Add(1)
go func() {
shutdownChan2 <- struct{}{}
shutdownChan1 <- struct{}{}
wg.Done()
}()
localSvr := setServerBackend(svr, nodes)
body := makeSendRawTransaction(txHex1)
req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body))
req.Header.Set("X-Forwarded-For", "203.0.113.1")
rr := httptest.NewRecorder()
localSvr.HandleRPC(rr, req)
resp := rr.Result()
defer resp.Body.Close()
require.NotNil(t, resp.Body)
require.Equal(t, 200, resp.StatusCode)
rpcRes := &proxyd.RPCRes{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes))
require.Equal(t, "2.0", rpcRes.JSONRPC)
require.Equal(t, resp.Header["X-Served-By"], []string{"node/node2"})
require.True(t, rpcRes.IsError())
wg.Wait()
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node1"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node2"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node3"))
})
t.Run("It should ignore network errors and return a 200 from a slower request", func(t *testing.T) {
nodes, _, _, shutdown, svr, _ := setupMulticall(t)
defer nodes["node1"].mockBackend.Close()
defer nodes["node2"].mockBackend.Close()
defer nodes["node3"].mockBackend.Close()
defer shutdown()
// We should ignore node2 first response cause 429, and return node 1 because 200
nodes["node1"].mockBackend.SetHandler(SingleResponseHandlerWithSleep(200, txAccepted, 3*time.Second))
nodes["node2"].mockBackend.SetHandler(SingleResponseHandler(429, txAccepted))
localSvr := setServerBackend(svr, nodes)
body := makeSendRawTransaction(txHex1)
req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body))
req.Header.Set("X-Forwarded-For", "203.0.113.1")
rr := httptest.NewRecorder()
localSvr.HandleRPC(rr, req)
resp := rr.Result()
defer resp.Body.Close()
require.NotNil(t, resp.Body)
require.Equal(t, 200, resp.StatusCode)
rpcRes := &proxyd.RPCRes{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes))
require.False(t, rpcRes.IsError())
require.Equal(t, "2.0", rpcRes.JSONRPC)
require.Equal(t, resp.Header["X-Served-By"], []string{"node/node1"})
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node1"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node2"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node3"))
})
t.Run("When one of the backends times out", func(t *testing.T) {
nodes, _, _, shutdown, svr, _ := setupMulticall(t)
defer nodes["node1"].mockBackend.Close()
defer nodes["node2"].mockBackend.Close()
defer nodes["node3"].mockBackend.Close()
defer shutdown()
shutdownChan := make(chan struct{})
nodes["node1"].mockBackend.SetHandler(SingleResponseHandler(200, dummyRes))
nodes["node2"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, dummyRes, shutdownChan, 7*time.Second))
localSvr := setServerBackend(svr, nodes)
body := makeSendRawTransaction(txHex1)
req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body))
req.Header.Set("X-Forwarded-For", "203.0.113.1")
rr := httptest.NewRecorder()
localSvr.HandleRPC(rr, req)
resp := rr.Result()
shutdownChan <- struct{}{}
defer resp.Body.Close()
require.NotNil(t, resp.Body)
servedBy := "node/node1"
require.Equal(t, 200, resp.StatusCode, "expected 200 response from node1")
require.Equal(t, resp.Header["X-Served-By"], []string{servedBy}, "Error incorrect node served the request")
rpcRes := &proxyd.RPCRes{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes))
require.False(t, rpcRes.IsError())
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node1"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node2"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node3"))
})
t.Run("allBackends times out", func(t *testing.T) {
nodes, _, _, shutdown, svr, _ := setupMulticall(t)
defer nodes["node1"].mockBackend.Close()
defer nodes["node2"].mockBackend.Close()
defer nodes["node3"].mockBackend.Close()
defer shutdown()
shutdownChan1 := make(chan struct{})
shutdownChan2 := make(chan struct{})
nodes["node1"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, dummyRes, shutdownChan1, 7*time.Second))
nodes["node2"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, dummyRes, shutdownChan2, 7*time.Second))
localSvr := setServerBackend(svr, nodes)
body := makeSendRawTransaction(txHex1)
req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body))
req.Header.Set("X-Forwarded-For", "203.0.113.1")
rr := httptest.NewRecorder()
var wg sync.WaitGroup
wg.Add(1)
go func() {
shutdownChan1 <- struct{}{}
shutdownChan2 <- struct{}{}
wg.Done()
}()
fmt.Println("sending request")
localSvr.HandleRPC(rr, req)
resp := rr.Result()
defer resp.Body.Close()
require.NotNil(t, resp.Body)
require.Equal(t, 503, resp.StatusCode, "expected no response")
rpcRes := &proxyd.RPCRes{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes))
require.True(t, rpcRes.IsError())
require.Equal(t, rpcRes.Error.Code, proxyd.ErrNoBackends.Code)
// Wait for test response to complete before checking query count
wg.Wait()
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node1"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node2"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node3"))
})
t.Run("Test with many multi-calls in without resetting", func(t *testing.T) {
nodes, _, _, shutdown, svr, _ := setupMulticall(t)
defer nodes["node1"].mockBackend.Close()
defer nodes["node2"].mockBackend.Close()
defer nodes["node3"].mockBackend.Close()
defer shutdown()
for i := 1; i < 4; i++ {
shutdownChan1 := make(chan struct{})
shutdownChan2 := make(chan struct{})
shutdownChan3 := make(chan struct{})
switch {
case i == 1:
nodes["node1"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, txAccepted, shutdownChan1, 1*time.Second))
nodes["node2"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(429, dummyRes, shutdownChan2, 1*time.Second))
nodes["node3"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(503, dummyRes, shutdownChan3, 1*time.Second))
case i == 2:
nodes["node1"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(404, dummyRes, shutdownChan1, 1*time.Second))
nodes["node2"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, nonceErrorResponse, shutdownChan2, 1*time.Second))
nodes["node3"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(405, dummyRes, shutdownChan3, 1*time.Second))
case i == 3:
// Return the quickest response
nodes["node1"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(404, dummyRes, shutdownChan1, 1*time.Second))
nodes["node2"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(500, dummyRes, shutdownChan2, 1*time.Second))
nodes["node3"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, nonceErrorResponse, shutdownChan3, 1*time.Second))
}
localSvr := setServerBackend(svr, nodes)
body := makeSendRawTransaction(txHex1)
req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body))
req.Header.Set("X-Forwarded-For", "203.0.113.1")
rr := httptest.NewRecorder()
var wg sync.WaitGroup
wg.Add(1)
go func() {
shutdownChan1 <- struct{}{}
shutdownChan2 <- struct{}{}
shutdownChan3 <- struct{}{}
wg.Done()
}()
localSvr.HandleRPC(rr, req)
resp := rr.Result()
defer resp.Body.Close()
require.NotNil(t, resp.Body)
rpcRes := &proxyd.RPCRes{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes))
switch {
case i == 1:
servedBy := "node/node1"
require.NotNil(t, rpcRes.Result)
require.Equal(t, 200, resp.StatusCode, "expected 200 response from node1")
require.Equal(t, resp.Header["X-Served-By"], []string{servedBy}, "Error incorrect node served the request")
require.False(t, rpcRes.IsError())
case i == 2:
servedBy := "node/node2"
require.Nil(t, rpcRes.Result)
require.Equal(t, 200, resp.StatusCode, "expected 200 response from node2")
require.Equal(t, resp.Header["X-Served-By"], []string{servedBy}, "Error incorrect node served the request")
require.True(t, rpcRes.IsError())
case i == 3:
servedBy := "node/node3"
require.Nil(t, rpcRes.Result)
require.Equal(t, 200, resp.StatusCode, "expected 200 response from node3")
require.Equal(t, resp.Header["X-Served-By"], []string{servedBy}, "Error incorrect node served the request")
require.True(t, rpcRes.IsError())
}
// Wait for test response to complete before checking query count
wg.Wait()
require.Equal(t, i, nodeBackendRequestCount(nodes, "node1"))
require.Equal(t, i, nodeBackendRequestCount(nodes, "node2"))
require.Equal(t, i, nodeBackendRequestCount(nodes, "node3"))
}
})
}
func SingleResponseHandlerWithSleep(code int, response string, duration time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
fmt.Println("sleeping")
time.Sleep(duration)
fmt.Println("Shutting down Single Response Handler")
w.WriteHeader(code)
_, _ = w.Write([]byte(response))
}
}
func SingleResponseHandlerWithSleepShutdown(code int, response string, shutdownServer chan struct{}, duration time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
fmt.Println("sleeping")
time.Sleep(duration)
<-shutdownServer
fmt.Println("Shutting down Single Response Handler")
w.WriteHeader(code)
_, _ = w.Write([]byte(response))
}
}

@ -16,7 +16,6 @@ type resWithCode struct {
res []byte res []byte
} }
const frontendOverLimitResponse = `{"error":{"code":-32016,"message":"over rate limit with special message"},"id":null,"jsonrpc":"2.0"}`
const frontendOverLimitResponseWithID = `{"error":{"code":-32016,"message":"over rate limit with special message"},"id":999,"jsonrpc":"2.0"}` const frontendOverLimitResponseWithID = `{"error":{"code":-32016,"message":"over rate limit with special message"},"id":999,"jsonrpc":"2.0"}`
var ethChainID = "eth_chainId" var ethChainID = "eth_chainId"
@ -37,7 +36,7 @@ func TestFrontendMaxRPSLimit(t *testing.T) {
limitedRes, codes := spamReqs(t, client, ethChainID, 429, 3) limitedRes, codes := spamReqs(t, client, ethChainID, 429, 3)
require.Equal(t, 1, codes[429]) require.Equal(t, 1, codes[429])
require.Equal(t, 2, codes[200]) require.Equal(t, 2, codes[200])
RequireEqualJSON(t, []byte(frontendOverLimitResponse), limitedRes) RequireEqualJSON(t, []byte(frontendOverLimitResponseWithID), limitedRes)
}) })
t.Run("exempt user agent over limit", func(t *testing.T) { t.Run("exempt user agent over limit", func(t *testing.T) {
@ -106,6 +105,25 @@ func TestFrontendMaxRPSLimit(t *testing.T) {
time.Sleep(time.Second) time.Sleep(time.Second)
t.Run("Batch RPC with some requests rate limited", func(t *testing.T) {
client := NewProxydClient("http://127.0.0.1:8545")
req := NewRPCReq("123", "eth_chainId", nil)
out, code, err := client.SendBatchRPC(req, req, req)
require.NoError(t, err)
var res []proxyd.RPCRes
require.NoError(t, json.Unmarshal(out, &res))
expCode := proxyd.ErrOverRateLimit.Code
require.Equal(t, 200, code)
require.Equal(t, 3, len(res))
require.Nil(t, res[0].Error)
require.Nil(t, res[1].Error)
// base rate = 2, so the third request should be rate limited
require.Equal(t, expCode, res[2].Error.Code)
})
time.Sleep(time.Second)
t.Run("RPC override in batch exempt", func(t *testing.T) { t.Run("RPC override in batch exempt", func(t *testing.T) {
h := make(http.Header) h := make(http.Header)
h.Set("User-Agent", "exempt_agent") h.Set("User-Agent", "exempt_agent")

@ -0,0 +1,35 @@
[server]
rpc_port = 8545
[backend]
response_timeout_seconds = 1
max_degraded_latency_threshold = "30ms"
max_error_rate_threshold = 0.25
[backends]
[backends.node1]
rpc_url = "$NODE1_URL"
[backends.node2]
rpc_url = "$NODE2_URL"
[backend_groups]
[backend_groups.node]
backends = ["node1", "node2"]
routing_strategy = "consensus_aware"
consensus_handler = "noop" # allow more control over the consensus poller for tests
## Consensus Ban Need to set very large, becaue consensus poller uses system clock, not adjustable clock
## if a certain test case takes longer than 15m it may break
consensus_ban_period = "15m"
consensus_max_update_threshold = "2m"
consensus_max_block_lag = 8
consensus_min_peer_count = 4
[rpc_method_mappings]
eth_call = "node"
eth_chainId = "node"
eth_blockNumber = "node"
eth_getBlockByNumber = "node"
consensus_getReceipts = "node"

@ -0,0 +1,234 @@
- method: eth_chainId
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": "hello",
}
- method: net_peerCount
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": "0x10"
}
- method: eth_syncing
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": false
}
- method: eth_getBlockByNumber
block: latest
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x101",
"number": "0x101"
}
}
- method: eth_getBlockByNumber
block: 0x101
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x101",
"number": "0x101"
}
}
- method: eth_getBlockByNumber
block: 0x102
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x102",
"number": "0x102"
}
}
- method: eth_getBlockByNumber
block: 0x103
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x103",
"number": "0x103"
}
}
- method: eth_getBlockByNumber
block: 0x10a
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x10a",
"number": "0x10a"
}
}
- method: eth_getBlockByNumber
block: 0x132
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x132",
"number": "0x132"
}
}
- method: eth_getBlockByNumber
block: 0x133
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x133",
"number": "0x133"
}
}
- method: eth_getBlockByNumber
block: 0x134
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x134",
"number": "0x134"
}
}
- method: eth_getBlockByNumber
block: 0x200
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x200",
"number": "0x200"
}
}
- method: eth_getBlockByNumber
block: 0x91
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x91",
"number": "0x91"
}
}
- method: eth_getBlockByNumber
block: safe
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0xe1",
"number": "0xe1"
}
}
- method: eth_getBlockByNumber
block: 0xe1
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0xe1",
"number": "0xe1"
}
}
- method: eth_getBlockByNumber
block: finalized
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0xc1",
"number": "0xc1"
}
}
- method: eth_getBlockByNumber
block: 0xc1
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0xc1",
"number": "0xc1"
}
}
- method: eth_getBlockByNumber
block: 0xd1
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0xd1",
"number": "0xd1"
}
}
- method: debug_getRawReceipts
block: 0x55
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"_": "debug_getRawReceipts"
}
}
- method: debug_getRawReceipts
block: 0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"_": "debug_getRawReceipts"
}
}
- method: debug_getRawReceipts
block: 0x101
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"_": "debug_getRawReceipts"
}
}
- method: eth_getTransactionReceipt
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"_": "eth_getTransactionReceipt"
}
}
- method: alchemy_getTransactionReceipts
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"_": "alchemy_getTransactionReceipts"
}
}

@ -15,7 +15,7 @@ rpc_url = "$NODE2_URL"
[backend_groups] [backend_groups]
[backend_groups.node] [backend_groups.node]
backends = ["node1", "node2"] backends = ["node1", "node2"]
consensus_aware = true routing_strategy = "consensus_aware"
consensus_handler = "noop" # allow more control over the consensus poller for tests consensus_handler = "noop" # allow more control over the consensus poller for tests
consensus_ban_period = "1m" consensus_ban_period = "1m"
consensus_max_update_threshold = "2m" consensus_max_update_threshold = "2m"

@ -0,0 +1,7 @@
- method: bad_method
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": "nope",
}

@ -15,7 +15,7 @@ rpc_url = "$NODE2_URL"
[backend_groups] [backend_groups]
[backend_groups.node] [backend_groups.node]
backends = ["normal", "fallback"] backends = ["normal", "fallback"]
consensus_aware = true routing_strategy = "consensus_aware"
consensus_handler = "noop" # allow more control over the consensus poller for tests consensus_handler = "noop" # allow more control over the consensus poller for tests
consensus_ban_period = "1m" consensus_ban_period = "1m"
consensus_max_update_threshold = "2m" consensus_max_update_threshold = "2m"

@ -0,0 +1,27 @@
[server]
rpc_port = 8545
enable_served_by_header = true
timeout_seconds = 7
[backend]
response_timeout_seconds = 4
max_degraded_latency_threshold = "30ms"
[backends]
[backends.node1]
rpc_url = "$NODE1_URL"
[backends.node2]
rpc_url = "$NODE2_URL"
[backends.node3]
rpc_url = "$NODE3_URL"
[backend_groups]
[backend_groups.node]
backends = ["node1", "node2", "node3"]
routing_strategy = "multicall"
[rpc_method_mappings]
eth_call = "node"
eth_sendRawTransaction = "node"

@ -0,0 +1,234 @@
- method: eth_chainId
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": "hello",
}
- method: net_peerCount
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": "0x10"
}
- method: eth_syncing
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": false
}
- method: eth_getBlockByNumber
block: latest
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x101",
"number": "0x101"
}
}
- method: eth_getBlockByNumber
block: 0x101
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x101",
"number": "0x101"
}
}
- method: eth_getBlockByNumber
block: 0x102
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x102",
"number": "0x102"
}
}
- method: eth_getBlockByNumber
block: 0x103
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x103",
"number": "0x103"
}
}
- method: eth_getBlockByNumber
block: 0x10a
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x10a",
"number": "0x10a"
}
}
- method: eth_getBlockByNumber
block: 0x132
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x132",
"number": "0x132"
}
}
- method: eth_getBlockByNumber
block: 0x133
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x133",
"number": "0x133"
}
}
- method: eth_getBlockByNumber
block: 0x134
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x134",
"number": "0x134"
}
}
- method: eth_getBlockByNumber
block: 0x200
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x200",
"number": "0x200"
}
}
- method: eth_getBlockByNumber
block: 0x91
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0x91",
"number": "0x91"
}
}
- method: eth_getBlockByNumber
block: safe
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0xe1",
"number": "0xe1"
}
}
- method: eth_getBlockByNumber
block: 0xe1
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0xe1",
"number": "0xe1"
}
}
- method: eth_getBlockByNumber
block: finalized
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0xc1",
"number": "0xc1"
}
}
- method: eth_getBlockByNumber
block: 0xc1
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0xc1",
"number": "0xc1"
}
}
- method: eth_getBlockByNumber
block: 0xd1
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash_0xd1",
"number": "0xd1"
}
}
- method: debug_getRawReceipts
block: 0x55
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"_": "debug_getRawReceipts"
}
}
- method: debug_getRawReceipts
block: 0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"_": "debug_getRawReceipts"
}
}
- method: debug_getRawReceipts
block: 0x101
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"_": "debug_getRawReceipts"
}
}
- method: eth_getTransactionReceipt
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"_": "eth_getTransactionReceipt"
}
}
- method: alchemy_getTransactionReceipts
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"_": "alchemy_getTransactionReceipts"
}
}

@ -428,6 +428,25 @@ var (
"backend_name", "backend_name",
"fallback", "fallback",
}) })
backendGroupMulticallCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "backend_group_multicall_request_counter",
Help: "Record the amount of multicall requests",
}, []string{
"backend_group",
"backend_name",
})
backendGroupMulticallCompletionCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "backend_group_multicall_completion_counter",
Help: "Record the amount of completed multicall requests",
}, []string{
"backend_group",
"backend_name",
"error",
})
) )
func RecordRedisError(source string) { func RecordRedisError(source string) {
@ -593,6 +612,14 @@ func RecordBackendGroupFallbacks(bg *BackendGroup, name string, fallback bool) {
backendGroupFallbackBackend.WithLabelValues(bg.Name, name, strconv.FormatBool(fallback)).Set(boolToFloat64(fallback)) backendGroupFallbackBackend.WithLabelValues(bg.Name, name, strconv.FormatBool(fallback)).Set(boolToFloat64(fallback))
} }
func RecordBackendGroupMulticallRequest(bg *BackendGroup, backendName string) {
backendGroupMulticallCounter.WithLabelValues(bg.Name, backendName).Inc()
}
func RecordBackendGroupMulticallCompletion(bg *BackendGroup, backendName string, error string) {
backendGroupMulticallCompletionCounter.WithLabelValues(bg.Name, backendName, error).Inc()
}
func boolToFloat64(b bool) float64 { func boolToFloat64(b bool) float64 {
if b { if b {
return 1 return 1

@ -186,3 +186,11 @@ func (sw *AvgSlidingWindow) Count() uint {
sw.advance() sw.advance()
return sw.qty return sw.qty
} }
// advance evicts old data points
func (sw *AvgSlidingWindow) Clear() {
defer sw.mux.Unlock()
sw.mux.Lock()
sw.qty = 0
sw.sum = 0.0
}

@ -228,6 +228,7 @@ func Start(config *Config) (*Server, func(), error) {
Backends: backends, Backends: backends,
WeightedRouting: bg.WeightedRouting, WeightedRouting: bg.WeightedRouting,
FallbackBackends: fallbackBackends, FallbackBackends: fallbackBackends,
routingStrategy: bg.RoutingStrategy,
} }
} }
@ -327,7 +328,7 @@ func Start(config *Config) (*Server, func(), error) {
if config.Server.RPCPort != 0 { if config.Server.RPCPort != 0 {
go func() { go func() {
if err := srv.RPCListenAndServe(config.Server.RPCHost, config.Server.RPCPort); err != nil { if err := srv.RPCListenAndServe(config.Server); err != nil {
if errors.Is(err, http.ErrServerClosed) { if errors.Is(err, http.ErrServerClosed) {
log.Info("RPC server shut down") log.Info("RPC server shut down")
return return
@ -347,13 +348,20 @@ func Start(config *Config) (*Server, func(), error) {
log.Crit("error starting WS server", "err", err) log.Crit("error starting WS server", "err", err)
} }
}() }()
} else { } else if !config.Server.EnableWS {
log.Info("WS server not enabled (ws_port is set to 0)") log.Info("WS server not enabled (ws_port is set to 0)")
} }
for bgName, bg := range backendGroups { for bgName, bg := range backendGroups {
bgcfg := config.BackendGroups[bgName] bgcfg := config.BackendGroups[bgName]
if bgcfg.ConsensusAware {
if !bgcfg.ValidateRoutingStrategy(bgName) {
log.Crit("Invalid routing strategy provided. Valid options: fallback, multicall, consensus_aware, \"\"", "name", bgName)
}
log.Info("configuring routing strategy for backend_group", "name", bgName, "routing_strategy", bgcfg.RoutingStrategy)
if bgcfg.RoutingStrategy == ConsensusAwareRoutingStrategy {
log.Info("creating poller for consensus aware backend_group", "name", bgName) log.Info("creating poller for consensus aware backend_group", "name", bgName)
copts := make([]ConsensusOpt, 0) copts := make([]ConsensusOpt, 0)

@ -64,7 +64,9 @@ func RewriteRequest(rctx RewriteContext, req *RPCReq, res *RPCRes) (RewriteResul
switch req.Method { switch req.Method {
case "eth_getLogs", case "eth_getLogs",
"eth_newFilter": "eth_newFilter":
return rewriteRange(rctx, req, res, 0) // return rewriteRange(rctx, req, res, 0)
// Tornado: disable range check unti the UI is fixed
return RewriteNone, nil
case "debug_getRawReceipts", "consensus_getReceipts": case "debug_getRawReceipts", "consensus_getReceipts":
return rewriteParam(rctx, req, res, 0, true, false) return rewriteParam(rctx, req, res, 0, true, false)
case "eth_getBalance", case "eth_getBalance",

@ -200,12 +200,23 @@ func NewServer(
}, nil }, nil
} }
func (s *Server) RPCListenAndServe(host string, port int) error { func (s *Server) RPCListenAndServe(serverConfig ServerConfig) error {
host := serverConfig.RPCHost
port := serverConfig.RPCPort
enableWS := serverConfig.EnableWS
var handleRpc ReqHandle = s.GetRPCHandle(serverConfig)
s.srvMu.Lock() s.srvMu.Lock()
hdlr := mux.NewRouter() hdlr := mux.NewRouter()
hdlr.HandleFunc("/healthz", s.HandleHealthz).Methods("GET") hdlr.HandleFunc("/healthz", s.HandleHealthz).Methods("GET")
hdlr.HandleFunc("/", s.HandleRPC).Methods("POST") hdlr.HandleFunc("/", handleRpc).Methods("POST")
hdlr.HandleFunc("/{authorization}", s.HandleRPC).Methods("POST") hdlr.HandleFunc("/{authorization}", handleRpc).Methods("POST")
if enableWS {
var handleWS ReqHandle = s.GetWSHandle(true)
hdlr.HandleFunc("/", handleWS)
hdlr.HandleFunc("/{authorization}", handleWS)
}
c := cors.New(cors.Options{ c := cors.New(cors.Options{
AllowedOrigins: []string{"*"}, AllowedOrigins: []string{"*"},
}) })
@ -215,15 +226,20 @@ func (s *Server) RPCListenAndServe(host string, port int) error {
Addr: addr, Addr: addr,
} }
log.Info("starting HTTP server", "addr", addr) log.Info("starting HTTP server", "addr", addr)
if enableWS {
log.Info("starting WS server", "addr", addr)
}
s.srvMu.Unlock() s.srvMu.Unlock()
return s.rpcServer.ListenAndServe() return s.rpcServer.ListenAndServe()
} }
func (s *Server) WSListenAndServe(host string, port int) error { func (s *Server) WSListenAndServe(host string, port int) error {
s.srvMu.Lock() s.srvMu.Lock()
var handleWS ReqHandle = s.GetWSHandle(false)
hdlr := mux.NewRouter() hdlr := mux.NewRouter()
hdlr.HandleFunc("/", s.HandleWS) hdlr.HandleFunc("/", handleWS)
hdlr.HandleFunc("/{authorization}", s.HandleWS) hdlr.HandleFunc("/{authorization}", handleWS)
c := cors.New(cors.Options{ c := cors.New(cors.Options{
AllowedOrigins: []string{"*"}, AllowedOrigins: []string{"*"},
}) })
@ -255,7 +271,15 @@ func (s *Server) HandleHealthz(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte("OK")) _, _ = w.Write([]byte("OK"))
} }
func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) { type ReqHandle func(http.ResponseWriter, *http.Request)
func (s *Server) GetRPCHandle(serverConfig ServerConfig) ReqHandle {
return func(w http.ResponseWriter, r *http.Request) {
s.HandleRPC(w, r, serverConfig)
}
}
func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request, serverConfig ServerConfig) {
ctx := s.populateContext(w, r) ctx := s.populateContext(w, r)
if ctx == nil { if ctx == nil {
return return
@ -272,9 +296,14 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
isUnlimitedUserAgent := s.isUnlimitedUserAgent(userAgent) isUnlimitedUserAgent := s.isUnlimitedUserAgent(userAgent)
if xff == "" { if xff == "" {
// Just use remote addr from socket when the request doesn't have x_forwarded_for header
if (serverConfig.AllowDirect) {
xff = r.RemoteAddr
} else {
writeRPCError(ctx, w, nil, ErrInvalidRequest("request does not include a remote IP")) writeRPCError(ctx, w, nil, ErrInvalidRequest("request does not include a remote IP"))
return return
} }
}
isLimited := func(method string) bool { isLimited := func(method string) bool {
isGloballyLimitedMethod := s.isGlobalLimit(method) isGloballyLimitedMethod := s.isGlobalLimit(method)
@ -301,21 +330,7 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
return !ok return !ok
} }
if isLimited("") { log.Debug(
RecordRPCError(ctx, BackendProxyd, "unknown", ErrOverRateLimit)
log.Warn(
"rate limited request",
"req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx),
"user_agent", userAgent,
"origin", origin,
"remote_ip", xff,
)
writeRPCError(ctx, w, nil, ErrOverRateLimit)
return
}
log.Info(
"received RPC request", "received RPC request",
"req_id", GetReqID(ctx), "req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx), "auth", GetAuthCtx(ctx),
@ -368,7 +383,7 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
return return
} }
batchRes, batchContainsCached, servedBy, err := s.handleBatchRPC(ctx, reqs, isLimited, true) batchRes, batchContainsCached, servedBy, err := s.handleBatchRPC(xff, r, ctx, reqs, isLimited, true)
if err == context.DeadlineExceeded { if err == context.DeadlineExceeded {
writeRPCError(ctx, w, nil, ErrGatewayTimeout) writeRPCError(ctx, w, nil, ErrGatewayTimeout)
return return
@ -391,7 +406,7 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
} }
rawBody := json.RawMessage(body) rawBody := json.RawMessage(body)
backendRes, cached, servedBy, err := s.handleBatchRPC(ctx, []json.RawMessage{rawBody}, isLimited, false) backendRes, cached, servedBy, err := s.handleBatchRPC(xff, r, ctx, []json.RawMessage{rawBody}, isLimited, false)
if err != nil { if err != nil {
if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) || if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) ||
errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) { errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) {
@ -408,7 +423,7 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
writeRPCRes(ctx, w, backendRes[0]) writeRPCRes(ctx, w, backendRes[0])
} }
func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isLimited limiterFunc, isBatch bool) ([]*RPCRes, bool, string, error) { func (s *Server) handleBatchRPC(xff string, r *http.Request, ctx context.Context, reqs []json.RawMessage, isLimited limiterFunc, isBatch bool) ([]*RPCRes, bool, string, error) {
// A request set is transformed into groups of batches. // A request set is transformed into groups of batches.
// Each batch group maps to a forwarded JSON-RPC batch request (subject to maxUpstreamBatchSize constraints) // Each batch group maps to a forwarded JSON-RPC batch request (subject to maxUpstreamBatchSize constraints)
// A groupID is used to decouple Requests that have duplicate ID so they're not part of the same batch that's // A groupID is used to decouple Requests that have duplicate ID so they're not part of the same batch that's
@ -420,6 +435,10 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
backendGroup string backendGroup string
} }
// Retrieve info from header
origin := r.Header.Get("Origin")
userAgent := r.Header.Get("User-Agent")
responses := make([]*RPCRes, len(reqs)) responses := make([]*RPCRes, len(reqs))
batches := make(map[batchGroup][]batchElem) batches := make(map[batchGroup][]batchElem)
ids := make(map[string]int, len(reqs)) ids := make(map[string]int, len(reqs))
@ -432,6 +451,15 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
continue continue
} }
log.Debug(
"received RPC method",
"req_id", GetReqID(ctx),
"method", parsedReq.Method,
"user_agent", userAgent,
"origin", origin,
"remote_ip", xff,
)
// Simple health check // Simple health check
if len(reqs) == 1 && parsedReq.Method == proxydHealthzMethod { if len(reqs) == 1 && parsedReq.Method == proxydHealthzMethod {
res := &RPCRes{ res := &RPCRes{
@ -463,22 +491,38 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
"source", "rpc", "source", "rpc",
"req_id", GetReqID(ctx), "req_id", GetReqID(ctx),
"method", parsedReq.Method, "method", parsedReq.Method,
"user_agent", userAgent,
"origin", origin,
"remote_ip", xff,
) )
RecordRPCError(ctx, BackendProxyd, MethodUnknown, ErrMethodNotWhitelisted) RecordRPCError(ctx, BackendProxyd, MethodUnknown, ErrMethodNotWhitelisted)
responses[i] = NewRPCErrorRes(parsedReq.ID, ErrMethodNotWhitelisted) responses[i] = NewRPCErrorRes(parsedReq.ID, ErrMethodNotWhitelisted)
continue continue
} }
// Take base rate limit first
if isLimited("") {
log.Debug(
"rate limited individual RPC in a batch request",
"source", "rpc",
"req_id", parsedReq.ID,
"method", parsedReq.Method,
)
RecordRPCError(ctx, BackendProxyd, parsedReq.Method, ErrOverRateLimit)
responses[i] = NewRPCErrorRes(parsedReq.ID, ErrOverRateLimit)
continue
}
// Take rate limit for specific methods. // Take rate limit for specific methods.
// NOTE: eventually, this should apply to all batch requests. However,
// since we don't have data right now on the size of each batch, we
// only apply this to the methods that have an additional rate limit.
if _, ok := s.overrideLims[parsedReq.Method]; ok && isLimited(parsedReq.Method) { if _, ok := s.overrideLims[parsedReq.Method]; ok && isLimited(parsedReq.Method) {
log.Info( log.Debug(
"rate limited specific RPC", "rate limited specific RPC",
"source", "rpc", "source", "rpc",
"req_id", GetReqID(ctx), "req_id", GetReqID(ctx),
"method", parsedReq.Method, "method", parsedReq.Method,
"user_agent", userAgent,
"origin", origin,
"remote_ip", xff,
) )
RecordRPCError(ctx, BackendProxyd, parsedReq.Method, ErrOverRateLimit) RecordRPCError(ctx, BackendProxyd, parsedReq.Method, ErrOverRateLimit)
responses[i] = NewRPCErrorRes(parsedReq.ID, ErrOverRateLimit) responses[i] = NewRPCErrorRes(parsedReq.ID, ErrOverRateLimit)
@ -583,12 +627,31 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
return responses, cached, servedByString, nil return responses, cached, servedByString, nil
} }
func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request) { func (s *Server) GetWSHandle(fromRpc bool) ReqHandle {
return func(w http.ResponseWriter, r *http.Request) {
s.HandleWS(w, r, fromRpc)
}
}
func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request, fromRpc bool) {
ctx := s.populateContext(w, r) ctx := s.populateContext(w, r)
if ctx == nil { if ctx == nil {
return return
} }
// Handle upgrade header request
upgrade := false
for _, header := range r.Header["Upgrade"] {
if header == "websocket" {
upgrade = true
break
}
}
// Filter out non websocket requests
if fromRpc && !upgrade {
return
}
log.Info("received WS connection", "req_id", GetReqID(ctx)) log.Info("received WS connection", "req_id", GetReqID(ctx))
clientConn, err := s.upgrader.Upgrade(w, r, nil) clientConn, err := s.upgrader.Upgrade(w, r, nil)

@ -19,6 +19,7 @@ type MethodTemplate struct {
Method string `yaml:"method"` Method string `yaml:"method"`
Block string `yaml:"block"` Block string `yaml:"block"`
Response string `yaml:"response"` Response string `yaml:"response"`
ResponseCode int `yaml:"response_code"`
} }
type MockedHandler struct { type MockedHandler struct {
@ -85,6 +86,9 @@ func (mh *MockedHandler) Handler(w http.ResponseWriter, req *http.Request) {
for _, r := range template { for _, r := range template {
if r.Method == method && r.Block == block { if r.Method == method && r.Block == block {
selectedResponse = r.Response selectedResponse = r.Response
if r.ResponseCode != 0 {
w.WriteHeader(r.ResponseCode)
}
} }
} }
if selectedResponse != "" { if selectedResponse != "" {