feat: add multicall routing strategy (#26)

* feat: add multicall routing strategy
    * Updated proxyd config to accept a routing strategy parameter 
    * Added multicall routing strategies
    * Refactored BackendGroup.Forward to handle multiple types of routing strategies
    * BackendGroup.ForwardRequestToBackendGroup now returns results via a channel
Jacob Elias 2024-07-22 16:44:27 -05:00 committed by GitHub
parent ec496f559b
commit d4382bfa19
11 changed files with 921 additions and 103 deletions
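
At a glance, the new configuration surface looks like this (a minimal sketch; the group and backend names are illustrative, the field and strategy names are taken from the diff below):

[backend_groups.node]
backends = ["node1", "node2", "node3"]
routing_strategy = "multicall" # one of: consensus_aware, multicall, fallback; empty defaults to fallback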

@ -372,6 +372,14 @@ func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]
),
)
log.Trace(
"forwarding request to backend",
"name", b.Name,
"req_id", GetReqID(ctx),
"attempt_count", i+1,
"max_attempts", b.maxRetries+1,
"method", metricLabelMethod,
)
res, err := b.doForward(ctx, reqs, isBatch)
switch err {
case nil: // do nothing
@ -381,6 +389,7 @@ func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]
"name", b.Name,
"req_id", GetReqID(ctx),
"max", b.maxResponseSize,
"method", metricLabelMethod,
)
RecordBatchRPCError(ctx, b.Name, reqs, err)
case ErrConsensusGetReceiptsCantBeBatched:
@ -415,6 +424,9 @@ func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]
"name", b.Name,
"req_id", GetReqID(ctx),
"err", err,
"method", metricLabelMethod,
"attempt_count", i+1,
"max_retries", b.maxRetries+1,
)
timer.ObserveDuration()
RecordBatchRPCError(ctx, b.Name, reqs, err)
@ -716,6 +728,11 @@ type BackendGroup struct {
WeightedRouting bool
Consensus *ConsensusPoller
FallbackBackends map[string]bool
routingStrategy RoutingStrategy
}
func (bg *BackendGroup) GetRoutingStrategy() RoutingStrategy {
return bg.routingStrategy
}
func (bg *BackendGroup) Fallbacks() []*Backend {
@ -750,113 +767,165 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
overriddenResponses := make([]*indexedReqRes, 0)
rewrittenReqs := make([]*RPCReq, 0, len(rpcReqs))
// When routing_strategy is set to `consensus_aware` the backend group acts as a load balancer
// serving traffic from any backend that agrees in the consensus group.
// We also rewrite block tags to enforce compliance with consensus.
if bg.Consensus != nil {
rpcReqs, overriddenResponses = bg.OverwriteConsensusResponses(rpcReqs, overriddenResponses, rewrittenReqs)
}
// When routing_strategy is set to `multicall` the request is forwarded to all backends
// and the first successful response is returned
if bg.GetRoutingStrategy() == MulticallRoutingStrategy && isValidMulticallTx(rpcReqs) && !isBatch {
backendResp := bg.ExecuteMulticall(ctx, rpcReqs)
return backendResp.RPCRes, backendResp.ServedBy, backendResp.error
}
rpcRequestsTotal.Inc()
ch := make(chan BackendGroupRPCResponse)
go func() {
defer close(ch)
backendResp := bg.ForwardRequestToBackendGroup(rpcReqs, backends, ctx, isBatch)
ch <- *backendResp
}()
backendResp := <-ch
if backendResp.error != nil {
log.Error("error serving requests",
"req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx),
"err", backendResp.error,
)
return backendResp.RPCRes, backendResp.ServedBy, backendResp.error
}
// re-apply overridden responses
log.Trace("successfully served request, overriding responses",
"req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx),
)
res := OverrideResponses(backendResp.RPCRes, overriddenResponses)
return res, backendResp.ServedBy, backendResp.error
}
func isValidMulticallTx(rpcReqs []*RPCReq) bool {
if len(rpcReqs) == 1 {
if rpcReqs[0].Method == "eth_sendRawTransaction" {
return true
}
}
return false
}
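// For context: the only shape that currently qualifies for multicall routing is a
// single, non-batch eth_sendRawTransaction call, i.e. a body like the following
// (params are a placeholder, not from this diff):
//   {"jsonrpc": "2.0", "id": 1, "method": "eth_sendRawTransaction", "params": ["0x02f8..."]}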
// Use a dedicated struct, since ServedBy may not be populated when an error occurs
type multicallTuple struct {
response *BackendGroupRPCResponse
backendName string
}
func (bg *BackendGroup) ExecuteMulticall(ctx context.Context, rpcReqs []*RPCReq) *BackendGroupRPCResponse {
// Create ctx without cancel so background tasks process
// after original request returns
bgCtx := context.WithoutCancel(ctx)
log.Info("executing multicall routing strategy",
"req_id", GetReqID(bgCtx),
"auth", GetAuthCtx(bgCtx),
)
var wg sync.WaitGroup
ch := make(chan *multicallTuple)
for _, backend := range bg.Backends {
wg.Add(1)
go bg.MulticallRequest(backend, rpcReqs, &wg, bgCtx, ch)
}
go func() {
wg.Wait()
close(ch)
}()
return bg.ProcessMulticallResponses(ch, bgCtx)
}
func (bg *BackendGroup) MulticallRequest(backend *Backend, rpcReqs []*RPCReq, wg *sync.WaitGroup, ctx context.Context, ch chan *multicallTuple) {
defer wg.Done()
log.Debug("forwarding multicall request to backend",
"req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx),
"backend", backend.Name,
)
RecordBackendGroupMulticallRequest(bg, backend.Name)
backendResp := bg.ForwardRequestToBackendGroup(rpcReqs, []*Backend{backend}, ctx, false)
multicallResp := &multicallTuple{
response: backendResp,
backendName: backend.Name,
}
ch <- multicallResp
log.Debug("received multicall response from backend",
"req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx),
"backend", backend.Name,
)
if backendResp.error != nil {
log.Error("received multicall error response from backend",
"req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx),
"backend", backend.Name,
"error", backendResp.error.Error(),
)
}
}
func (bg *BackendGroup) ProcessMulticallResponses(ch chan *multicallTuple, ctx context.Context) *BackendGroupRPCResponse {
var finalResp *BackendGroupRPCResponse
i := 0
for {
multicallResp, ok := <-ch
if !ok {
log.Trace("multicall response channel closed",
"req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx),
"response_count", i,
)
if i > 0 {
return finalResp
}
return &BackendGroupRPCResponse{
RPCRes: nil,
ServedBy: "",
error: errors.New("no multicall response received"),
}
}
i++
resp := multicallResp.response
backendName := multicallResp.backendName
RecordBackendGroupMulticallCompletion(bg, backendName)
if resp.error != nil {
log.Error("received error response from multicall channel",
"req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx),
"err", resp.error,
"backend", backendName,
)
finalResp = resp
continue
}
log.Info("received successful response from multicall channel",
"req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx),
"served_by", resp.ServedBy,
"backend", backendName,
)
return resp
}
}
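// Aside — a minimal, self-contained sketch (not part of this commit) of the
// fan-out/first-success pattern implemented by ExecuteMulticall and
// ProcessMulticallResponses above. This variant buffers the channel so that
// backends finishing after the early return can always complete their send
// (assumes the standard "sync" and "errors" imports):
func firstSuccess(backends []string, call func(backend string) error) (string, error) {
type result struct {
backend string
err     error
}
ch := make(chan result, len(backends)) // buffered: senders never block
var wg sync.WaitGroup
for _, b := range backends {
wg.Add(1)
go func(name string) {
defer wg.Done()
ch <- result{backend: name, err: call(name)}
}(b)
}
go func() {
wg.Wait()
close(ch) // lets the loop below detect the all-failed case
}()
lastErr := errors.New("no responses received")
for r := range ch {
if r.err == nil {
return r.backend, nil // first success wins
}
lastErr = r.err // remember the most recent failure
}
return "", lastErr
}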
func (bg *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) {
@ -1281,3 +1350,139 @@ func stripXFF(xff string) string {
ipList := strings.Split(xff, ",")
return strings.TrimSpace(ipList[0])
}
type BackendGroupRPCResponse struct {
RPCRes []*RPCRes
ServedBy string
error error
}
func (bg *BackendGroup) ForwardRequestToBackendGroup(
rpcReqs []*RPCReq,
backends []*Backend,
ctx context.Context,
isBatch bool,
) *BackendGroupRPCResponse {
for _, back := range backends {
res := make([]*RPCRes, 0)
var err error
servedBy := fmt.Sprintf("%s/%s", bg.Name, back.Name)
if len(rpcReqs) > 0 {
res, err = back.Forward(ctx, rpcReqs, isBatch)
if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) ||
errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) ||
errors.Is(err, ErrMethodNotWhitelisted) {
return &BackendGroupRPCResponse{
RPCRes: nil,
ServedBy: "",
error: err,
}
}
if errors.Is(err, ErrBackendResponseTooLarge) {
return &BackendGroupRPCResponse{
RPCRes: nil,
ServedBy: "",
error: err,
}
}
if errors.Is(err, ErrBackendOffline) {
log.Warn(
"skipping offline backend",
"name", back.Name,
"auth", GetAuthCtx(ctx),
"req_id", GetReqID(ctx),
)
continue
}
if errors.Is(err, ErrBackendOverCapacity) {
log.Warn(
"skipping over-capacity backend",
"name", back.Name,
"auth", GetAuthCtx(ctx),
"req_id", GetReqID(ctx),
)
continue
}
if err != nil {
log.Error(
"error forwarding request to backend",
"name", back.Name,
"req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx),
"err", err,
)
continue
}
}
return &BackendGroupRPCResponse{
RPCRes: res,
ServedBy: servedBy,
error: nil,
}
}
RecordUnserviceableRequest(ctx, RPCRequestSourceHTTP)
return &BackendGroupRPCResponse{
RPCRes: nil,
ServedBy: "",
error: ErrNoBackends,
}
}
func OverrideResponses(res []*RPCRes, overriddenResponses []*indexedReqRes) []*RPCRes {
for _, ov := range overriddenResponses {
if len(res) > 0 {
// insert ov.res at position ov.index
res = append(res[:ov.index], append([]*RPCRes{ov.res}, res[ov.index:]...)...)
} else {
res = append(res, ov.res)
}
}
return res
}
func (bg *BackendGroup) OverwriteConsensusResponses(rpcReqs []*RPCReq, overriddenResponses []*indexedReqRes, rewrittenReqs []*RPCReq) ([]*RPCReq, []*indexedReqRes) {
rctx := RewriteContext{
latest: bg.Consensus.GetLatestBlockNumber(),
safe: bg.Consensus.GetSafeBlockNumber(),
finalized: bg.Consensus.GetFinalizedBlockNumber(),
maxBlockRange: bg.Consensus.maxBlockRange,
}
for i, req := range rpcReqs {
res := RPCRes{JSONRPC: JSONRPCVersion, ID: req.ID}
result, err := RewriteTags(rctx, req, &res)
switch result {
case RewriteOverrideError:
overriddenResponses = append(overriddenResponses, &indexedReqRes{
index: i,
req: req,
res: &res,
})
if errors.Is(err, ErrRewriteBlockOutOfRange) {
res.Error = ErrBlockOutOfRange
} else if errors.Is(err, ErrRewriteRangeTooLarge) {
res.Error = ErrInvalidParams(
fmt.Sprintf("block range greater than %d max", rctx.maxBlockRange),
)
} else {
res.Error = ErrParseErr
}
case RewriteOverrideResponse:
overriddenResponses = append(overriddenResponses, &indexedReqRes{
index: i,
req: req,
res: &res,
})
case RewriteOverrideRequest, RewriteNone:
rewrittenReqs = append(rewrittenReqs, req)
}
}
return rewrittenReqs, overriddenResponses
}

@ -6,6 +6,8 @@ import (
"os"
"strings"
"time"
"github.com/ethereum/go-ethereum/log"
)
type ServerConfig struct {
@ -107,11 +109,54 @@ type BackendConfig struct {
type BackendsConfig map[string]*BackendConfig
type RoutingStrategy string
func (b *BackendGroupConfig) ValidateRoutingStrategy(bgName string) bool {
// If consensus_aware is set and routing_strategy is also populated, fail
if b.ConsensusAware && b.RoutingStrategy != "" {
log.Warn("consensus_aware is now deprecated, please use routing_strategy = consensus_aware")
log.Crit("Exiting: consensus_aware and routing_strategy are mutually exclusive; they cannot both be defined")
}
// If consensus_aware is set, set RoutingStrategy to consensus_aware
if b.ConsensusAware {
b.RoutingStrategy = ConsensusAwareRoutingStrategy
log.Info("consensus_aware is now deprecated, please use routing_strategy = consensus_aware in the future")
}
}
switch b.RoutingStrategy {
case ConsensusAwareRoutingStrategy:
return true
case MulticallRoutingStrategy:
return true
case FallbackRoutingStrategy:
return true
case "":
log.Info("Empty routing strategy provided for backend_group, using fallback strategy ", "name", bgName)
b.RoutingStrategy = FallbackRoutingStrategy
return true
default:
return false
}
}
const (
ConsensusAwareRoutingStrategy RoutingStrategy = "consensus_aware"
MulticallRoutingStrategy RoutingStrategy = "multicall"
FallbackRoutingStrategy RoutingStrategy = "fallback"
)
type BackendGroupConfig struct {
Backends []string `toml:"backends"`
WeightedRouting bool `toml:"weighted_routing"`
RoutingStrategy RoutingStrategy `toml:"routing_strategy"`
/*
Deprecated: Use routing_strategy config to create a consensus_aware proxyd instance
*/
ConsensusAware bool `toml:"consensus_aware"`
ConsensusAsyncHandler string `toml:"consensus_handler"`
ConsensusPollerInterval TOMLDuration `toml:"consensus_poller_interval"`
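
Given the deprecation handling in ValidateRoutingStrategy above, an existing consensus-aware group migrates like this (a sketch; the group and backend names are illustrative):

[backend_groups.node]
backends = ["node1", "node2"]
# consensus_aware = true            # deprecated; proxyd exits if both keys are set
routing_strategy = "consensus_aware"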

@ -72,7 +72,7 @@ func setupBlockHeightZero(t *testing.T) (map[string]*bhZeroNodeContext, *proxyd.
bg := svr.BackendGroups["node"]
require.NotNil(t, bg)
require.NotNil(t, bg.Consensus, "Expected consensus poller to be initialized")
require.Equal(t, 2, len(bg.Backends))
// convenient mapping to access the nodes

@ -0,0 +1,500 @@
package integration_tests
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"os"
"path"
"sync"
"testing"
"time"
"github.com/ethereum-optimism/optimism/proxyd"
ms "github.com/ethereum-optimism/optimism/proxyd/tools/mockserver/handler"
"github.com/stretchr/testify/require"
)
const nonceErrorResponse = `{"jsonrpc": "2.0","error": {"code": -32000, "message": "nonce too low"},"id": 1}`
const txAccepted = `{"jsonrpc": "2.0","result": "0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef","id": 1}`
func setupMulticall(t *testing.T) (map[string]nodeContext, *proxyd.BackendGroup, *ProxydHTTPClient, func(), *proxyd.Server, []*ms.MockedHandler) {
// setup mock servers
node1 := NewMockBackend(nil)
node2 := NewMockBackend(nil)
node3 := NewMockBackend(nil)
dir, err := os.Getwd()
require.NoError(t, err)
responses := path.Join(dir, "testdata/multicall_responses.yml")
emptyResponses := path.Join(dir, "testdata/empty_responses.yml")
h1 := ms.MockedHandler{
Overrides: []*ms.MethodTemplate{},
Autoload: true,
AutoloadFile: responses,
}
h2 := ms.MockedHandler{
Overrides: []*ms.MethodTemplate{},
Autoload: true,
AutoloadFile: "",
}
h3 := ms.MockedHandler{
Overrides: []*ms.MethodTemplate{},
Autoload: true,
AutoloadFile: emptyResponses,
}
require.NoError(t, os.Setenv("NODE1_URL", node1.URL()))
require.NoError(t, os.Setenv("NODE2_URL", node2.URL()))
require.NoError(t, os.Setenv("NODE3_URL", node3.URL()))
node1.SetHandler(http.HandlerFunc(h1.Handler))
node2.SetHandler(SingleResponseHandler(200, txAccepted))
node3.SetHandler(SingleResponseHandler(429, dummyRes))
// setup proxyd
config := ReadConfig("multicall")
fmt.Printf("[SetupMulticall] Using Timeout of %d \n", config.Server.TimeoutSeconds)
svr, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
// expose the proxyd client
client := NewProxydClient("http://127.0.0.1:8545")
// expose the backend group
bg := svr.BackendGroups["node"]
require.NotNil(t, bg)
require.Nil(t, bg.Consensus, "Expected consensus not to be initialized")
require.Equal(t, 3, len(bg.Backends))
require.Equal(t, bg.GetRoutingStrategy(), proxyd.MulticallRoutingStrategy)
// convenient mapping to access the nodes by name
nodes := map[string]nodeContext{
"node1": {
mockBackend: node1,
backend: bg.Backends[0],
handler: &h1,
},
"node2": {
mockBackend: node2,
backend: bg.Backends[1],
handler: &h2,
},
"node3": {
mockBackend: node3,
backend: bg.Backends[2],
handler: &h3,
},
}
handlers := []*ms.MockedHandler{&h1, &h2, &h3}
nodes["node1"].mockBackend.SetHandler(SingleResponseHandler(200, txAccepted))
nodes["node2"].mockBackend.SetHandler(http.HandlerFunc(handlers[1].Handler))
// Node 3's handler autoloads only empty responses and never responds, so it should always hit the context timeout
nodes["node3"].mockBackend.SetHandler(http.HandlerFunc(handlers[2].Handler))
require.Equal(t, 0, nodeBackendRequestCount(nodes, "node1"))
require.Equal(t, 0, nodeBackendRequestCount(nodes, "node2"))
require.Equal(t, 0, nodeBackendRequestCount(nodes, "node3"))
return nodes, bg, client, shutdown, svr, handlers
}
func setServerBackend(s *proxyd.Server, nm map[string]nodeContext) *proxyd.Server {
bg := s.BackendGroups
bg["node"].Backends = []*proxyd.Backend{
nm["node1"].backend,
nm["node2"].backend,
nm["node3"].backend,
}
s.BackendGroups = bg
return s
}
func nodeBackendRequestCount(nodes map[string]nodeContext, node string) int {
return len(nodes[node].mockBackend.requests)
}
func TestMulticall(t *testing.T) {
t.Run("Multicall will request all backends", func(t *testing.T) {
nodes, _, _, shutdown, svr, _ := setupMulticall(t)
defer nodes["node1"].mockBackend.Close()
defer nodes["node2"].mockBackend.Close()
defer nodes["node3"].mockBackend.Close()
defer shutdown()
nodes["node1"].mockBackend.SetHandler(SingleResponseHandler(401, dummyRes))
nodes["node2"].mockBackend.SetHandler(SingleResponseHandler(500, dummyRes))
nodes["node3"].mockBackend.SetHandler(SingleResponseHandler(200, txAccepted))
body := makeSendRawTransaction(txHex1)
req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body))
req.Header.Set("X-Forwarded-For", "203.0.113.1")
rr := httptest.NewRecorder()
svr.HandleRPC(rr, req)
resp := rr.Result()
defer resp.Body.Close()
require.NotNil(t, resp.Body)
require.Equal(t, 200, resp.StatusCode)
require.Equal(t, resp.Header["X-Served-By"], []string{"node/node3"})
rpcRes := &proxyd.RPCRes{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes))
require.False(t, rpcRes.IsError())
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node1"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node2"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node3"))
})
t.Run("When all of the backends return non 200, multicall should return 503", func(t *testing.T) {
nodes, _, _, shutdown, svr, _ := setupMulticall(t)
defer nodes["node1"].mockBackend.Close()
defer nodes["node2"].mockBackend.Close()
defer nodes["node3"].mockBackend.Close()
defer shutdown()
nodes["node1"].mockBackend.SetHandler(SingleResponseHandler(429, dummyRes))
nodes["node2"].mockBackend.SetHandler(SingleResponseHandler(429, dummyRes))
localSvr := setServerBackend(svr, nodes)
body := makeSendRawTransaction(txHex1)
req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body))
req.Header.Set("X-Forwarded-For", "203.0.113.1")
rr := httptest.NewRecorder()
localSvr.HandleRPC(rr, req)
resp := rr.Result()
defer resp.Body.Close()
require.NotNil(t, resp.Body)
require.Equal(t, 503, resp.StatusCode)
rpcRes := &proxyd.RPCRes{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes))
require.True(t, rpcRes.IsError())
require.Equal(t, proxyd.ErrNoBackends.Code, rpcRes.Error.Code)
require.Equal(t, proxyd.ErrNoBackends.Message, rpcRes.Error.Message)
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node1"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node2"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node3"))
})
t.Run("It should return the first 200 response", func(t *testing.T) {
nodes, _, _, shutdown, svr, _ := setupMulticall(t)
defer nodes["node1"].mockBackend.Close()
defer nodes["node2"].mockBackend.Close()
defer nodes["node3"].mockBackend.Close()
defer shutdown()
nodes["node1"].mockBackend.SetHandler(SingleResponseHandlerWithSleep(200, txAccepted, 3*time.Second))
nodes["node2"].mockBackend.SetHandler(SingleResponseHandler(200, txAccepted))
localSvr := setServerBackend(svr, nodes)
body := makeSendRawTransaction(txHex1)
req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body))
req.Header.Set("X-Forwarded-For", "203.0.113.1")
rr := httptest.NewRecorder()
localSvr.HandleRPC(rr, req)
resp := rr.Result()
defer resp.Body.Close()
require.NotNil(t, resp.Body)
require.Equal(t, 200, resp.StatusCode)
rpcRes := &proxyd.RPCRes{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes))
require.False(t, rpcRes.IsError())
require.Equal(t, "2.0", rpcRes.JSONRPC)
require.Equal(t, resp.Header["X-Served-By"], []string{"node/node2"})
require.False(t, rpcRes.IsError())
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node1"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node2"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node3"))
})
t.Run("Ensure application level error is returned to caller if its first", func(t *testing.T) {
nodes, _, _, shutdown, svr, _ := setupMulticall(t)
defer nodes["node1"].mockBackend.Close()
defer nodes["node2"].mockBackend.Close()
defer nodes["node3"].mockBackend.Close()
defer shutdown()
shutdownChan1 := make(chan struct{})
shutdownChan2 := make(chan struct{})
nodes["node1"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, nonceErrorResponse, shutdownChan1, 4*time.Second))
nodes["node2"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, nonceErrorResponse, shutdownChan2, 1*time.Second))
nodes["node3"].mockBackend.SetHandler(SingleResponseHandler(403, dummyRes))
var wg sync.WaitGroup
wg.Add(1)
go func() {
shutdownChan2 <- struct{}{}
shutdownChan1 <- struct{}{}
wg.Done()
}()
localSvr := setServerBackend(svr, nodes)
body := makeSendRawTransaction(txHex1)
req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body))
req.Header.Set("X-Forwarded-For", "203.0.113.1")
rr := httptest.NewRecorder()
localSvr.HandleRPC(rr, req)
resp := rr.Result()
defer resp.Body.Close()
require.NotNil(t, resp.Body)
require.Equal(t, 200, resp.StatusCode)
rpcRes := &proxyd.RPCRes{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes))
require.Equal(t, "2.0", rpcRes.JSONRPC)
require.Equal(t, resp.Header["X-Served-By"], []string{"node/node2"})
require.True(t, rpcRes.IsError())
wg.Wait()
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node1"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node2"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node3"))
})
t.Run("It should ignore network errors and return a 200 from a slower request", func(t *testing.T) {
nodes, _, _, shutdown, svr, _ := setupMulticall(t)
defer nodes["node1"].mockBackend.Close()
defer nodes["node2"].mockBackend.Close()
defer nodes["node3"].mockBackend.Close()
defer shutdown()
// node2 responds first with a 429, which should be ignored; node1's slower 200 is returned
nodes["node1"].mockBackend.SetHandler(SingleResponseHandlerWithSleep(200, txAccepted, 3*time.Second))
nodes["node2"].mockBackend.SetHandler(SingleResponseHandler(429, txAccepted))
localSvr := setServerBackend(svr, nodes)
body := makeSendRawTransaction(txHex1)
req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body))
req.Header.Set("X-Forwarded-For", "203.0.113.1")
rr := httptest.NewRecorder()
localSvr.HandleRPC(rr, req)
resp := rr.Result()
defer resp.Body.Close()
require.NotNil(t, resp.Body)
require.Equal(t, 200, resp.StatusCode)
rpcRes := &proxyd.RPCRes{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes))
require.False(t, rpcRes.IsError())
require.Equal(t, "2.0", rpcRes.JSONRPC)
require.Equal(t, resp.Header["X-Served-By"], []string{"node/node1"})
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node1"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node2"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node3"))
})
t.Run("When one of the backends times out", func(t *testing.T) {
nodes, _, _, shutdown, svr, _ := setupMulticall(t)
defer nodes["node1"].mockBackend.Close()
defer nodes["node2"].mockBackend.Close()
defer nodes["node3"].mockBackend.Close()
defer shutdown()
shutdownChan := make(chan struct{})
nodes["node1"].mockBackend.SetHandler(SingleResponseHandler(200, dummyRes))
nodes["node2"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, dummyRes, shutdownChan, 7*time.Second))
localSvr := setServerBackend(svr, nodes)
body := makeSendRawTransaction(txHex1)
req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body))
req.Header.Set("X-Forwarded-For", "203.0.113.1")
rr := httptest.NewRecorder()
localSvr.HandleRPC(rr, req)
resp := rr.Result()
shutdownChan <- struct{}{}
defer resp.Body.Close()
require.NotNil(t, resp.Body)
servedBy := "node/node1"
require.Equal(t, 200, resp.StatusCode, "expected 200 response from node1")
require.Equal(t, resp.Header["X-Served-By"], []string{servedBy}, "Error incorrect node served the request")
rpcRes := &proxyd.RPCRes{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes))
require.False(t, rpcRes.IsError())
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node1"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node2"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node3"))
})
t.Run("allBackends times out", func(t *testing.T) {
nodes, _, _, shutdown, svr, _ := setupMulticall(t)
defer nodes["node1"].mockBackend.Close()
defer nodes["node2"].mockBackend.Close()
defer nodes["node3"].mockBackend.Close()
defer shutdown()
shutdownChan1 := make(chan struct{})
shutdownChan2 := make(chan struct{})
nodes["node1"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, dummyRes, shutdownChan1, 7*time.Second))
nodes["node2"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, dummyRes, shutdownChan2, 7*time.Second))
localSvr := setServerBackend(svr, nodes)
body := makeSendRawTransaction(txHex1)
req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body))
req.Header.Set("X-Forwarded-For", "203.0.113.1")
rr := httptest.NewRecorder()
var wg sync.WaitGroup
wg.Add(1)
go func() {
shutdownChan1 <- struct{}{}
shutdownChan2 <- struct{}{}
wg.Done()
}()
fmt.Println("sending request")
localSvr.HandleRPC(rr, req)
resp := rr.Result()
defer resp.Body.Close()
require.NotNil(t, resp.Body)
require.Equal(t, 503, resp.StatusCode, "expected no response")
rpcRes := &proxyd.RPCRes{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes))
require.True(t, rpcRes.IsError())
require.Equal(t, rpcRes.Error.Code, proxyd.ErrNoBackends.Code)
// Wait for test response to complete before checking query count
wg.Wait()
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node1"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node2"))
require.Equal(t, 1, nodeBackendRequestCount(nodes, "node3"))
})
t.Run("Test with many multi-calls in without resetting", func(t *testing.T) {
nodes, _, _, shutdown, svr, _ := setupMulticall(t)
defer nodes["node1"].mockBackend.Close()
defer nodes["node2"].mockBackend.Close()
defer nodes["node3"].mockBackend.Close()
defer shutdown()
for i := 1; i < 4; i++ {
shutdownChan1 := make(chan struct{})
shutdownChan2 := make(chan struct{})
shutdownChan3 := make(chan struct{})
switch {
case i == 1:
nodes["node1"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, txAccepted, shutdownChan1, 1*time.Second))
nodes["node2"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(429, dummyRes, shutdownChan2, 1*time.Second))
nodes["node3"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(503, dummyRes, shutdownChan3, 1*time.Second))
case i == 2:
nodes["node1"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(404, dummyRes, shutdownChan1, 1*time.Second))
nodes["node2"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, nonceErrorResponse, shutdownChan2, 1*time.Second))
nodes["node3"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(405, dummyRes, shutdownChan3, 1*time.Second))
case i == 3:
// Return the quickest response
nodes["node1"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(404, dummyRes, shutdownChan1, 1*time.Second))
nodes["node2"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(500, dummyRes, shutdownChan2, 1*time.Second))
nodes["node3"].mockBackend.SetHandler(SingleResponseHandlerWithSleepShutdown(200, nonceErrorResponse, shutdownChan3, 1*time.Second))
}
localSvr := setServerBackend(svr, nodes)
body := makeSendRawTransaction(txHex1)
req, _ := http.NewRequest("POST", "https://1.1.1.1:8080", bytes.NewReader(body))
req.Header.Set("X-Forwarded-For", "203.0.113.1")
rr := httptest.NewRecorder()
var wg sync.WaitGroup
wg.Add(1)
go func() {
shutdownChan1 <- struct{}{}
shutdownChan2 <- struct{}{}
shutdownChan3 <- struct{}{}
wg.Done()
}()
localSvr.HandleRPC(rr, req)
resp := rr.Result()
defer resp.Body.Close()
require.NotNil(t, resp.Body)
rpcRes := &proxyd.RPCRes{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(rpcRes))
switch {
case i == 1:
servedBy := "node/node1"
require.NotNil(t, rpcRes.Result)
require.Equal(t, 200, resp.StatusCode, "expected 200 response from node1")
require.Equal(t, resp.Header["X-Served-By"], []string{servedBy}, "Error incorrect node served the request")
require.False(t, rpcRes.IsError())
case i == 2:
servedBy := "node/node2"
require.Nil(t, rpcRes.Result)
require.Equal(t, 200, resp.StatusCode, "expected 200 response from node2")
require.Equal(t, resp.Header["X-Served-By"], []string{servedBy}, "Error incorrect node served the request")
require.True(t, rpcRes.IsError())
case i == 3:
servedBy := "node/node3"
require.Nil(t, rpcRes.Result)
require.Equal(t, 200, resp.StatusCode, "expected 200 response from node3")
require.Equal(t, resp.Header["X-Served-By"], []string{servedBy}, "Error incorrect node served the request")
require.True(t, rpcRes.IsError())
}
// Wait for test response to complete before checking query count
wg.Wait()
require.Equal(t, i, nodeBackendRequestCount(nodes, "node1"))
require.Equal(t, i, nodeBackendRequestCount(nodes, "node2"))
require.Equal(t, i, nodeBackendRequestCount(nodes, "node3"))
}
})
}
func SingleResponseHandlerWithSleep(code int, response string, duration time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
fmt.Println("sleeping")
time.Sleep(duration)
fmt.Println("Shutting down Single Response Handler")
w.WriteHeader(code)
_, _ = w.Write([]byte(response))
}
}
func SingleResponseHandlerWithSleepShutdown(code int, response string, shutdownServer chan struct{}, duration time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
fmt.Println("sleeping")
time.Sleep(duration)
<-shutdownServer
fmt.Println("Shutting down Single Response Handler")
w.WriteHeader(code)
_, _ = w.Write([]byte(response))
}
}

@ -16,7 +16,7 @@ rpc_url = "$NODE2_URL"
[backend_groups]
[backend_groups.node]
backends = ["node1", "node2"]
routing_strategy = "consensus_aware"
consensus_handler = "noop" # allow more control over the consensus poller for tests
## Consensus ban period needs to be set very large, because the consensus poller uses the system clock, not an adjustable clock

@ -15,7 +15,7 @@ rpc_url = "$NODE2_URL"
[backend_groups]
[backend_groups.node]
backends = ["node1", "node2"]
routing_strategy = "consensus_aware"
consensus_handler = "noop" # allow more control over the consensus poller for tests
consensus_ban_period = "1m"
consensus_max_update_threshold = "2m"

@ -0,0 +1,7 @@
- method: bad_method
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": "nope",
}

@ -15,7 +15,7 @@ rpc_url = "$NODE2_URL"
[backend_groups]
[backend_groups.node]
backends = ["normal", "fallback"]
routing_strategy = "consensus_aware"
consensus_handler = "noop" # allow more control over the consensus poller for tests
consensus_ban_period = "1m"
consensus_max_update_threshold = "2m"

@ -0,0 +1,27 @@
[server]
rpc_port = 8545
enable_served_by_header = true
timeout_seconds = 7
[backend]
response_timeout_seconds = 4
max_degraded_latency_threshold = "30ms"
[backends]
[backends.node1]
rpc_url = "$NODE1_URL"
[backends.node2]
rpc_url = "$NODE2_URL"
[backends.node3]
rpc_url = "$NODE3_URL"
[backend_groups]
[backend_groups.node]
backends = ["node1", "node2", "node3"]
routing_strategy = "multicall"
[rpc_method_mappings]
eth_call = "node"
eth_sendRawTransaction = "node"

@ -428,6 +428,24 @@ var (
"backend_name",
"fallback",
})
backendGroupMulticallCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "backend_group_multicall_request_counter",
Help: "Record the amount of multicall requests",
}, []string{
"backend_group",
"backend_name",
})
backendGroupMulticallCompletionCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "backend_group_multicall_completion_counter",
Help: "Record the amount of completed multicall requests",
}, []string{
"backend_group",
"backend_name",
})
)
func RecordRedisError(source string) {
@ -593,6 +611,14 @@ func RecordBackendGroupFallbacks(bg *BackendGroup, name string, fallback bool) {
backendGroupFallbackBackend.WithLabelValues(bg.Name, name, strconv.FormatBool(fallback)).Set(boolToFloat64(fallback))
}
func RecordBackendGroupMulticallRequest(bg *BackendGroup, backendName string) {
backendGroupMulticallCounter.WithLabelValues(bg.Name, backendName).Inc()
}
func RecordBackendGroupMulticallCompletion(bg *BackendGroup, backendName string) {
backendGroupMulticallCompletionCounter.WithLabelValues(bg.Name, backendName).Inc()
}
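// Note (assumption, not shown in this diff): with MetricsNamespace set to "proxyd",
// these counters are exported as proxyd_backend_group_multicall_request_counter and
// proxyd_backend_group_multicall_completion_counter, labeled by backend_group and
// backend_name; a persistent gap between the two series points at multicall
// requests that never completed.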
func boolToFloat64(b bool) float64 {
if b {
return 1

@ -228,6 +228,7 @@ func Start(config *Config) (*Server, func(), error) {
Backends: backends,
WeightedRouting: bg.WeightedRouting,
FallbackBackends: fallbackBackends,
routingStrategy: bg.RoutingStrategy,
}
}
@ -353,7 +354,14 @@ func Start(config *Config) (*Server, func(), error) {
for bgName, bg := range backendGroups {
bgcfg := config.BackendGroups[bgName]
if !bgcfg.ValidateRoutingStrategy(bgName) {
log.Crit("Invalid routing strategy provided. Valid options: fallback, multicall, consensus_aware, \"\"", "name", bgName)
}
log.Info("configuring routing strategy for backend_group", "name", bgName, "routing_strategy", bgcfg.RoutingStrategy)
if bgcfg.RoutingStrategy == ConsensusAwareRoutingStrategy {
log.Info("creating poller for consensus aware backend_group", "name", bgName)
copts := make([]ConsensusOpt, 0)