From d4382bfa19a3845d8bd49466a6bcb5aba820906d Mon Sep 17 00:00:00 2001 From: Jacob Elias Date: Mon, 22 Jul 2024 16:44:27 -0500 Subject: [PATCH] feat: add multi call routing strategy (#26) * feat: add multicall routing strategy * Updated proxyd config to accept a routing strategy parameter * Added multicall routing strategies * Refactored backendGroup.Forward to handle multiple types of routing strategies * Background.ForwardToBackend now will return results via a channel --- proxyd/backend.go | 401 ++++++++++---- proxyd/config.go | 45 ++ ...ock_height_zero_and_network_errors_test.go | 2 +- proxyd/integration_tests/multicall_test.go | 500 ++++++++++++++++++ .../block_height_zero_and_network_errors.toml | 2 +- .../integration_tests/testdata/consensus.toml | 2 +- .../testdata/empty_responses.yml | 7 + .../integration_tests/testdata/fallback.toml | 2 +- .../integration_tests/testdata/multicall.toml | 27 + proxyd/metrics.go | 26 + proxyd/proxyd.go | 10 +- 11 files changed, 921 insertions(+), 103 deletions(-) create mode 100644 proxyd/integration_tests/multicall_test.go create mode 100644 proxyd/integration_tests/testdata/empty_responses.yml create mode 100644 proxyd/integration_tests/testdata/multicall.toml diff --git a/proxyd/backend.go b/proxyd/backend.go index 7907a0d..d1277a3 100644 --- a/proxyd/backend.go +++ b/proxyd/backend.go @@ -372,6 +372,14 @@ func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([] ), ) + log.Trace( + "forwarding request to backend", + "name", b.Name, + "req_id", GetReqID(ctx), + "attempt_count", i+1, + "max_attempts", b.maxRetries+1, + "method", metricLabelMethod, + ) res, err := b.doForward(ctx, reqs, isBatch) switch err { case nil: // do nothing @@ -381,6 +389,7 @@ func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([] "name", b.Name, "req_id", GetReqID(ctx), "max", b.maxResponseSize, + "method", metricLabelMethod, ) RecordBatchRPCError(ctx, b.Name, reqs, err) case ErrConsensusGetReceiptsCantBeBatched: @@ -415,6 +424,9 @@ func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([] "name", b.Name, "req_id", GetReqID(ctx), "err", err, + "method", metricLabelMethod, + "attempt_count", i+1, + "max_retries", b.maxRetries+1, ) timer.ObserveDuration() RecordBatchRPCError(ctx, b.Name, reqs, err) @@ -716,6 +728,11 @@ type BackendGroup struct { WeightedRouting bool Consensus *ConsensusPoller FallbackBackends map[string]bool + routingStrategy RoutingStrategy +} + +func (bg *BackendGroup) GetRoutingStrategy() RoutingStrategy { + return bg.routingStrategy } func (bg *BackendGroup) Fallbacks() []*Backend { @@ -750,113 +767,165 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch overriddenResponses := make([]*indexedReqRes, 0) rewrittenReqs := make([]*RPCReq, 0, len(rpcReqs)) + // When routing_strategy is set to `consensus_aware` the backend group acts as a load balancer + // serving traffic from any backend that agrees in the consensus group + // We also rewrite block tags to enforce compliance with consensus if bg.Consensus != nil { - // When `consensus_aware` is set to `true`, the backend group acts as a load balancer - // serving traffic from any backend that agrees in the consensus group + rpcReqs, overriddenResponses = bg.OverwriteConsensusResponses(rpcReqs, overriddenResponses, rewrittenReqs) + } - // We also rewrite block tags to enforce compliance with consensus - rctx := RewriteContext{ - latest: bg.Consensus.GetLatestBlockNumber(), - safe: 
bg.Consensus.GetSafeBlockNumber(), - finalized: bg.Consensus.GetFinalizedBlockNumber(), - maxBlockRange: bg.Consensus.maxBlockRange, - } - - for i, req := range rpcReqs { - res := RPCRes{JSONRPC: JSONRPCVersion, ID: req.ID} - result, err := RewriteTags(rctx, req, &res) - switch result { - case RewriteOverrideError: - overriddenResponses = append(overriddenResponses, &indexedReqRes{ - index: i, - req: req, - res: &res, - }) - if errors.Is(err, ErrRewriteBlockOutOfRange) { - res.Error = ErrBlockOutOfRange - } else if errors.Is(err, ErrRewriteRangeTooLarge) { - res.Error = ErrInvalidParams( - fmt.Sprintf("block range greater than %d max", rctx.maxBlockRange), - ) - } else { - res.Error = ErrParseErr - } - case RewriteOverrideResponse: - overriddenResponses = append(overriddenResponses, &indexedReqRes{ - index: i, - req: req, - res: &res, - }) - case RewriteOverrideRequest, RewriteNone: - rewrittenReqs = append(rewrittenReqs, req) - } - } - rpcReqs = rewrittenReqs + // When routing_strategy is set to 'multicall' the request will be forward to all backends + // and return the first successful response + if bg.GetRoutingStrategy() == MulticallRoutingStrategy && isValidMulticallTx(rpcReqs) && !isBatch { + backendResp := bg.ExecuteMulticall(ctx, rpcReqs) + return backendResp.RPCRes, backendResp.ServedBy, backendResp.error } rpcRequestsTotal.Inc() - for _, back := range backends { - res := make([]*RPCRes, 0) - var err error + ch := make(chan BackendGroupRPCResponse) + go func() { + defer close(ch) + backendResp := bg.ForwardRequestToBackendGroup(rpcReqs, backends, ctx, isBatch) + ch <- *backendResp + }() + backendResp := <-ch - servedBy := fmt.Sprintf("%s/%s", bg.Name, back.Name) - - if len(rpcReqs) > 0 { - res, err = back.Forward(ctx, rpcReqs, isBatch) - if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) || - errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) || - errors.Is(err, ErrMethodNotWhitelisted) { - return nil, "", err - } - if errors.Is(err, ErrBackendResponseTooLarge) { - return nil, servedBy, err - } - if errors.Is(err, ErrBackendOffline) { - log.Warn( - "skipping offline backend", - "name", back.Name, - "auth", GetAuthCtx(ctx), - "req_id", GetReqID(ctx), - ) - continue - } - if errors.Is(err, ErrBackendOverCapacity) { - log.Warn( - "skipping over-capacity backend", - "name", back.Name, - "auth", GetAuthCtx(ctx), - "req_id", GetReqID(ctx), - ) - continue - } - if err != nil { - log.Error( - "error forwarding request to backend", - "name", back.Name, - "req_id", GetReqID(ctx), - "auth", GetAuthCtx(ctx), - "err", err, - ) - continue - } - } - - // re-apply overridden responses - for _, ov := range overriddenResponses { - if len(res) > 0 { - // insert ov.res at position ov.index - res = append(res[:ov.index], append([]*RPCRes{ov.res}, res[ov.index:]...)...) 
- } else { - res = append(res, ov.res) - } - } - - return res, servedBy, nil + if backendResp.error != nil { + log.Error("error serving requests", + "req_id", GetReqID(ctx), + "auth", GetAuthCtx(ctx), + "err", backendResp.error, + ) + return backendResp.RPCRes, backendResp.ServedBy, backendResp.error } - RecordUnserviceableRequest(ctx, RPCRequestSourceHTTP) - return nil, "", ErrNoBackends + // re-apply overridden responses + log.Trace("successfully served request overriding responses", + "req_id", GetReqID(ctx), + "auth", GetAuthCtx(ctx), + ) + res := OverrideResponses(backendResp.RPCRes, overriddenResponses) + return res, backendResp.ServedBy, backendResp.error +} + +func isValidMulticallTx(rpcReqs []*RPCReq) bool { + if len(rpcReqs) == 1 { + if rpcReqs[0].Method == "eth_sendRawTransaction" { + return true + } + } + return false +} + +// Using special struct since servedBy may not be populated if error occurs +type multicallTuple struct { + response *BackendGroupRPCResponse + backendName string +} + +func (bg *BackendGroup) ExecuteMulticall(ctx context.Context, rpcReqs []*RPCReq) *BackendGroupRPCResponse { + + // Create ctx without cancel so background tasks process + // after original request returns + bgCtx := context.WithoutCancel(ctx) + + log.Info("executing multicall routing strategy", + "req_id", GetReqID(bgCtx), + "auth", GetAuthCtx(bgCtx), + ) + var wg sync.WaitGroup + ch := make(chan *multicallTuple) + for _, backend := range bg.Backends { + wg.Add(1) + go bg.MulticallRequest(backend, rpcReqs, &wg, bgCtx, ch) + } + + go func() { + wg.Wait() + close(ch) + }() + + return bg.ProcessMulticallResponses(ch, bgCtx) +} + +func (bg *BackendGroup) MulticallRequest(backend *Backend, rpcReqs []*RPCReq, wg *sync.WaitGroup, ctx context.Context, ch chan *multicallTuple) { + defer wg.Done() + log.Debug("forwarding multicall request to backend", + "req_id", GetReqID(ctx), + "auth", GetAuthCtx(ctx), + "backend", backend.Name, + ) + + RecordBackendGroupMulticallRequest(bg, backend.Name) + backendResp := bg.ForwardRequestToBackendGroup(rpcReqs, []*Backend{backend}, ctx, false) + + multicallResp := &multicallTuple{ + response: backendResp, + backendName: backend.Name, + } + + ch <- multicallResp + + log.Debug("received multicall response from backend", + "req_id", GetReqID(ctx), + "auth", GetAuthCtx(ctx), + "backend", backend.Name, + ) + if backendResp.error != nil { + log.Error("received multicall error response from backend", + "req_id", GetReqID(ctx), + "auth", GetAuthCtx(ctx), + "backend", backend.Name, + "error", backendResp.error.Error(), + ) + } +} + +func (bg *BackendGroup) ProcessMulticallResponses(ch chan *multicallTuple, ctx context.Context) *BackendGroupRPCResponse { + var finalResp *BackendGroupRPCResponse + i := 0 + for { + multicallResp, ok := <-ch + if !ok { + log.Trace("multicall response channel closed", + "req_id", GetReqID(ctx), + "auth", GetAuthCtx(ctx), + "response_count", i, + ) + if i > 0 { + return finalResp + } + return &BackendGroupRPCResponse{ + RPCRes: nil, + ServedBy: "", + error: errors.New("no multicall response received"), + } + } + + i++ + resp := multicallResp.response + backendName := multicallResp.backendName + RecordBackendGroupMulticallCompletion(bg, backendName) + + if resp.error != nil { + log.Error("received error response from multicall channel", + "req_id", GetReqID(ctx), + "auth", GetAuthCtx(ctx), + "err", resp.error, + "backend", backendName, + ) + finalResp = resp + continue + } + log.Info("received successful response from multicall channel", + 
"req_id", GetReqID(ctx), + "auth", GetAuthCtx(ctx), + "served_by", resp.ServedBy, + "backend", backendName, + ) + return resp + } } func (bg *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) { @@ -1281,3 +1350,139 @@ func stripXFF(xff string) string { ipList := strings.Split(xff, ",") return strings.TrimSpace(ipList[0]) } + +type BackendGroupRPCResponse struct { + RPCRes []*RPCRes + ServedBy string + error error +} + +func (bg *BackendGroup) ForwardRequestToBackendGroup( + rpcReqs []*RPCReq, + backends []*Backend, + ctx context.Context, + isBatch bool, +) *BackendGroupRPCResponse { + for _, back := range backends { + res := make([]*RPCRes, 0) + var err error + + servedBy := fmt.Sprintf("%s/%s", bg.Name, back.Name) + + if len(rpcReqs) > 0 { + + res, err = back.Forward(ctx, rpcReqs, isBatch) + + if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) || + errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) || + errors.Is(err, ErrMethodNotWhitelisted) { + return &BackendGroupRPCResponse{ + RPCRes: nil, + ServedBy: "", + error: err, + } + } + if errors.Is(err, ErrBackendResponseTooLarge) { + return &BackendGroupRPCResponse{ + RPCRes: nil, + ServedBy: "", + error: err, + } + } + if errors.Is(err, ErrBackendOffline) { + log.Warn( + "skipping offline backend", + "name", back.Name, + "auth", GetAuthCtx(ctx), + "req_id", GetReqID(ctx), + ) + continue + } + if errors.Is(err, ErrBackendOverCapacity) { + log.Warn( + "skipping over-capacity backend", + "name", back.Name, + "auth", GetAuthCtx(ctx), + "req_id", GetReqID(ctx), + ) + continue + } + if err != nil { + log.Error( + "error forwarding request to backend", + "name", back.Name, + "req_id", GetReqID(ctx), + "auth", GetAuthCtx(ctx), + "err", err, + ) + continue + } + } + + return &BackendGroupRPCResponse{ + RPCRes: res, + ServedBy: servedBy, + error: nil, + } + } + + RecordUnserviceableRequest(ctx, RPCRequestSourceHTTP) + return &BackendGroupRPCResponse{ + RPCRes: nil, + ServedBy: "", + error: ErrNoBackends, + } + +} + +func OverrideResponses(res []*RPCRes, overriddenResponses []*indexedReqRes) []*RPCRes { + for _, ov := range overriddenResponses { + if len(res) > 0 { + // insert ov.res at position ov.index + res = append(res[:ov.index], append([]*RPCRes{ov.res}, res[ov.index:]...)...) 
+ } else { + res = append(res, ov.res) + } + } + return res +} + +func (bg *BackendGroup) OverwriteConsensusResponses(rpcReqs []*RPCReq, overriddenResponses []*indexedReqRes, rewrittenReqs []*RPCReq) ([]*RPCReq, []*indexedReqRes) { + rctx := RewriteContext{ + latest: bg.Consensus.GetLatestBlockNumber(), + safe: bg.Consensus.GetSafeBlockNumber(), + finalized: bg.Consensus.GetFinalizedBlockNumber(), + maxBlockRange: bg.Consensus.maxBlockRange, + } + + for i, req := range rpcReqs { + res := RPCRes{JSONRPC: JSONRPCVersion, ID: req.ID} + result, err := RewriteTags(rctx, req, &res) + switch result { + case RewriteOverrideError: + overriddenResponses = append(overriddenResponses, &indexedReqRes{ + index: i, + req: req, + res: &res, + }) + if errors.Is(err, ErrRewriteBlockOutOfRange) { + res.Error = ErrBlockOutOfRange + } else if errors.Is(err, ErrRewriteRangeTooLarge) { + res.Error = ErrInvalidParams( + fmt.Sprintf("block range greater than %d max", rctx.maxBlockRange), + ) + } else { + res.Error = ErrParseErr + } + case RewriteOverrideResponse: + overriddenResponses = append(overriddenResponses, &indexedReqRes{ + index: i, + req: req, + res: &res, + }) + case RewriteOverrideRequest, RewriteNone: + rewrittenReqs = append(rewrittenReqs, req) + } + } + return rewrittenReqs, overriddenResponses +} diff --git a/proxyd/config.go b/proxyd/config.go index 4719a55..83935e7 100644 --- a/proxyd/config.go +++ b/proxyd/config.go @@ -6,6 +6,8 @@ import ( "os" "strings" "time" + + "github.com/ethereum/go-ethereum/log" ) type ServerConfig struct { @@ -107,11 +109,54 @@ type BackendConfig struct { type BackendsConfig map[string]*BackendConfig +type RoutingStrategy string + +func (b *BackendGroupConfig) ValidateRoutingStrategy(bgName string) bool { + + // If Consensus Aware is Set and Routing RoutingStrategy is populated fail + if b.ConsensusAware && b.RoutingStrategy != "" { + log.Warn("consensus_aware is now deprecated, please use routing_strategy = consensus_aware") + log.Crit("Exiting consensus_aware and routing strategy are mutually exclusive, they cannot both be defined") + } + + // If Consensus Aware is Set set RoutingStrategy to consensus_aware + if b.ConsensusAware { + b.RoutingStrategy = ConsensusAwareRoutingStrategy + log.Info("consensus_aware is now deprecated, please use routing_strategy = consenus_aware in the future") + } + + switch b.RoutingStrategy { + case ConsensusAwareRoutingStrategy: + return true + case MulticallRoutingStrategy: + return true + case FallbackRoutingStrategy: + return true + case "": + log.Info("Empty routing strategy provided for backend_group, using fallback strategy ", "name", bgName) + b.RoutingStrategy = FallbackRoutingStrategy + return true + default: + return false + } +} + +const ( + ConsensusAwareRoutingStrategy RoutingStrategy = "consensus_aware" + MulticallRoutingStrategy RoutingStrategy = "multicall" + FallbackRoutingStrategy RoutingStrategy = "fallback" +) + type BackendGroupConfig struct { Backends []string `toml:"backends"` WeightedRouting bool `toml:"weighted_routing"` + RoutingStrategy RoutingStrategy `toml:"routing_strategy"` + + /* + Deprecated: Use routing_strategy config to create a consensus_aware proxyd instance + */ ConsensusAware bool `toml:"consensus_aware"` ConsensusAsyncHandler string `toml:"consensus_handler"` ConsensusPollerInterval TOMLDuration `toml:"consensus_poller_interval"` diff --git a/proxyd/integration_tests/block_height_zero_and_network_errors_test.go b/proxyd/integration_tests/block_height_zero_and_network_errors_test.go index 
fe7a040..898413f 100644 --- a/proxyd/integration_tests/block_height_zero_and_network_errors_test.go +++ b/proxyd/integration_tests/block_height_zero_and_network_errors_test.go @@ -72,7 +72,7 @@ func setupBlockHeightZero(t *testing.T) (map[string]*bhZeroNodeContext, *proxyd. bg := svr.BackendGroups["node"] require.NotNil(t, bg) - require.NotNil(t, bg.Consensus) + require.NotNil(t, bg.Consensus, "Expected Consenus Poller to be intialized") require.Equal(t, 2, len(bg.Backends)) // convenient mapping to access the nodes diff --git a/proxyd/integration_tests/multicall_test.go b/proxyd/integration_tests/multicall_test.go new file mode 100644 index 0000000..196bd97 --- /dev/null +++ b/proxyd/integration_tests/multicall_test.go @@ -0,0 +1,500 @@ +package integration_tests + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "os" + "path" + "sync" + "testing" + "time" + + "github.com/ethereum-optimism/optimism/proxyd" + ms "github.com/ethereum-optimism/optimism/proxyd/tools/mockserver/handler" + "github.com/stretchr/testify/require" +) + +const nonceErrorResponse = `{"jsonrpc": "2.0","error": {"code": -32000, "message": "nonce too low"},"id": 1}` +const txAccepted = `{"jsonrpc": "2.0","result": "0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef","id": 1}` + +func setupMulticall(t *testing.T) (map[string]nodeContext, *proxyd.BackendGroup, *ProxydHTTPClient, func(), *proxyd.Server, []*ms.MockedHandler) { + // setup mock servers + node1 := NewMockBackend(nil) + node2 := NewMockBackend(nil) + node3 := NewMockBackend(nil) + + dir, err := os.Getwd() + require.NoError(t, err) + + responses := path.Join(dir, "testdata/multicall_responses.yml") + emptyResponses := path.Join(dir, "testdata/empty_responses.yml") + + h1 := ms.MockedHandler{ + Overrides: []*ms.MethodTemplate{}, + Autoload: true, + AutoloadFile: responses, + } + h2 := ms.MockedHandler{ + Overrides: []*ms.MethodTemplate{}, + Autoload: true, + AutoloadFile: "", + } + h3 := ms.MockedHandler{ + Overrides: []*ms.MethodTemplate{}, + Autoload: true, + AutoloadFile: emptyResponses, + } + + require.NoError(t, os.Setenv("NODE1_URL", node1.URL())) + require.NoError(t, os.Setenv("NODE2_URL", node2.URL())) + require.NoError(t, os.Setenv("NODE3_URL", node3.URL())) + + node1.SetHandler(http.HandlerFunc(h1.Handler)) + node2.SetHandler(SingleResponseHandler(200, txAccepted)) + node3.SetHandler(SingleResponseHandler(429, dummyRes)) + + // setup proxyd + config := ReadConfig("multicall") + fmt.Printf("[SetupMulticall] Using Timeout of %d \n", config.Server.TimeoutSeconds) + svr, shutdown, err := proxyd.Start(config) + require.NoError(t, err) + + // expose the proxyd client + client := NewProxydClient("http://127.0.0.1:8545") + + // expose the backend group + bg := svr.BackendGroups["node"] + require.NotNil(t, bg) + require.Nil(t, bg.Consensus, "Expeceted consensus not to be initialized") + require.Equal(t, 3, len(bg.Backends)) + require.Equal(t, bg.GetRoutingStrategy(), proxyd.MulticallRoutingStrategy) + + // convenient mapping to access the nodes by name + nodes := map[string]nodeContext{ + "node1": { + mockBackend: node1, + backend: bg.Backends[0], + handler: &h1, + }, + "node2": { + mockBackend: node2, + backend: bg.Backends[1], + handler: &h2, + }, + "node3": { + mockBackend: node3, + backend: bg.Backends[2], + handler: &h3, + }, + } + + handlers := []*ms.MockedHandler{&h1, &h2, &h3} + + nodes["node1"].mockBackend.SetHandler(SingleResponseHandler(200, txAccepted)) + 
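+ // node1's mocked handler (h1) is swapped here for a plain 200 handler; node2
+ // and node3 below are pointed at their mocked handlers (h2, h3), and the
+ // individual test cases override these defaults again via SetHandler as needed.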
nodes["node2"].mockBackend.SetHandler(http.HandlerFunc(handlers[1].Handler)) + //Node 3 has no handler empty handler never respondes should always context timeout + nodes["node3"].mockBackend.SetHandler(http.HandlerFunc(handlers[2].Handler)) + + require.Equal(t, 0, nodeBackendRequestCount(nodes, "node1")) + require.Equal(t, 0, nodeBackendRequestCount(nodes, "node2")) + require.Equal(t, 0, nodeBackendRequestCount(nodes, "node3")) + + return nodes, bg, client, shutdown, svr, handlers +} + +func setServerBackend(s *proxyd.Server, nm map[string]nodeContext) *proxyd.Server { + bg := s.BackendGroups + bg["node"].Backends = []*proxyd.Backend{ + nm["node1"].backend, + nm["node2"].backend, + nm["node3"].backend, + } + s.BackendGroups = bg + return s +} + +func nodeBackendRequestCount(nodes map[string]nodeContext, node string) int { + return len(nodes[node].mockBackend.requests) +} + +func TestMulticall(t *testing.T) { + + t.Run("Multicall will request all backends", func(t *testing.T) { + nodes, _, _, shutdown, svr, _ := setupMulticall(t) + defer nodes["node1"].mockBackend.Close() + defer nodes["node2"].mockBackend.Close() + defer nodes["node3"].mockBackend.Close() + defer shutdown() + + nodes["node1"].mockBackend.SetHandler(SingleResponseHandler(401, dummyRes)) + nodes["node2"].mockBackend.SetHandler(SingleResponseHandler(500, dummyRes)) + nodes["node3"].mockBackend.SetHandler(SingleResponseHandler(200, txAccepted)) + + body := makeSendRawTransaction(txHex1) + req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body)) + req.Header.Set("X-Forwarded-For", "203.0.113.1") + rr := httptest.NewRecorder() + svr.HandleRPC(rr, req) + resp := rr.Result() + defer resp.Body.Close() + require.NotNil(t, resp.Body) + require.Equal(t, 200, resp.StatusCode) + require.Equal(t, resp.Header["X-Served-By"], []string{"node/node3"}) + rpcRes := &proxyd.RPCRes{} + require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes)) + require.False(t, rpcRes.IsError()) + + require.Equal(t, 1, nodeBackendRequestCount(nodes, "node1")) + require.Equal(t, 1, nodeBackendRequestCount(nodes, "node2")) + require.Equal(t, 1, nodeBackendRequestCount(nodes, "node3")) + }) + + t.Run("When all of the backends return non 200, multicall should return 503", func(t *testing.T) { + nodes, _, _, shutdown, svr, _ := setupMulticall(t) + defer nodes["node1"].mockBackend.Close() + defer nodes["node2"].mockBackend.Close() + defer nodes["node3"].mockBackend.Close() + defer shutdown() + + nodes["node1"].mockBackend.SetHandler(SingleResponseHandler(429, dummyRes)) + nodes["node2"].mockBackend.SetHandler(SingleResponseHandler(429, dummyRes)) + + localSvr := setServerBackend(svr, nodes) + + body := makeSendRawTransaction(txHex1) + req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body)) + req.Header.Set("X-Forwarded-For", "203.0.113.1") + rr := httptest.NewRecorder() + + localSvr.HandleRPC(rr, req) + + resp := rr.Result() + defer resp.Body.Close() + + require.NotNil(t, resp.Body) + require.Equal(t, 503, resp.StatusCode) + rpcRes := &proxyd.RPCRes{} + require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes)) + require.True(t, rpcRes.IsError()) + require.Equal(t, proxyd.ErrNoBackends.Code, rpcRes.Error.Code) + require.Equal(t, proxyd.ErrNoBackends.Message, rpcRes.Error.Message) + + require.Equal(t, 1, nodeBackendRequestCount(nodes, "node1")) + require.Equal(t, 1, nodeBackendRequestCount(nodes, "node2")) + require.Equal(t, 1, nodeBackendRequestCount(nodes, "node3")) + }) + + t.Run("It should return the first 
200 response", func(t *testing.T) { + nodes, _, _, shutdown, svr, _ := setupMulticall(t) + defer nodes["node1"].mockBackend.Close() + defer nodes["node2"].mockBackend.Close() + defer nodes["node3"].mockBackend.Close() + defer shutdown() + + nodes["node1"].mockBackend.SetHandler(SingleResponseHandlerWithSleep(200, txAccepted, 3*time.Second)) + nodes["node2"].mockBackend.SetHandler(SingleResponseHandler(200, txAccepted)) + + localSvr := setServerBackend(svr, nodes) + + body := makeSendRawTransaction(txHex1) + req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body)) + req.Header.Set("X-Forwarded-For", "203.0.113.1") + rr := httptest.NewRecorder() + + localSvr.HandleRPC(rr, req) + + resp := rr.Result() + defer resp.Body.Close() + + require.NotNil(t, resp.Body) + require.Equal(t, 200, resp.StatusCode) + rpcRes := &proxyd.RPCRes{} + require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes)) + require.False(t, rpcRes.IsError()) + require.Equal(t, "2.0", rpcRes.JSONRPC) + + require.Equal(t, resp.Header["X-Served-By"], []string{"node/node2"}) + require.False(t, rpcRes.IsError()) + + require.Equal(t, 1, nodeBackendRequestCount(nodes, "node1")) + require.Equal(t, 1, nodeBackendRequestCount(nodes, "node2")) + require.Equal(t, 1, nodeBackendRequestCount(nodes, "node3")) + }) + + t.Run("Ensure application level error is returned to caller if its first", func(t *testing.T) { + nodes, _, _, shutdown, svr, _ := setupMulticall(t) + defer nodes["node1"].mockBackend.Close() + defer nodes["node2"].mockBackend.Close() + defer nodes["node3"].mockBackend.Close() + + defer shutdown() + + shutdownChan1 := make(chan struct{}) + shutdownChan2 := make(chan struct{}) + + nodes["node1"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, nonceErrorResponse, shutdownChan1, 4*time.Second)) + nodes["node2"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, nonceErrorResponse, shutdownChan2, 1*time.Second)) + nodes["node3"].mockBackend.SetHandler(SingleResponseHandler(403, dummyRes)) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + shutdownChan2 <- struct{}{} + shutdownChan1 <- struct{}{} + wg.Done() + }() + + localSvr := setServerBackend(svr, nodes) + + body := makeSendRawTransaction(txHex1) + req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body)) + req.Header.Set("X-Forwarded-For", "203.0.113.1") + rr := httptest.NewRecorder() + + localSvr.HandleRPC(rr, req) + + resp := rr.Result() + defer resp.Body.Close() + + require.NotNil(t, resp.Body) + require.Equal(t, 200, resp.StatusCode) + rpcRes := &proxyd.RPCRes{} + require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes)) + require.Equal(t, "2.0", rpcRes.JSONRPC) + require.Equal(t, resp.Header["X-Served-By"], []string{"node/node2"}) + require.True(t, rpcRes.IsError()) + + wg.Wait() + + require.Equal(t, 1, nodeBackendRequestCount(nodes, "node1")) + require.Equal(t, 1, nodeBackendRequestCount(nodes, "node2")) + require.Equal(t, 1, nodeBackendRequestCount(nodes, "node3")) + }) + + t.Run("It should ignore network errors and return a 200 from a slower request", func(t *testing.T) { + nodes, _, _, shutdown, svr, _ := setupMulticall(t) + defer nodes["node1"].mockBackend.Close() + defer nodes["node2"].mockBackend.Close() + defer nodes["node3"].mockBackend.Close() + defer shutdown() + + // We should ignore node2 first response cause 429, and return node 1 because 200 + nodes["node1"].mockBackend.SetHandler(SingleResponseHandlerWithSleep(200, txAccepted, 3*time.Second)) + 
nodes["node2"].mockBackend.SetHandler(SingleResponseHandler(429, txAccepted)) + + localSvr := setServerBackend(svr, nodes) + + body := makeSendRawTransaction(txHex1) + req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body)) + req.Header.Set("X-Forwarded-For", "203.0.113.1") + rr := httptest.NewRecorder() + + localSvr.HandleRPC(rr, req) + + resp := rr.Result() + defer resp.Body.Close() + + require.NotNil(t, resp.Body) + require.Equal(t, 200, resp.StatusCode) + rpcRes := &proxyd.RPCRes{} + require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes)) + require.False(t, rpcRes.IsError()) + require.Equal(t, "2.0", rpcRes.JSONRPC) + + require.Equal(t, resp.Header["X-Served-By"], []string{"node/node1"}) + require.Equal(t, 1, nodeBackendRequestCount(nodes, "node1")) + require.Equal(t, 1, nodeBackendRequestCount(nodes, "node2")) + require.Equal(t, 1, nodeBackendRequestCount(nodes, "node3")) + }) + + t.Run("When one of the backends times out", func(t *testing.T) { + nodes, _, _, shutdown, svr, _ := setupMulticall(t) + defer nodes["node1"].mockBackend.Close() + defer nodes["node2"].mockBackend.Close() + defer nodes["node3"].mockBackend.Close() + defer shutdown() + + shutdownChan := make(chan struct{}) + nodes["node1"].mockBackend.SetHandler(SingleResponseHandler(200, dummyRes)) + nodes["node2"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, dummyRes, shutdownChan, 7*time.Second)) + + localSvr := setServerBackend(svr, nodes) + + body := makeSendRawTransaction(txHex1) + req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body)) + req.Header.Set("X-Forwarded-For", "203.0.113.1") + rr := httptest.NewRecorder() + + localSvr.HandleRPC(rr, req) + resp := rr.Result() + shutdownChan <- struct{}{} + defer resp.Body.Close() + + require.NotNil(t, resp.Body) + servedBy := "node/node1" + require.Equal(t, 200, resp.StatusCode, "expected 200 response from node1") + + require.Equal(t, resp.Header["X-Served-By"], []string{servedBy}, "Error incorrect node served the request") + rpcRes := &proxyd.RPCRes{} + require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes)) + require.False(t, rpcRes.IsError()) + + require.Equal(t, 1, nodeBackendRequestCount(nodes, "node1")) + require.Equal(t, 1, nodeBackendRequestCount(nodes, "node2")) + require.Equal(t, 1, nodeBackendRequestCount(nodes, "node3")) + }) + + t.Run("allBackends times out", func(t *testing.T) { + + nodes, _, _, shutdown, svr, _ := setupMulticall(t) + defer nodes["node1"].mockBackend.Close() + defer nodes["node2"].mockBackend.Close() + defer nodes["node3"].mockBackend.Close() + defer shutdown() + + shutdownChan1 := make(chan struct{}) + shutdownChan2 := make(chan struct{}) + nodes["node1"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, dummyRes, shutdownChan1, 7*time.Second)) + nodes["node2"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, dummyRes, shutdownChan2, 7*time.Second)) + + localSvr := setServerBackend(svr, nodes) + + body := makeSendRawTransaction(txHex1) + req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body)) + req.Header.Set("X-Forwarded-For", "203.0.113.1") + rr := httptest.NewRecorder() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + shutdownChan1 <- struct{}{} + shutdownChan2 <- struct{}{} + wg.Done() + }() + + fmt.Println("sending request") + localSvr.HandleRPC(rr, req) + + resp := rr.Result() + defer resp.Body.Close() + + require.NotNil(t, resp.Body) + require.Equal(t, 503, resp.StatusCode, "expected no 
response") + rpcRes := &proxyd.RPCRes{} + require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes)) + require.True(t, rpcRes.IsError()) + require.Equal(t, rpcRes.Error.Code, proxyd.ErrNoBackends.Code) + + // Wait for test response to complete before checking query count + wg.Wait() + require.Equal(t, 1, nodeBackendRequestCount(nodes, "node1")) + require.Equal(t, 1, nodeBackendRequestCount(nodes, "node2")) + require.Equal(t, 1, nodeBackendRequestCount(nodes, "node3")) + }) + + t.Run("Test with many multi-calls in without resetting", func(t *testing.T) { + nodes, _, _, shutdown, svr, _ := setupMulticall(t) + defer nodes["node1"].mockBackend.Close() + defer nodes["node2"].mockBackend.Close() + defer nodes["node3"].mockBackend.Close() + defer shutdown() + + for i := 1; i < 4; i++ { + shutdownChan1 := make(chan struct{}) + shutdownChan2 := make(chan struct{}) + shutdownChan3 := make(chan struct{}) + + switch { + case i == 1: + nodes["node1"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, txAccepted, shutdownChan1, 1*time.Second)) + nodes["node2"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(429, dummyRes, shutdownChan2, 1*time.Second)) + nodes["node3"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(503, dummyRes, shutdownChan3, 1*time.Second)) + case i == 2: + nodes["node1"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(404, dummyRes, shutdownChan1, 1*time.Second)) + nodes["node2"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, nonceErrorResponse, shutdownChan2, 1*time.Second)) + nodes["node3"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(405, dummyRes, shutdownChan3, 1*time.Second)) + case i == 3: + // Return the quickest response + nodes["node1"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(404, dummyRes, shutdownChan1, 1*time.Second)) + nodes["node2"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(500, dummyRes, shutdownChan2, 1*time.Second)) + nodes["node3"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, nonceErrorResponse, shutdownChan3, 1*time.Second)) + } + + localSvr := setServerBackend(svr, nodes) + + body := makeSendRawTransaction(txHex1) + req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body)) + req.Header.Set("X-Forwarded-For", "203.0.113.1") + rr := httptest.NewRecorder() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + shutdownChan1 <- struct{}{} + shutdownChan2 <- struct{}{} + shutdownChan3 <- struct{}{} + wg.Done() + }() + + localSvr.HandleRPC(rr, req) + + resp := rr.Result() + defer resp.Body.Close() + + require.NotNil(t, resp.Body) + rpcRes := &proxyd.RPCRes{} + require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes)) + + switch { + case i == 1: + servedBy := "node/node1" + require.NotNil(t, rpcRes.Result) + require.Equal(t, 200, resp.StatusCode, "expected 200 response from node1") + require.Equal(t, resp.Header["X-Served-By"], []string{servedBy}, "Error incorrect node served the request") + require.False(t, rpcRes.IsError()) + case i == 2: + servedBy := "node/node2" + require.Nil(t, rpcRes.Result) + require.Equal(t, 200, resp.StatusCode, "expected 200 response from node2") + require.Equal(t, resp.Header["X-Served-By"], []string{servedBy}, "Error incorrect node served the request") + require.True(t, rpcRes.IsError()) + case i == 3: + servedBy := "node/node3" + require.Nil(t, rpcRes.Result) + require.Equal(t, 200, resp.StatusCode, "expected 200 response from node3") + require.Equal(t, 
resp.Header["X-Served-By"], []string{servedBy}, "Error incorrect node served the request") + require.True(t, rpcRes.IsError()) + } + // Wait for test response to complete before checking query count + wg.Wait() + require.Equal(t, i, nodeBackendRequestCount(nodes, "node1")) + require.Equal(t, i, nodeBackendRequestCount(nodes, "node2")) + require.Equal(t, i, nodeBackendRequestCount(nodes, "node3")) + } + }) + +} + +func SingleResponseHandlerWithSleep(code int, response string, duration time.Duration) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + fmt.Println("sleeping") + time.Sleep(duration) + fmt.Println("Shutting down Single Response Handler") + w.WriteHeader(code) + _, _ = w.Write([]byte(response)) + } +} + +func SingleResponseHandlerWithSleepShutdown(code int, response string, shutdownServer chan struct{}, duration time.Duration) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + fmt.Println("sleeping") + time.Sleep(duration) + <-shutdownServer + fmt.Println("Shutting down Single Response Handler") + w.WriteHeader(code) + _, _ = w.Write([]byte(response)) + } +} diff --git a/proxyd/integration_tests/testdata/block_height_zero_and_network_errors.toml b/proxyd/integration_tests/testdata/block_height_zero_and_network_errors.toml index 99a2521..e362d55 100644 --- a/proxyd/integration_tests/testdata/block_height_zero_and_network_errors.toml +++ b/proxyd/integration_tests/testdata/block_height_zero_and_network_errors.toml @@ -16,7 +16,7 @@ rpc_url = "$NODE2_URL" [backend_groups] [backend_groups.node] backends = ["node1", "node2"] -consensus_aware = true +routing_strategy = "consensus_aware" consensus_handler = "noop" # allow more control over the consensus poller for tests ## Consensus Ban Need to set very large, becaue consensus poller uses system clock, not adjustable clock diff --git a/proxyd/integration_tests/testdata/consensus.toml b/proxyd/integration_tests/testdata/consensus.toml index bb13036..fbde0be 100644 --- a/proxyd/integration_tests/testdata/consensus.toml +++ b/proxyd/integration_tests/testdata/consensus.toml @@ -15,7 +15,7 @@ rpc_url = "$NODE2_URL" [backend_groups] [backend_groups.node] backends = ["node1", "node2"] -consensus_aware = true +routing_strategy = "consensus_aware" consensus_handler = "noop" # allow more control over the consensus poller for tests consensus_ban_period = "1m" consensus_max_update_threshold = "2m" diff --git a/proxyd/integration_tests/testdata/empty_responses.yml b/proxyd/integration_tests/testdata/empty_responses.yml new file mode 100644 index 0000000..56c7760 --- /dev/null +++ b/proxyd/integration_tests/testdata/empty_responses.yml @@ -0,0 +1,7 @@ +- method: bad_method + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": "nope", + } diff --git a/proxyd/integration_tests/testdata/fallback.toml b/proxyd/integration_tests/testdata/fallback.toml index c801ca3..c1fb598 100644 --- a/proxyd/integration_tests/testdata/fallback.toml +++ b/proxyd/integration_tests/testdata/fallback.toml @@ -15,7 +15,7 @@ rpc_url = "$NODE2_URL" [backend_groups] [backend_groups.node] backends = ["normal", "fallback"] -consensus_aware = true +routing_strategy = "consensus_aware" consensus_handler = "noop" # allow more control over the consensus poller for tests consensus_ban_period = "1m" consensus_max_update_threshold = "2m" diff --git a/proxyd/integration_tests/testdata/multicall.toml b/proxyd/integration_tests/testdata/multicall.toml new file mode 100644 index 0000000..ab78c8a --- /dev/null +++ 
b/proxyd/integration_tests/testdata/multicall.toml @@ -0,0 +1,27 @@ +[server] +rpc_port = 8545 +enable_served_by_header = true +timeout_seconds = 7 + +[backend] +response_timeout_seconds = 4 +max_degraded_latency_threshold = "30ms" + +[backends] +[backends.node1] +rpc_url = "$NODE1_URL" + +[backends.node2] +rpc_url = "$NODE2_URL" + +[backends.node3] +rpc_url = "$NODE3_URL" + +[backend_groups] +[backend_groups.node] +backends = ["node1", "node2", "node3"] +routing_strategy = "multicall" + +[rpc_method_mappings] +eth_call = "node" +eth_sendRawTransaction = "node" diff --git a/proxyd/metrics.go b/proxyd/metrics.go index 4046af0..b79518a 100644 --- a/proxyd/metrics.go +++ b/proxyd/metrics.go @@ -428,6 +428,24 @@ var ( "backend_name", "fallback", }) + + backendGroupMulticallCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: MetricsNamespace, + Name: "backend_group_multicall_request_counter", + Help: "Record the amount of multicall requests", + }, []string{ + "backend_group", + "backend_name", + }) + + backendGroupMulticallCompletionCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: MetricsNamespace, + Name: "backend_group_multicall_completion_counter", + Help: "Record the amount of completed multicall requests", + }, []string{ + "backend_group", + "backend_name", + }) ) func RecordRedisError(source string) { @@ -593,6 +611,14 @@ func RecordBackendGroupFallbacks(bg *BackendGroup, name string, fallback bool) { backendGroupFallbackBackend.WithLabelValues(bg.Name, name, strconv.FormatBool(fallback)).Set(boolToFloat64(fallback)) } +func RecordBackendGroupMulticallRequest(bg *BackendGroup, backendName string) { + backendGroupMulticallCounter.WithLabelValues(bg.Name, backendName).Inc() +} + +func RecordBackendGroupMulticallCompletion(bg *BackendGroup, backendName string) { + backendGroupMulticallCompletionCounter.WithLabelValues(bg.Name, backendName).Inc() +} + func boolToFloat64(b bool) float64 { if b { return 1 diff --git a/proxyd/proxyd.go b/proxyd/proxyd.go index 402909b..ba13d66 100644 --- a/proxyd/proxyd.go +++ b/proxyd/proxyd.go @@ -228,6 +228,7 @@ func Start(config *Config) (*Server, func(), error) { Backends: backends, WeightedRouting: bg.WeightedRouting, FallbackBackends: fallbackBackends, + routingStrategy: bg.RoutingStrategy, } } @@ -353,7 +354,14 @@ func Start(config *Config) (*Server, func(), error) { for bgName, bg := range backendGroups { bgcfg := config.BackendGroups[bgName] - if bgcfg.ConsensusAware { + + if !bgcfg.ValidateRoutingStrategy(bgName) { + log.Crit("Invalid routing strategy provided. Valid options: fallback, multicall, consensus_aware, \"\"", "name", bgName) + } + + log.Info("configuring routing strategy for backend_group", "name", bgName, "routing_strategy", bgcfg.RoutingStrategy) + + if bgcfg.RoutingStrategy == ConsensusAwareRoutingStrategy { log.Info("creating poller for consensus aware backend_group", "name", bgName) copts := make([]ConsensusOpt, 0)
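+ // Note: bgcfg.ValidateRoutingStrategy above has already defaulted an empty
+ // routing_strategy to "fallback" and mapped the deprecated consensus_aware
+ // flag to "consensus_aware", so only consensus-aware groups reach this
+ // poller setup.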