diff --git a/proxyd/proxyd/backend.go b/proxyd/proxyd/backend.go index d043cae..5e8c632 100644 --- a/proxyd/proxyd/backend.go +++ b/proxyd/proxyd/backend.go @@ -365,6 +365,36 @@ func (b *Backend) setOffline() { } } +// ForwardRPC makes a call directly to a backend and populate the response into `res` +func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method string, params ...any) error { + jsonParams, err := json.Marshal(params) + if err != nil { + return err + } + + rpcReq := RPCReq{ + JSONRPC: JSONRPCVersion, + Method: method, + Params: jsonParams, + ID: []byte(id), + } + + slicedRes, err := b.doForward(ctx, []*RPCReq{&rpcReq}, false) + if err != nil { + return err + } + + if len(slicedRes) != 1 { + return fmt.Errorf("unexpected response len for non-batched request (len != 1)") + } + if slicedRes[0].IsError() { + return fmt.Errorf(slicedRes[0].Error.Error()) + } + + *res = *(slicedRes[0]) + return nil +} + func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) { isSingleElementBatch := len(rpcReqs) == 1 @@ -484,8 +514,9 @@ func sortBatchRPCResponse(req []*RPCReq, res []*RPCRes) { } type BackendGroup struct { - Name string - Backends []*Backend + Name string + Backends []*Backend + Consensus *ConsensusPoller } func (b *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) { diff --git a/proxyd/proxyd/cmd/proxyd/main.go b/proxyd/proxyd/cmd/proxyd/main.go index c184a1d..a97aacb 100644 --- a/proxyd/proxyd/cmd/proxyd/main.go +++ b/proxyd/proxyd/cmd/proxyd/main.go @@ -52,7 +52,7 @@ func main() { ), ) - shutdown, err := proxyd.Start(config) + _, shutdown, err := proxyd.Start(config) if err != nil { log.Crit("error starting proxyd", "err", err) } diff --git a/proxyd/proxyd/config.go b/proxyd/proxyd/config.go index 7a004f0..94c1f83 100644 --- a/proxyd/proxyd/config.go +++ b/proxyd/proxyd/config.go @@ -82,6 +82,7 @@ type BackendConfig struct { Password string `toml:"password"` RPCURL string `toml:"rpc_url"` WSURL string `toml:"ws_url"` + WSPort int `toml:"ws_port"` MaxRPS int `toml:"max_rps"` MaxWSConns int `toml:"max_ws_conns"` CAFile string `toml:"ca_file"` @@ -93,7 +94,9 @@ type BackendConfig struct { type BackendsConfig map[string]*BackendConfig type BackendGroupConfig struct { - Backends []string `toml:"backends"` + Backends []string `toml:"backends"` + ConsensusAware bool `toml:"consensus_aware"` + ConsensusAsyncHandler string `toml:"consensus_handler"` } type BackendGroupsConfig map[string]*BackendGroupConfig diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go new file mode 100644 index 0000000..81f8fc3 --- /dev/null +++ b/proxyd/proxyd/consensus_poller.go @@ -0,0 +1,345 @@ +package proxyd + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common/hexutil" + + "github.com/ethereum/go-ethereum/log" +) + +const ( + PollerInterval = 1 * time.Second +) + +// ConsensusPoller checks the consensus state for each member of a BackendGroup +// resolves the highest common block for multiple nodes, and reconciles the consensus +// in case of block hash divergence to minimize re-orgs +type ConsensusPoller struct { + cancelFunc context.CancelFunc + + backendGroup *BackendGroup + backendState map[*Backend]*backendState + consensusGroupMux sync.Mutex + consensusGroup []*Backend + + tracker ConsensusTracker + asyncHandler ConsensusAsyncHandler +} + +type backendState struct { + backendStateMux sync.Mutex + + latestBlockNumber string + latestBlockHash string + + lastUpdate time.Time + + bannedUntil time.Time +} + +// GetConsensusGroup returns the backend members that are agreeing in a consensus +func (cp *ConsensusPoller) GetConsensusGroup() []*Backend { + defer cp.consensusGroupMux.Unlock() + cp.consensusGroupMux.Lock() + + g := make([]*Backend, len(cp.backendGroup.Backends)) + copy(g, cp.consensusGroup) + + return g +} + +// GetConsensusBlockNumber returns the agreed block number in a consensus +func (ct *ConsensusPoller) GetConsensusBlockNumber() string { + return ct.tracker.GetConsensusBlockNumber() +} + +func (cp *ConsensusPoller) Shutdown() { + cp.asyncHandler.Shutdown() +} + +// ConsensusAsyncHandler controls the asynchronous polling mechanism, interval and shutdown +type ConsensusAsyncHandler interface { + Init() + Shutdown() +} + +// NoopAsyncHandler allows fine control updating the consensus +type NoopAsyncHandler struct{} + +func NewNoopAsyncHandler() ConsensusAsyncHandler { + log.Warn("using NewNoopAsyncHandler") + return &NoopAsyncHandler{} +} +func (ah *NoopAsyncHandler) Init() {} +func (ah *NoopAsyncHandler) Shutdown() {} + +// PollerAsyncHandler asynchronously updates each individual backend and the group consensus +type PollerAsyncHandler struct { + ctx context.Context + cp *ConsensusPoller +} + +func NewPollerAsyncHandler(ctx context.Context, cp *ConsensusPoller) ConsensusAsyncHandler { + return &PollerAsyncHandler{ + ctx: ctx, + cp: cp, + } +} +func (ah *PollerAsyncHandler) Init() { + // create the individual backend pollers + for _, be := range ah.cp.backendGroup.Backends { + go func(be *Backend) { + for { + timer := time.NewTimer(PollerInterval) + ah.cp.UpdateBackend(ah.ctx, be) + + select { + case <-timer.C: + case <-ah.ctx.Done(): + timer.Stop() + return + } + } + }(be) + } + + // create the group consensus poller + go func() { + for { + timer := time.NewTimer(PollerInterval) + ah.cp.UpdateBackendGroupConsensus(ah.ctx) + + select { + case <-timer.C: + case <-ah.ctx.Done(): + timer.Stop() + return + } + } + }() +} +func (ah *PollerAsyncHandler) Shutdown() { + ah.cp.cancelFunc() +} + +type ConsensusOpt func(cp *ConsensusPoller) + +func WithTracker(tracker ConsensusTracker) ConsensusOpt { + return func(cp *ConsensusPoller) { + cp.tracker = tracker + } +} + +func WithAsyncHandler(asyncHandler ConsensusAsyncHandler) ConsensusOpt { + return func(cp *ConsensusPoller) { + cp.asyncHandler = asyncHandler + } +} + +func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller { + ctx, cancelFunc := context.WithCancel(context.Background()) + + state := make(map[*Backend]*backendState, len(bg.Backends)) + for _, be := range bg.Backends { + state[be] = &backendState{} + } + + cp := &ConsensusPoller{ + cancelFunc: cancelFunc, + backendGroup: bg, + backendState: state, + } + + for _, opt := range opts { + opt(cp) + } + + if cp.tracker == nil { + cp.tracker = NewInMemoryConsensusTracker() + } + + if cp.asyncHandler == nil { + cp.asyncHandler = NewPollerAsyncHandler(ctx, cp) + } + + cp.asyncHandler.Init() + + return cp +} + +// UpdateBackend refreshes the consensus state of a single backend +func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { + bs := cp.backendState[be] + if time.Now().Before(bs.bannedUntil) { + log.Warn("skipping backend banned", "backend", be.Name, "bannedUntil", bs.bannedUntil) + return + } + + if be.IsRateLimited() || !be.Online() { + return + } + + // we'll introduce here checks to ban the backend + // i.e. node is syncing the chain + + // then update backend consensus + + latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest") + if err != nil { + log.Warn("error updating backend", "name", be.Name, "err", err) + return + } + + changed := cp.setBackendState(be, latestBlockNumber, latestBlockHash) + + if changed { + backendLatestBlockBackend.WithLabelValues(be.Name).Set(blockToFloat(latestBlockNumber)) + log.Info("backend state updated", "name", be.Name, "state", bs) + } +} + +// UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends +func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { + var lowestBlock string + var lowestBlockHash string + + currentConsensusBlockNumber := cp.GetConsensusBlockNumber() + + for _, be := range cp.backendGroup.Backends { + backendLatestBlockNumber, backendLatestBlockHash := cp.getBackendState(be) + if lowestBlock == "" || backendLatestBlockNumber < lowestBlock { + lowestBlock = backendLatestBlockNumber + lowestBlockHash = backendLatestBlockHash + } + } + + // no block to propose (i.e. initializing consensus) + if lowestBlock == "" { + return + } + + proposedBlock := lowestBlock + proposedBlockHash := lowestBlockHash + hasConsensus := false + + // check if everybody agrees on the same block hash + consensusBackends := make([]*Backend, 0, len(cp.backendGroup.Backends)) + consensusBackendsNames := make([]string, 0, len(cp.backendGroup.Backends)) + filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends)) + + if lowestBlock > currentConsensusBlockNumber { + log.Info("validating consensus on block", lowestBlock) + } + + broken := false + for !hasConsensus { + allAgreed := true + consensusBackends = consensusBackends[:0] + filteredBackendsNames = filteredBackendsNames[:0] + for _, be := range cp.backendGroup.Backends { + if be.IsRateLimited() || !be.Online() || time.Now().Before(cp.backendState[be].bannedUntil) { + filteredBackendsNames = append(filteredBackendsNames, be.Name) + continue + } + + actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock) + if err != nil { + log.Warn("error updating backend", "name", be.Name, "err", err) + continue + } + if proposedBlockHash == "" { + proposedBlockHash = actualBlockHash + } + blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash) + if blocksDontMatch { + if blockAheadOrEqual(currentConsensusBlockNumber, actualBlockNumber) { + log.Warn("backend broke consensus", "name", be.Name, "blockNum", actualBlockNumber, "proposedBlockNum", proposedBlock, "blockHash", actualBlockHash, "proposedBlockHash", proposedBlockHash) + broken = true + } + allAgreed = false + break + } + consensusBackends = append(consensusBackends, be) + consensusBackendsNames = append(consensusBackendsNames, be.Name) + } + if allAgreed { + hasConsensus = true + } else { + // walk one block behind and try again + proposedBlock = hexAdd(proposedBlock, -1) + proposedBlockHash = "" + log.Info("no consensus, now trying", "block:", proposedBlock) + } + } + + if broken { + // propagate event to other interested parts, such as cache invalidator + log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash) + } + + cp.tracker.SetConsensusBlockNumber(proposedBlock) + consensusLatestBlock.Set(blockToFloat(proposedBlock)) + cp.consensusGroupMux.Lock() + cp.consensusGroup = consensusBackends + cp.consensusGroupMux.Unlock() + + log.Info("group state", "proposedBlock", proposedBlock, "consensusBackends", strings.Join(consensusBackendsNames, ", "), "filteredBackends", strings.Join(filteredBackendsNames, ", ")) +} + +// fetchBlock Convenient wrapper to make a request to get a block directly from the backend +func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (blockNumber string, blockHash string, err error) { + var rpcRes RPCRes + err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_getBlockByNumber", block, false) + if err != nil { + return "", "", err + } + + jsonMap, ok := rpcRes.Result.(map[string]interface{}) + if !ok { + return "", "", fmt.Errorf(fmt.Sprintf("unexpected response type checking consensus on backend %s", be.Name)) + } + blockNumber = jsonMap["number"].(string) + blockHash = jsonMap["hash"].(string) + + return +} + +func (cp *ConsensusPoller) getBackendState(be *Backend) (blockNumber string, blockHash string) { + bs := cp.backendState[be] + bs.backendStateMux.Lock() + blockNumber = bs.latestBlockNumber + blockHash = bs.latestBlockHash + bs.backendStateMux.Unlock() + return +} + +func (cp *ConsensusPoller) setBackendState(be *Backend, blockNumber string, blockHash string) (changed bool) { + bs := cp.backendState[be] + bs.backendStateMux.Lock() + changed = bs.latestBlockHash != blockHash + bs.latestBlockNumber = blockNumber + bs.latestBlockHash = blockHash + bs.lastUpdate = time.Now() + bs.backendStateMux.Unlock() + return +} + +// hexAdd Convenient way to convert hex block to uint64, increment, and convert back to hex +func hexAdd(hexVal string, incr int64) string { + return hexutil.EncodeUint64(uint64(int64(hexutil.MustDecodeUint64(hexVal)) + incr)) +} + +// blockAheadOrEqual Convenient way to check if `baseBlock` is ahead or equal than `checkBlock` +func blockAheadOrEqual(baseBlock string, checkBlock string) bool { + return hexutil.MustDecodeUint64(baseBlock) >= hexutil.MustDecodeUint64(checkBlock) +} + +// blockToFloat Convenient way to convert a hex block to float64 +func blockToFloat(hexVal string) float64 { + return float64(hexutil.MustDecodeUint64(hexVal)) +} diff --git a/proxyd/proxyd/consensus_poller_test.go b/proxyd/proxyd/consensus_poller_test.go new file mode 100644 index 0000000..06680fa --- /dev/null +++ b/proxyd/proxyd/consensus_poller_test.go @@ -0,0 +1,74 @@ +package proxyd + +import ( + "testing" +) + +func Test_blockToFloat(t *testing.T) { + type args struct { + hexVal string + } + tests := []struct { + name string + args args + want float64 + }{ + {"0xf1b3", args{"0xf1b3"}, float64(61875)}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := blockToFloat(tt.args.hexVal); got != tt.want { + t.Errorf("blockToFloat() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_hexAdd(t *testing.T) { + type args struct { + hexVal string + incr int64 + } + tests := []struct { + name string + args args + want string + }{ + {"0x1", args{"0x1", 1}, "0x2"}, + {"0x2", args{"0x2", -1}, "0x1"}, + {"0xf", args{"0xf", 1}, "0x10"}, + {"0x10", args{"0x10", -1}, "0xf"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := hexAdd(tt.args.hexVal, tt.args.incr); got != tt.want { + t.Errorf("hexAdd() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_blockAheadOrEqual(t *testing.T) { + type args struct { + baseBlock string + checkBlock string + } + tests := []struct { + name string + args args + want bool + }{ + {"0x1 vs 0x1", args{"0x1", "0x1"}, true}, + {"0x2 vs 0x1", args{"0x2", "0x1"}, true}, + {"0x1 vs 0x2", args{"0x1", "0x2"}, false}, + {"0xff vs 0x100", args{"0xff", "0x100"}, false}, + {"0x100 vs 0xff", args{"0x100", "0xff"}, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := blockAheadOrEqual(tt.args.baseBlock, tt.args.checkBlock); got != tt.want { + t.Errorf("blockAheadOrEqual() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/proxyd/proxyd/consensus_tracker.go b/proxyd/proxyd/consensus_tracker.go new file mode 100644 index 0000000..7218b32 --- /dev/null +++ b/proxyd/proxyd/consensus_tracker.go @@ -0,0 +1,70 @@ +package proxyd + +import ( + "context" + "fmt" + "sync" + + "github.com/go-redis/redis/v8" +) + +// ConsensusTracker abstracts how we store and retrieve the current consensus +// allowing it to be stored locally in-memory or in a shared Redis cluster +type ConsensusTracker interface { + GetConsensusBlockNumber() string + SetConsensusBlockNumber(blockNumber string) +} + +// InMemoryConsensusTracker store and retrieve in memory, async-safe +type InMemoryConsensusTracker struct { + consensusBlockNumber string + mutex sync.Mutex +} + +func NewInMemoryConsensusTracker() ConsensusTracker { + return &InMemoryConsensusTracker{ + consensusBlockNumber: "", // empty string semantics means unknown + mutex: sync.Mutex{}, + } +} + +func (ct *InMemoryConsensusTracker) GetConsensusBlockNumber() string { + defer ct.mutex.Unlock() + ct.mutex.Lock() + + return ct.consensusBlockNumber +} + +func (ct *InMemoryConsensusTracker) SetConsensusBlockNumber(blockNumber string) { + defer ct.mutex.Unlock() + ct.mutex.Lock() + + ct.consensusBlockNumber = blockNumber +} + +// RedisConsensusTracker uses a Redis `client` to store and retrieve consensus, async-safe +type RedisConsensusTracker struct { + ctx context.Context + client *redis.Client + backendGroup string +} + +func NewRedisConsensusTracker(ctx context.Context, r *redis.Client, namespace string) ConsensusTracker { + return &RedisConsensusTracker{ + ctx: ctx, + client: r, + backendGroup: namespace, + } +} + +func (ct *RedisConsensusTracker) key() string { + return fmt.Sprintf("consensus_latest_block:%s", ct.backendGroup) +} + +func (ct *RedisConsensusTracker) GetConsensusBlockNumber() string { + return ct.client.Get(ct.ctx, ct.key()).Val() +} + +func (ct *RedisConsensusTracker) SetConsensusBlockNumber(blockNumber string) { + ct.client.Set(ct.ctx, ct.key(), blockNumber, 0) +} diff --git a/proxyd/proxyd/example.config.toml b/proxyd/proxyd/example.config.toml index fb8fea9..16408e8 100644 --- a/proxyd/proxyd/example.config.toml +++ b/proxyd/proxyd/example.config.toml @@ -15,6 +15,7 @@ rpc_port = 8080 # Host for the proxyd WS server to listen on. ws_host = "0.0.0.0" # Port for the above +# Set the ws_port to 0 to disable WS ws_port = 8085 # Maximum client body size, in bytes, that the server will accept. max_body_size_bytes = 10485760 diff --git a/proxyd/proxyd/go.mod b/proxyd/proxyd/go.mod index d73d6fc..68bf63c 100644 --- a/proxyd/proxyd/go.mod +++ b/proxyd/proxyd/go.mod @@ -11,10 +11,12 @@ require ( github.com/gorilla/mux v1.8.0 github.com/gorilla/websocket v1.5.0 github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d + github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.11.1 github.com/rs/cors v1.8.2 github.com/stretchr/testify v1.7.0 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c + gopkg.in/yaml.v2 v2.4.0 ) require ( diff --git a/proxyd/proxyd/integration_tests/batch_timeout_test.go b/proxyd/proxyd/integration_tests/batch_timeout_test.go index 372f047..4906c1d 100644 --- a/proxyd/proxyd/integration_tests/batch_timeout_test.go +++ b/proxyd/proxyd/integration_tests/batch_timeout_test.go @@ -22,7 +22,7 @@ func TestBatchTimeout(t *testing.T) { config := ReadConfig("batch_timeout") client := NewProxydClient("http://127.0.0.1:8545") - shutdown, err := proxyd.Start(config) + _, shutdown, err := proxyd.Start(config) require.NoError(t, err) defer shutdown() diff --git a/proxyd/proxyd/integration_tests/batching_test.go b/proxyd/proxyd/integration_tests/batching_test.go index b0f811c..e40745d 100644 --- a/proxyd/proxyd/integration_tests/batching_test.go +++ b/proxyd/proxyd/integration_tests/batching_test.go @@ -148,7 +148,7 @@ func TestBatching(t *testing.T) { require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", goodBackend.URL())) client := NewProxydClient("http://127.0.0.1:8545") - shutdown, err := proxyd.Start(config) + _, shutdown, err := proxyd.Start(config) require.NoError(t, err) defer shutdown() diff --git a/proxyd/proxyd/integration_tests/caching_test.go b/proxyd/proxyd/integration_tests/caching_test.go index a75a591..b0a2f20 100644 --- a/proxyd/proxyd/integration_tests/caching_test.go +++ b/proxyd/proxyd/integration_tests/caching_test.go @@ -35,7 +35,7 @@ func TestCaching(t *testing.T) { require.NoError(t, os.Setenv("REDIS_URL", fmt.Sprintf("redis://127.0.0.1:%s", redis.Port()))) config := ReadConfig("caching") client := NewProxydClient("http://127.0.0.1:8545") - shutdown, err := proxyd.Start(config) + _, shutdown, err := proxyd.Start(config) require.NoError(t, err) defer shutdown() @@ -171,7 +171,7 @@ func TestBatchCaching(t *testing.T) { config := ReadConfig("caching") client := NewProxydClient("http://127.0.0.1:8545") - shutdown, err := proxyd.Start(config) + _, shutdown, err := proxyd.Start(config) require.NoError(t, err) defer shutdown() diff --git a/proxyd/proxyd/integration_tests/consensus_test.go b/proxyd/proxyd/integration_tests/consensus_test.go new file mode 100644 index 0000000..1ed7d12 --- /dev/null +++ b/proxyd/proxyd/integration_tests/consensus_test.go @@ -0,0 +1,309 @@ +package integration_tests + +import ( + "context" + "fmt" + "net/http" + "os" + "path" + "testing" + + "github.com/ethereum-optimism/optimism/proxyd" + ms "github.com/ethereum-optimism/optimism/proxyd/tools/mockserver/handler" + "github.com/stretchr/testify/require" +) + +func TestConsensus(t *testing.T) { + node1 := NewMockBackend(nil) + defer node1.Close() + node2 := NewMockBackend(nil) + defer node2.Close() + + dir, err := os.Getwd() + require.NoError(t, err) + + responses := path.Join(dir, "testdata/consensus_responses.yml") + + h1 := ms.MockedHandler{ + Overrides: []*ms.MethodTemplate{}, + Autoload: true, + AutoloadFile: responses, + } + h2 := ms.MockedHandler{ + Overrides: []*ms.MethodTemplate{}, + Autoload: true, + AutoloadFile: responses, + } + + require.NoError(t, os.Setenv("NODE1_URL", node1.URL())) + require.NoError(t, os.Setenv("NODE2_URL", node2.URL())) + + node1.SetHandler(http.HandlerFunc(h1.Handler)) + node2.SetHandler(http.HandlerFunc(h2.Handler)) + + config := ReadConfig("consensus") + ctx := context.Background() + svr, shutdown, err := proxyd.Start(config) + require.NoError(t, err) + defer shutdown() + + bg := svr.BackendGroups["node"] + require.NotNil(t, bg) + require.NotNil(t, bg.Consensus) + + t.Run("initial consensus", func(t *testing.T) { + h1.ResetOverrides() + h2.ResetOverrides() + + // unknown consensus at init + require.Equal(t, "", bg.Consensus.GetConsensusBlockNumber()) + + // first poll + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + // consensus at block 0x1 + require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber()) + }) + + t.Run("advance consensus", func(t *testing.T) { + h1.ResetOverrides() + h2.ResetOverrides() + + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + // all nodes start at block 0x1 + require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber()) + + // advance latest on node2 to 0x2 + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildResponse("0x2", "hash2"), + }) + + // poll for group consensus + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + // consensus should stick to 0x1, since node1 is still lagging there + bg.Consensus.UpdateBackendGroupConsensus(ctx) + require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber()) + + // advance latest on node1 to 0x2 + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildResponse("0x2", "hash2"), + }) + + // poll for group consensus + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + // should stick to 0x2, since now all nodes are at 0x2 + require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber()) + }) + + t.Run("broken consensus", func(t *testing.T) { + h1.ResetOverrides() + h2.ResetOverrides() + + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + // all nodes start at block 0x1 + require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber()) + + // advance latest on both nodes to 0x2 + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildResponse("0x2", "hash2"), + }) + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildResponse("0x2", "hash2"), + }) + + // poll for group consensus + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + // at 0x2 + require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber()) + + // make node2 diverge on hash + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "0x2", + Response: buildResponse("0x2", "wrong_hash"), + }) + + // poll for group consensus + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + // should resolve to 0x1, since 0x2 is out of consensus at the moment + require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber()) + + // later, when impl events, listen to broken consensus event + }) + + t.Run("broken consensus with depth 2", func(t *testing.T) { + h1.ResetOverrides() + h2.ResetOverrides() + + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + // all nodes start at block 0x1 + require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber()) + + // advance latest on both nodes to 0x2 + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildResponse("0x2", "hash2"), + }) + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildResponse("0x2", "hash2"), + }) + + // poll for group consensus + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + // at 0x2 + require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber()) + + // advance latest on both nodes to 0x3 + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildResponse("0x3", "hash3"), + }) + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildResponse("0x3", "hash3"), + }) + + // poll for group consensus + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + // at 0x3 + require.Equal(t, "0x3", bg.Consensus.GetConsensusBlockNumber()) + + // make node2 diverge on hash for blocks 0x2 and 0x3 + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "0x2", + Response: buildResponse("0x2", "wrong_hash2"), + }) + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "0x3", + Response: buildResponse("0x3", "wrong_hash3"), + }) + + // poll for group consensus + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + // should resolve to 0x1 + require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber()) + }) + + t.Run("fork in advanced block", func(t *testing.T) { + h1.ResetOverrides() + h2.ResetOverrides() + + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + // all nodes start at block 0x1 + require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber()) + + // make nodes 1 and 2 advance in forks + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "0x2", + Response: buildResponse("0x2", "node1_0x2"), + }) + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "0x2", + Response: buildResponse("0x2", "node2_0x2"), + }) + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "0x3", + Response: buildResponse("0x3", "node1_0x3"), + }) + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "0x3", + Response: buildResponse("0x3", "node2_0x3"), + }) + h1.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildResponse("0x3", "node1_0x3"), + }) + h2.AddOverride(&ms.MethodTemplate{ + Method: "eth_getBlockByNumber", + Block: "latest", + Response: buildResponse("0x3", "node2_0x3"), + }) + + // poll for group consensus + for _, be := range bg.Backends { + bg.Consensus.UpdateBackend(ctx, be) + } + bg.Consensus.UpdateBackendGroupConsensus(ctx) + + // should resolve to 0x1, the highest common ancestor + require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber()) + }) +} + +func buildResponse(number string, hash string) string { + return fmt.Sprintf(`{ + "jsonrpc": "2.0", + "id": 67, + "result": { + "number": "%s", + "hash": "%s" + } + }`, number, hash) +} diff --git a/proxyd/proxyd/integration_tests/failover_test.go b/proxyd/proxyd/integration_tests/failover_test.go index 47c9e26..119a901 100644 --- a/proxyd/proxyd/integration_tests/failover_test.go +++ b/proxyd/proxyd/integration_tests/failover_test.go @@ -30,7 +30,7 @@ func TestFailover(t *testing.T) { config := ReadConfig("failover") client := NewProxydClient("http://127.0.0.1:8545") - shutdown, err := proxyd.Start(config) + _, shutdown, err := proxyd.Start(config) require.NoError(t, err) defer shutdown() @@ -128,7 +128,7 @@ func TestRetries(t *testing.T) { require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL())) config := ReadConfig("retries") client := NewProxydClient("http://127.0.0.1:8545") - shutdown, err := proxyd.Start(config) + _, shutdown, err := proxyd.Start(config) require.NoError(t, err) defer shutdown() @@ -171,7 +171,7 @@ func TestOutOfServiceInterval(t *testing.T) { config := ReadConfig("out_of_service_interval") client := NewProxydClient("http://127.0.0.1:8545") - shutdown, err := proxyd.Start(config) + _, shutdown, err := proxyd.Start(config) require.NoError(t, err) defer shutdown() @@ -226,7 +226,7 @@ func TestBatchWithPartialFailover(t *testing.T) { require.NoError(t, os.Setenv("BAD_BACKEND_RPC_URL", badBackend.URL())) client := NewProxydClient("http://127.0.0.1:8545") - shutdown, err := proxyd.Start(config) + _, shutdown, err := proxyd.Start(config) require.NoError(t, err) defer shutdown() @@ -273,7 +273,7 @@ func TestInfuraFailoverOnUnexpectedResponse(t *testing.T) { require.NoError(t, os.Setenv("BAD_BACKEND_RPC_URL", badBackend.URL())) client := NewProxydClient("http://127.0.0.1:8545") - shutdown, err := proxyd.Start(config) + _, shutdown, err := proxyd.Start(config) require.NoError(t, err) defer shutdown() diff --git a/proxyd/proxyd/integration_tests/max_rpc_conns_test.go b/proxyd/proxyd/integration_tests/max_rpc_conns_test.go index 1ee1feb..5e23364 100644 --- a/proxyd/proxyd/integration_tests/max_rpc_conns_test.go +++ b/proxyd/proxyd/integration_tests/max_rpc_conns_test.go @@ -41,7 +41,7 @@ func TestMaxConcurrentRPCs(t *testing.T) { config := ReadConfig("max_rpc_conns") client := NewProxydClient("http://127.0.0.1:8545") - shutdown, err := proxyd.Start(config) + _, shutdown, err := proxyd.Start(config) require.NoError(t, err) defer shutdown() diff --git a/proxyd/proxyd/integration_tests/rate_limit_test.go b/proxyd/proxyd/integration_tests/rate_limit_test.go index 7a70dea..dd69089 100644 --- a/proxyd/proxyd/integration_tests/rate_limit_test.go +++ b/proxyd/proxyd/integration_tests/rate_limit_test.go @@ -29,7 +29,7 @@ func TestBackendMaxRPSLimit(t *testing.T) { config := ReadConfig("backend_rate_limit") client := NewProxydClient("http://127.0.0.1:8545") - shutdown, err := proxyd.Start(config) + _, shutdown, err := proxyd.Start(config) require.NoError(t, err) defer shutdown() limitedRes, codes := spamReqs(t, client, ethChainID, 503, 3) @@ -45,7 +45,7 @@ func TestFrontendMaxRPSLimit(t *testing.T) { require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", goodBackend.URL())) config := ReadConfig("frontend_rate_limit") - shutdown, err := proxyd.Start(config) + _, shutdown, err := proxyd.Start(config) require.NoError(t, err) defer shutdown() diff --git a/proxyd/proxyd/integration_tests/sender_rate_limit_test.go b/proxyd/proxyd/integration_tests/sender_rate_limit_test.go index b8a7730..a31a077 100644 --- a/proxyd/proxyd/integration_tests/sender_rate_limit_test.go +++ b/proxyd/proxyd/integration_tests/sender_rate_limit_test.go @@ -43,7 +43,7 @@ func TestSenderRateLimitValidation(t *testing.T) { // validation. config.SenderRateLimit.Limit = math.MaxInt client := NewProxydClient("http://127.0.0.1:8545") - shutdown, err := proxyd.Start(config) + _, shutdown, err := proxyd.Start(config) require.NoError(t, err) defer shutdown() @@ -73,7 +73,7 @@ func TestSenderRateLimitLimiting(t *testing.T) { config := ReadConfig("sender_rate_limit") client := NewProxydClient("http://127.0.0.1:8545") - shutdown, err := proxyd.Start(config) + _, shutdown, err := proxyd.Start(config) require.NoError(t, err) defer shutdown() diff --git a/proxyd/proxyd/integration_tests/testdata/consensus.toml b/proxyd/proxyd/integration_tests/testdata/consensus.toml new file mode 100644 index 0000000..dbd8e26 --- /dev/null +++ b/proxyd/proxyd/integration_tests/testdata/consensus.toml @@ -0,0 +1,24 @@ +[server] +rpc_port = 8080 + +[backend] +response_timeout_seconds = 1 + +[backends] +[backends.node1] +rpc_url = "$NODE1_URL" + +[backends.node2] +rpc_url = "$NODE2_URL" + +[backend_groups] +[backend_groups.node] +backends = ["node1", "node2"] +consensus_aware = true +consensus_handler = "noop" # allow more control over the consensus poller for tests + +[rpc_method_mappings] +eth_call = "node" +eth_chainId = "node" +eth_blockNumber = "node" +eth_getBlockByNumber = "node" diff --git a/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml b/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml new file mode 100644 index 0000000..9d4f2d1 --- /dev/null +++ b/proxyd/proxyd/integration_tests/testdata/consensus_responses.yml @@ -0,0 +1,44 @@ +- method: eth_getBlockByNumber + block: latest + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash1", + "number": "0x1" + } + } +- method: eth_getBlockByNumber + block: 0x1 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash1", + "number": "0x1" + } + } +- method: eth_getBlockByNumber + block: 0x2 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash2", + "number": "0x2" + } + } +- method: eth_getBlockByNumber + block: 0x3 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash3", + "number": "0x3" + } + } diff --git a/proxyd/proxyd/integration_tests/validation_test.go b/proxyd/proxyd/integration_tests/validation_test.go index f09e2c2..fed2c8a 100644 --- a/proxyd/proxyd/integration_tests/validation_test.go +++ b/proxyd/proxyd/integration_tests/validation_test.go @@ -26,7 +26,7 @@ func TestSingleRPCValidation(t *testing.T) { config := ReadConfig("whitelist") client := NewProxydClient("http://127.0.0.1:8545") - shutdown, err := proxyd.Start(config) + _, shutdown, err := proxyd.Start(config) require.NoError(t, err) defer shutdown() @@ -110,7 +110,7 @@ func TestBatchRPCValidation(t *testing.T) { config := ReadConfig("whitelist") client := NewProxydClient("http://127.0.0.1:8545") - shutdown, err := proxyd.Start(config) + _, shutdown, err := proxyd.Start(config) require.NoError(t, err) defer shutdown() diff --git a/proxyd/proxyd/integration_tests/ws_test.go b/proxyd/proxyd/integration_tests/ws_test.go index 9ae12ab..ed33779 100644 --- a/proxyd/proxyd/integration_tests/ws_test.go +++ b/proxyd/proxyd/integration_tests/ws_test.go @@ -38,7 +38,7 @@ func TestConcurrentWSPanic(t *testing.T) { require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL())) config := ReadConfig("ws") - shutdown, err := proxyd.Start(config) + _, shutdown, err := proxyd.Start(config) require.NoError(t, err) client, err := NewProxydWSClient("ws://127.0.0.1:8546", nil, nil) require.NoError(t, err) @@ -147,7 +147,7 @@ func TestWS(t *testing.T) { require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL())) config := ReadConfig("ws") - shutdown, err := proxyd.Start(config) + _, shutdown, err := proxyd.Start(config) require.NoError(t, err) client, err := NewProxydWSClient("ws://127.0.0.1:8546", func(msgType int, data []byte) { clientHdlr.MsgCB(msgType, data) @@ -238,7 +238,7 @@ func TestWSClientClosure(t *testing.T) { require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL())) config := ReadConfig("ws") - shutdown, err := proxyd.Start(config) + _, shutdown, err := proxyd.Start(config) require.NoError(t, err) defer shutdown() @@ -278,7 +278,7 @@ func TestWSClientMaxConns(t *testing.T) { require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL())) config := ReadConfig("ws") - shutdown, err := proxyd.Start(config) + _, shutdown, err := proxyd.Start(config) require.NoError(t, err) defer shutdown() diff --git a/proxyd/proxyd/metrics.go b/proxyd/proxyd/metrics.go index 06fef15..656a8b6 100644 --- a/proxyd/proxyd/metrics.go +++ b/proxyd/proxyd/metrics.go @@ -242,6 +242,20 @@ var ( Name: "rate_limit_take_errors", Help: "Count of errors taking frontend rate limits", }) + + consensusLatestBlock = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: MetricsNamespace, + Name: "consensus_latest_block", + Help: "Consensus latest block", + }) + + backendLatestBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: MetricsNamespace, + Name: "backend_latest_block", + Help: "Current latest block observed per backend", + }, []string{ + "backend_name", + }) ) func RecordRedisError(source string) { diff --git a/proxyd/proxyd/proxyd.go b/proxyd/proxyd/proxyd.go index ccc6897..5a34fa6 100644 --- a/proxyd/proxyd/proxyd.go +++ b/proxyd/proxyd/proxyd.go @@ -18,20 +18,20 @@ import ( "golang.org/x/sync/semaphore" ) -func Start(config *Config) (func(), error) { +func Start(config *Config) (*Server, func(), error) { if len(config.Backends) == 0 { - return nil, errors.New("must define at least one backend") + return nil, nil, errors.New("must define at least one backend") } if len(config.BackendGroups) == 0 { - return nil, errors.New("must define at least one backend group") + return nil, nil, errors.New("must define at least one backend group") } if len(config.RPCMethodMappings) == 0 { - return nil, errors.New("must define at least one RPC method mapping") + return nil, nil, errors.New("must define at least one RPC method mapping") } for authKey := range config.Authentication { if authKey == "none" { - return nil, errors.New("cannot use none as an auth key") + return nil, nil, errors.New("cannot use none as an auth key") } } @@ -39,16 +39,16 @@ func Start(config *Config) (func(), error) { if config.Redis.URL != "" { rURL, err := ReadFromEnvOrConfig(config.Redis.URL) if err != nil { - return nil, err + return nil, nil, err } redisClient, err = NewRedisClient(rURL) if err != nil { - return nil, err + return nil, nil, err } } if redisClient == nil && config.RateLimit.UseRedis { - return nil, errors.New("must specify a Redis URL if UseRedis is true in rate limit config") + return nil, nil, errors.New("must specify a Redis URL if UseRedis is true in rate limit config") } var lim BackendRateLimiter @@ -80,10 +80,10 @@ func Start(config *Config) (func(), error) { if config.SenderRateLimit.Enabled { if config.SenderRateLimit.Limit <= 0 { - return nil, errors.New("limit in sender_rate_limit must be > 0") + return nil, nil, errors.New("limit in sender_rate_limit must be > 0") } if time.Duration(config.SenderRateLimit.Interval) < time.Second { - return nil, errors.New("interval in sender_rate_limit must be >= 1s") + return nil, nil, errors.New("interval in sender_rate_limit must be >= 1s") } } @@ -100,17 +100,14 @@ func Start(config *Config) (func(), error) { rpcURL, err := ReadFromEnvOrConfig(cfg.RPCURL) if err != nil { - return nil, err + return nil, nil, err } wsURL, err := ReadFromEnvOrConfig(cfg.WSURL) if err != nil { - return nil, err + return nil, nil, err } if rpcURL == "" { - return nil, fmt.Errorf("must define an RPC URL for backend %s", name) - } - if wsURL == "" { - return nil, fmt.Errorf("must define a WS URL for backend %s", name) + return nil, nil, fmt.Errorf("must define an RPC URL for backend %s", name) } if config.BackendOptions.ResponseTimeoutSeconds != 0 { @@ -135,13 +132,13 @@ func Start(config *Config) (func(), error) { if cfg.Password != "" { passwordVal, err := ReadFromEnvOrConfig(cfg.Password) if err != nil { - return nil, err + return nil, nil, err } opts = append(opts, WithBasicAuth(cfg.Username, passwordVal)) } tlsConfig, err := configureBackendTLS(cfg) if err != nil { - return nil, err + return nil, nil, err } if tlsConfig != nil { log.Info("using custom TLS config for backend", "name", name) @@ -162,7 +159,7 @@ func Start(config *Config) (func(), error) { backends := make([]*Backend, 0) for _, bName := range bg.Backends { if backendsByName[bName] == nil { - return nil, fmt.Errorf("backend %s is not defined", bName) + return nil, nil, fmt.Errorf("backend %s is not defined", bName) } backends = append(backends, backendsByName[bName]) } @@ -177,17 +174,17 @@ func Start(config *Config) (func(), error) { if config.WSBackendGroup != "" { wsBackendGroup = backendGroups[config.WSBackendGroup] if wsBackendGroup == nil { - return nil, fmt.Errorf("ws backend group %s does not exist", config.WSBackendGroup) + return nil, nil, fmt.Errorf("ws backend group %s does not exist", config.WSBackendGroup) } } if wsBackendGroup == nil && config.Server.WSPort != 0 { - return nil, fmt.Errorf("a ws port was defined, but no ws group was defined") + return nil, nil, fmt.Errorf("a ws port was defined, but no ws group was defined") } for _, bg := range config.RPCMethodMappings { if backendGroups[bg] == nil { - return nil, fmt.Errorf("undefined backend group %s", bg) + return nil, nil, fmt.Errorf("undefined backend group %s", bg) } } @@ -198,7 +195,7 @@ func Start(config *Config) (func(), error) { for secret, alias := range config.Authentication { resolvedSecret, err := ReadFromEnvOrConfig(secret) if err != nil { - return nil, err + return nil, nil, err } resolvedAuth[resolvedSecret] = alias } @@ -217,11 +214,11 @@ func Start(config *Config) (func(), error) { ) if config.Cache.BlockSyncRPCURL == "" { - return nil, fmt.Errorf("block sync node required for caching") + return nil, nil, fmt.Errorf("block sync node required for caching") } blockSyncRPCURL, err := ReadFromEnvOrConfig(config.Cache.BlockSyncRPCURL) if err != nil { - return nil, err + return nil, nil, err } if redisClient == nil { @@ -233,7 +230,7 @@ func Start(config *Config) (func(), error) { // Ideally, the BlocKSyncRPCURL should be the sequencer or a HA replica that's not far behind ethClient, err := ethclient.Dial(blockSyncRPCURL) if err != nil { - return nil, err + return nil, nil, err } defer ethClient.Close() @@ -260,7 +257,7 @@ func Start(config *Config) (func(), error) { redisClient, ) if err != nil { - return nil, fmt.Errorf("error creating server: %w", err) + return nil, nil, fmt.Errorf("error creating server: %w", err) } if config.Metrics.Enabled { @@ -300,12 +297,28 @@ func Start(config *Config) (func(), error) { log.Crit("error starting WS server", "err", err) } }() + } else { + log.Info("WS server not enabled (ws_port is set to 0)") + } + + for bgName, bg := range backendGroups { + if config.BackendGroups[bgName].ConsensusAware { + log.Info("creating poller for consensus aware backend_group", "name", bgName) + + copts := make([]ConsensusOpt, 0) + + if config.BackendGroups[bgName].ConsensusAsyncHandler == "noop" { + copts = append(copts, WithAsyncHandler(NewNoopAsyncHandler())) + } + cp := NewConsensusPoller(bg, copts...) + bg.Consensus = cp + } } <-errTimer.C log.Info("started proxyd") - return func() { + shutdownFunc := func() { log.Info("shutting down proxyd") if blockNumLVC != nil { blockNumLVC.Stop() @@ -318,7 +331,9 @@ func Start(config *Config) (func(), error) { log.Error("error flushing backend ws conns", "err", err) } log.Info("goodbye") - }, nil + } + + return srv, shutdownFunc, nil } func secondsToDuration(seconds int) time.Duration { diff --git a/proxyd/proxyd/server.go b/proxyd/proxyd/server.go index 5df9d37..fac04f0 100644 --- a/proxyd/proxyd/server.go +++ b/proxyd/proxyd/server.go @@ -39,7 +39,7 @@ const ( var emptyArrayResponse = json.RawMessage("[]") type Server struct { - backendGroups map[string]*BackendGroup + BackendGroups map[string]*BackendGroup wsBackendGroup *BackendGroup wsMethodWhitelist *StringSet rpcMethodMappings map[string]string @@ -152,7 +152,7 @@ func NewServer( } return &Server{ - backendGroups: backendGroups, + BackendGroups: backendGroups, wsBackendGroup: wsBackendGroup, wsMethodWhitelist: wsMethodWhitelist, rpcMethodMappings: rpcMethodMappings, @@ -476,7 +476,7 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL start := i * s.maxUpstreamBatchSize end := int(math.Min(float64(start+s.maxUpstreamBatchSize), float64(len(cacheMisses)))) elems := cacheMisses[start:end] - res, err := s.backendGroups[group.backendGroup].Forward(ctx, createBatchRequest(elems), isBatch) + res, err := s.BackendGroups[group.backendGroup].Forward(ctx, createBatchRequest(elems), isBatch) if err != nil { log.Error( "error forwarding RPC batch", @@ -559,7 +559,7 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context } ctx := context.WithValue(r.Context(), ContextKeyXForwardedFor, xff) // nolint:staticcheck - if s.authenticatedPaths == nil { + if len(s.authenticatedPaths) == 0 { // handle the edge case where auth is disabled // but someone sends in an auth key anyway if authorization != "" { diff --git a/proxyd/proxyd/tools/mockserver/handler/handler.go b/proxyd/proxyd/tools/mockserver/handler/handler.go new file mode 100644 index 0000000..18d6026 --- /dev/null +++ b/proxyd/proxyd/tools/mockserver/handler/handler.go @@ -0,0 +1,102 @@ +package handler + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "os" + + "github.com/gorilla/mux" + "github.com/pkg/errors" + "gopkg.in/yaml.v3" +) + +type MethodTemplate struct { + Method string `yaml:"method"` + Block string `yaml:"block"` + Response string `yaml:"response"` +} + +type MockedHandler struct { + Overrides []*MethodTemplate + Autoload bool + AutoloadFile string +} + +func (mh *MockedHandler) Serve(port int) error { + r := mux.NewRouter() + r.HandleFunc("/", mh.Handler) + http.Handle("/", r) + fmt.Printf("starting server up on :%d serving MockedResponsesFile %s\n", port, mh.AutoloadFile) + err := http.ListenAndServe(fmt.Sprintf(":%d", port), nil) + + if errors.Is(err, http.ErrServerClosed) { + fmt.Printf("server closed\n") + } else if err != nil { + fmt.Printf("error starting server: %s\n", err) + return err + } + return nil +} + +func (mh *MockedHandler) Handler(w http.ResponseWriter, req *http.Request) { + body, err := io.ReadAll(req.Body) + if err != nil { + fmt.Printf("error reading request: %v\n", err) + } + + var j map[string]interface{} + err = json.Unmarshal(body, &j) + if err != nil { + fmt.Printf("error reading request: %v\n", err) + } + + var template []*MethodTemplate + if mh.Autoload { + template = append(template, mh.LoadFromFile(mh.AutoloadFile)...) + } + if mh.Overrides != nil { + template = append(template, mh.Overrides...) + } + + method := j["method"] + block := "" + if method == "eth_getBlockByNumber" { + block = (j["params"].([]interface{})[0]).(string) + } + + var selectedResponse *string + for _, r := range template { + if r.Method == method && r.Block == block { + selectedResponse = &r.Response + } + } + if selectedResponse != nil { + _, err := fmt.Fprintf(w, *selectedResponse) + if err != nil { + fmt.Printf("error writing response: %v\n", err) + } + } +} + +func (mh *MockedHandler) LoadFromFile(file string) []*MethodTemplate { + contents, err := os.ReadFile(file) + if err != nil { + fmt.Printf("error reading MockedResponsesFile: %v\n", err) + } + var template []*MethodTemplate + err = yaml.Unmarshal(contents, &template) + if err != nil { + fmt.Printf("error reading MockedResponsesFile: %v\n", err) + } + return template +} + +func (mh *MockedHandler) AddOverride(template *MethodTemplate) { + mh.Overrides = append(mh.Overrides, template) +} + +func (mh *MockedHandler) ResetOverrides() { + mh.Overrides = make([]*MethodTemplate, 0) +} diff --git a/proxyd/proxyd/tools/mockserver/main.go b/proxyd/proxyd/tools/mockserver/main.go new file mode 100644 index 0000000..a58fc06 --- /dev/null +++ b/proxyd/proxyd/tools/mockserver/main.go @@ -0,0 +1,30 @@ +package main + +import ( + "fmt" + "os" + "path" + "strconv" + + "github.com/ethereum-optimism/optimism/proxyd/tools/mockserver/handler" +) + +func main() { + if len(os.Args) < 3 { + fmt.Printf("simply mock a response based on an external text MockedResponsesFile\n") + fmt.Printf("usage: mockserver \n") + os.Exit(1) + } + port, _ := strconv.ParseInt(os.Args[1], 10, 32) + dir, _ := os.Getwd() + + h := handler.MockedHandler{ + Autoload: true, + AutoloadFile: path.Join(dir, os.Args[2]), + } + + err := h.Serve(int(port)) + if err != nil { + fmt.Printf("error starting mockserver: %v\n", err) + } +} diff --git a/proxyd/proxyd/tools/mockserver/node1.yml b/proxyd/proxyd/tools/mockserver/node1.yml new file mode 100644 index 0000000..c14f328 --- /dev/null +++ b/proxyd/proxyd/tools/mockserver/node1.yml @@ -0,0 +1,44 @@ +- method: eth_getBlockByNumber + block: latest + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash2", + "number": "0x2" + } + } +- method: eth_getBlockByNumber + block: 0x1 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash1", + "number": "0x1" + } + } +- method: eth_getBlockByNumber + block: 0x2 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash2", + "number": "0x2" + } + } +- method: eth_getBlockByNumber + block: 0x3 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash34", + "number": "0x3" + } + } \ No newline at end of file diff --git a/proxyd/proxyd/tools/mockserver/node2.yml b/proxyd/proxyd/tools/mockserver/node2.yml new file mode 100644 index 0000000..b94ee7a --- /dev/null +++ b/proxyd/proxyd/tools/mockserver/node2.yml @@ -0,0 +1,44 @@ +- method: eth_getBlockByNumber + block: latest + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash2", + "number": "0x2" + } + } +- method: eth_getBlockByNumber + block: 0x1 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash1", + "number": "0x1" + } + } +- method: eth_getBlockByNumber + block: 0x2 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash2", + "number": "0x2" + } + } +- method: eth_getBlockByNumber + block: 0x3 + response: > + { + "jsonrpc": "2.0", + "id": 67, + "result": { + "hash": "hash3", + "number": "0x3" + } + } \ No newline at end of file