proxyd: add consensus poller

Felipe Andrade 2023-04-18 11:57:55 -07:00
parent 5647b8faf2
commit 05536fa181
27 changed files with 1209 additions and 57 deletions

@@ -365,6 +365,36 @@ func (b *Backend) setOffline() {
}
}
// ForwardRPC makes a call directly to a backend and populates the response into `res`
func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method string, params ...any) error {
jsonParams, err := json.Marshal(params)
if err != nil {
return err
}
rpcReq := RPCReq{
JSONRPC: JSONRPCVersion,
Method: method,
Params: jsonParams,
ID: []byte(id),
}
slicedRes, err := b.doForward(ctx, []*RPCReq{&rpcReq}, false)
if err != nil {
return err
}
if len(slicedRes) != 1 {
return fmt.Errorf("unexpected response len for non-batched request (len != 1)")
}
if slicedRes[0].IsError() {
return fmt.Errorf(slicedRes[0].Error.Error())
}
*res = *(slicedRes[0])
return nil
}
func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
isSingleElementBatch := len(rpcReqs) == 1
@@ -486,6 +516,7 @@ func sortBatchRPCResponse(req []*RPCReq, res []*RPCRes) {
type BackendGroup struct {
Name string
Backends []*Backend
Consensus *ConsensusPoller
}
func (b *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
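Note: the consensus poller's fetchBlock (added later in this commit) is the main consumer of the new ForwardRPC helper. A minimal sketch of the same call shape, assuming an already-constructed backend; the package, function name, and error strings here are illustrative, not part of this commit:

package example

import (
	"context"
	"fmt"

	"github.com/ethereum-optimism/optimism/proxyd"
)

// latestBlockHash mirrors how fetchBlock drives ForwardRPC: one internally
// generated eth_getBlockByNumber request against a single backend.
func latestBlockHash(ctx context.Context, be *proxyd.Backend) (string, error) {
	var res proxyd.RPCRes
	// "67" is an arbitrary JSON-RPC id; the trailing args become the params array.
	if err := be.ForwardRPC(ctx, &res, "67", "eth_getBlockByNumber", "latest", false); err != nil {
		return "", err
	}
	block, ok := res.Result.(map[string]interface{})
	if !ok {
		return "", fmt.Errorf("unexpected result type %T", res.Result)
	}
	hash, _ := block["hash"].(string)
	return hash, nil
}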

@@ -52,7 +52,7 @@ func main() {
),
)
-shutdown, err := proxyd.Start(config)
+_, shutdown, err := proxyd.Start(config)
if err != nil {
log.Crit("error starting proxyd", "err", err)
}

@@ -82,6 +82,7 @@ type BackendConfig struct {
Password string `toml:"password"`
RPCURL string `toml:"rpc_url"`
WSURL string `toml:"ws_url"`
WSPort int `toml:"ws_port"`
MaxRPS int `toml:"max_rps"`
MaxWSConns int `toml:"max_ws_conns"`
CAFile string `toml:"ca_file"`
@@ -94,6 +95,8 @@ type BackendsConfig map[string]*BackendConfig
type BackendGroupConfig struct {
Backends []string `toml:"backends"`
ConsensusAware bool `toml:"consensus_aware"`
ConsensusAsyncHandler string `toml:"consensus_handler"`
}
type BackendGroupsConfig map[string]*BackendGroupConfig

@@ -0,0 +1,345 @@
package proxyd
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
)
const (
PollerInterval = 1 * time.Second
)
// ConsensusPoller checks the consensus state for each member of a BackendGroup,
// resolves the highest common block across multiple nodes, and reconciles the consensus
// in case of block hash divergence to minimize re-orgs
type ConsensusPoller struct {
cancelFunc context.CancelFunc
backendGroup *BackendGroup
backendState map[*Backend]*backendState
consensusGroupMux sync.Mutex
consensusGroup []*Backend
tracker ConsensusTracker
asyncHandler ConsensusAsyncHandler
}
type backendState struct {
backendStateMux sync.Mutex
latestBlockNumber string
latestBlockHash string
lastUpdate time.Time
bannedUntil time.Time
}
// GetConsensusGroup returns the backend members that currently agree on the consensus
func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
defer cp.consensusGroupMux.Unlock()
cp.consensusGroupMux.Lock()
g := make([]*Backend, len(cp.backendGroup.Backends))
copy(g, cp.consensusGroup)
return g
}
// GetConsensusBlockNumber returns the agreed-upon consensus block number
func (ct *ConsensusPoller) GetConsensusBlockNumber() string {
return ct.tracker.GetConsensusBlockNumber()
}
func (cp *ConsensusPoller) Shutdown() {
cp.asyncHandler.Shutdown()
}
// ConsensusAsyncHandler controls the asynchronous polling mechanism, interval and shutdown
type ConsensusAsyncHandler interface {
Init()
Shutdown()
}
// NoopAsyncHandler allows fine-grained control over when the consensus is updated
type NoopAsyncHandler struct{}
func NewNoopAsyncHandler() ConsensusAsyncHandler {
log.Warn("using NewNoopAsyncHandler")
return &NoopAsyncHandler{}
}
func (ah *NoopAsyncHandler) Init() {}
func (ah *NoopAsyncHandler) Shutdown() {}
// PollerAsyncHandler asynchronously updates each individual backend and the group consensus
type PollerAsyncHandler struct {
ctx context.Context
cp *ConsensusPoller
}
func NewPollerAsyncHandler(ctx context.Context, cp *ConsensusPoller) ConsensusAsyncHandler {
return &PollerAsyncHandler{
ctx: ctx,
cp: cp,
}
}
func (ah *PollerAsyncHandler) Init() {
// create the individual backend pollers
for _, be := range ah.cp.backendGroup.Backends {
go func(be *Backend) {
for {
timer := time.NewTimer(PollerInterval)
ah.cp.UpdateBackend(ah.ctx, be)
select {
case <-timer.C:
case <-ah.ctx.Done():
timer.Stop()
return
}
}
}(be)
}
// create the group consensus poller
go func() {
for {
timer := time.NewTimer(PollerInterval)
ah.cp.UpdateBackendGroupConsensus(ah.ctx)
select {
case <-timer.C:
case <-ah.ctx.Done():
timer.Stop()
return
}
}
}()
}
func (ah *PollerAsyncHandler) Shutdown() {
ah.cp.cancelFunc()
}
type ConsensusOpt func(cp *ConsensusPoller)
func WithTracker(tracker ConsensusTracker) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.tracker = tracker
}
}
func WithAsyncHandler(asyncHandler ConsensusAsyncHandler) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.asyncHandler = asyncHandler
}
}
func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller {
ctx, cancelFunc := context.WithCancel(context.Background())
state := make(map[*Backend]*backendState, len(bg.Backends))
for _, be := range bg.Backends {
state[be] = &backendState{}
}
cp := &ConsensusPoller{
cancelFunc: cancelFunc,
backendGroup: bg,
backendState: state,
}
for _, opt := range opts {
opt(cp)
}
if cp.tracker == nil {
cp.tracker = NewInMemoryConsensusTracker()
}
if cp.asyncHandler == nil {
cp.asyncHandler = NewPollerAsyncHandler(ctx, cp)
}
cp.asyncHandler.Init()
return cp
}
// UpdateBackend refreshes the consensus state of a single backend
func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
bs := cp.backendState[be]
if time.Now().Before(bs.bannedUntil) {
log.Warn("skipping backend banned", "backend", be.Name, "bannedUntil", bs.bannedUntil)
return
}
if be.IsRateLimited() || !be.Online() {
return
}
// we'll introduce checks here to ban the backend
// (e.g. when the node is still syncing the chain)
// and then update the backend consensus
latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest")
if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err)
return
}
changed := cp.setBackendState(be, latestBlockNumber, latestBlockHash)
if changed {
backendLatestBlockBackend.WithLabelValues(be.Name).Set(blockToFloat(latestBlockNumber))
log.Info("backend state updated", "name", be.Name, "state", bs)
}
}
// UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends
func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
var lowestBlock string
var lowestBlockHash string
currentConsensusBlockNumber := cp.GetConsensusBlockNumber()
for _, be := range cp.backendGroup.Backends {
backendLatestBlockNumber, backendLatestBlockHash := cp.getBackendState(be)
if lowestBlock == "" || backendLatestBlockNumber < lowestBlock {
lowestBlock = backendLatestBlockNumber
lowestBlockHash = backendLatestBlockHash
}
}
// no block to propose (i.e. initializing consensus)
if lowestBlock == "" {
return
}
proposedBlock := lowestBlock
proposedBlockHash := lowestBlockHash
hasConsensus := false
// check if everybody agrees on the same block hash
consensusBackends := make([]*Backend, 0, len(cp.backendGroup.Backends))
consensusBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
if lowestBlock > currentConsensusBlockNumber {
log.Info("validating consensus on block", lowestBlock)
}
broken := false
for !hasConsensus {
allAgreed := true
consensusBackends = consensusBackends[:0]
filteredBackendsNames = filteredBackendsNames[:0]
for _, be := range cp.backendGroup.Backends {
if be.IsRateLimited() || !be.Online() || time.Now().Before(cp.backendState[be].bannedUntil) {
filteredBackendsNames = append(filteredBackendsNames, be.Name)
continue
}
actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock)
if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err)
continue
}
if proposedBlockHash == "" {
proposedBlockHash = actualBlockHash
}
blocksDontMatch := (actualBlockNumber != proposedBlock) || (actualBlockHash != proposedBlockHash)
if blocksDontMatch {
if blockAheadOrEqual(currentConsensusBlockNumber, actualBlockNumber) {
log.Warn("backend broke consensus", "name", be.Name, "blockNum", actualBlockNumber, "proposedBlockNum", proposedBlock, "blockHash", actualBlockHash, "proposedBlockHash", proposedBlockHash)
broken = true
}
allAgreed = false
break
}
consensusBackends = append(consensusBackends, be)
consensusBackendsNames = append(consensusBackendsNames, be.Name)
}
if allAgreed {
hasConsensus = true
} else {
// walk one block behind and try again
proposedBlock = hexAdd(proposedBlock, -1)
proposedBlockHash = ""
log.Info("no consensus, now trying", "block:", proposedBlock)
}
}
if broken {
// propagate event to other interested parts, such as cache invalidator
log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash)
}
cp.tracker.SetConsensusBlockNumber(proposedBlock)
consensusLatestBlock.Set(blockToFloat(proposedBlock))
cp.consensusGroupMux.Lock()
cp.consensusGroup = consensusBackends
cp.consensusGroupMux.Unlock()
log.Info("group state", "proposedBlock", proposedBlock, "consensusBackends", strings.Join(consensusBackendsNames, ", "), "filteredBackends", strings.Join(filteredBackendsNames, ", "))
}
// fetchBlock is a convenience wrapper to request a block directly from the backend
func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (blockNumber string, blockHash string, err error) {
var rpcRes RPCRes
err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_getBlockByNumber", block, false)
if err != nil {
return "", "", err
}
jsonMap, ok := rpcRes.Result.(map[string]interface{})
if !ok {
return "", "", fmt.Errorf(fmt.Sprintf("unexpected response type checking consensus on backend %s", be.Name))
}
blockNumber = jsonMap["number"].(string)
blockHash = jsonMap["hash"].(string)
return
}
func (cp *ConsensusPoller) getBackendState(be *Backend) (blockNumber string, blockHash string) {
bs := cp.backendState[be]
bs.backendStateMux.Lock()
blockNumber = bs.latestBlockNumber
blockHash = bs.latestBlockHash
bs.backendStateMux.Unlock()
return
}
func (cp *ConsensusPoller) setBackendState(be *Backend, blockNumber string, blockHash string) (changed bool) {
bs := cp.backendState[be]
bs.backendStateMux.Lock()
changed = bs.latestBlockHash != blockHash
bs.latestBlockNumber = blockNumber
bs.latestBlockHash = blockHash
bs.lastUpdate = time.Now()
bs.backendStateMux.Unlock()
return
}
// hexAdd converts a hex block number to uint64, adds incr, and converts the result back to hex
func hexAdd(hexVal string, incr int64) string {
return hexutil.EncodeUint64(uint64(int64(hexutil.MustDecodeUint64(hexVal)) + incr))
}
// blockAheadOrEqual checks whether `baseBlock` is ahead of or equal to `checkBlock`
func blockAheadOrEqual(baseBlock string, checkBlock string) bool {
return hexutil.MustDecodeUint64(baseBlock) >= hexutil.MustDecodeUint64(checkBlock)
}
// blockToFloat converts a hex block number to float64
func blockToFloat(hexVal string) float64 {
return float64(hexutil.MustDecodeUint64(hexVal))
}
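Note: the poller can run with the default background PollerAsyncHandler or fully on demand via the noop handler. A minimal sketch of the on-demand mode, matching how the integration tests further down drive it; the package, function name, and the already-built backend group `bg` are assumptions for illustration:

package example

import (
	"context"

	"github.com/ethereum-optimism/optimism/proxyd"
)

// pollOnce drives a consensus-aware backend group by hand, using the noop
// async handler so no background goroutines are started (the
// consensus_handler = "noop" setup used by the tests).
func pollOnce(ctx context.Context, bg *proxyd.BackendGroup) string {
	cp := proxyd.NewConsensusPoller(bg,
		proxyd.WithAsyncHandler(proxyd.NewNoopAsyncHandler()))
	bg.Consensus = cp

	// One manual polling round: refresh each backend, then resolve the group.
	for _, be := range bg.Backends {
		cp.UpdateBackend(ctx, be)
	}
	cp.UpdateBackendGroupConsensus(ctx)

	// An empty string means the consensus is still unknown.
	return cp.GetConsensusBlockNumber()
}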

@@ -0,0 +1,74 @@
package proxyd
import (
"testing"
)
func Test_blockToFloat(t *testing.T) {
type args struct {
hexVal string
}
tests := []struct {
name string
args args
want float64
}{
{"0xf1b3", args{"0xf1b3"}, float64(61875)},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := blockToFloat(tt.args.hexVal); got != tt.want {
t.Errorf("blockToFloat() = %v, want %v", got, tt.want)
}
})
}
}
func Test_hexAdd(t *testing.T) {
type args struct {
hexVal string
incr int64
}
tests := []struct {
name string
args args
want string
}{
{"0x1", args{"0x1", 1}, "0x2"},
{"0x2", args{"0x2", -1}, "0x1"},
{"0xf", args{"0xf", 1}, "0x10"},
{"0x10", args{"0x10", -1}, "0xf"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := hexAdd(tt.args.hexVal, tt.args.incr); got != tt.want {
t.Errorf("hexAdd() = %v, want %v", got, tt.want)
}
})
}
}
func Test_blockAheadOrEqual(t *testing.T) {
type args struct {
baseBlock string
checkBlock string
}
tests := []struct {
name string
args args
want bool
}{
{"0x1 vs 0x1", args{"0x1", "0x1"}, true},
{"0x2 vs 0x1", args{"0x2", "0x1"}, true},
{"0x1 vs 0x2", args{"0x1", "0x2"}, false},
{"0xff vs 0x100", args{"0xff", "0x100"}, false},
{"0x100 vs 0xff", args{"0x100", "0xff"}, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := blockAheadOrEqual(tt.args.baseBlock, tt.args.checkBlock); got != tt.want {
t.Errorf("blockAheadOrEqual() = %v, want %v", got, tt.want)
}
})
}
}

@@ -0,0 +1,70 @@
package proxyd
import (
"context"
"fmt"
"sync"
"github.com/go-redis/redis/v8"
)
// ConsensusTracker abstracts how we store and retrieve the current consensus,
// allowing it to be stored locally in memory or in a shared Redis cluster
type ConsensusTracker interface {
GetConsensusBlockNumber() string
SetConsensusBlockNumber(blockNumber string)
}
// InMemoryConsensusTracker stores and retrieves the consensus in memory; safe for concurrent use
type InMemoryConsensusTracker struct {
consensusBlockNumber string
mutex sync.Mutex
}
func NewInMemoryConsensusTracker() ConsensusTracker {
return &InMemoryConsensusTracker{
consensusBlockNumber: "", // empty string semantics means unknown
mutex: sync.Mutex{},
}
}
func (ct *InMemoryConsensusTracker) GetConsensusBlockNumber() string {
defer ct.mutex.Unlock()
ct.mutex.Lock()
return ct.consensusBlockNumber
}
func (ct *InMemoryConsensusTracker) SetConsensusBlockNumber(blockNumber string) {
defer ct.mutex.Unlock()
ct.mutex.Lock()
ct.consensusBlockNumber = blockNumber
}
// RedisConsensusTracker uses a Redis `client` to store and retrieve the consensus; safe for concurrent use
type RedisConsensusTracker struct {
ctx context.Context
client *redis.Client
backendGroup string
}
func NewRedisConsensusTracker(ctx context.Context, r *redis.Client, namespace string) ConsensusTracker {
return &RedisConsensusTracker{
ctx: ctx,
client: r,
backendGroup: namespace,
}
}
func (ct *RedisConsensusTracker) key() string {
return fmt.Sprintf("consensus_latest_block:%s", ct.backendGroup)
}
func (ct *RedisConsensusTracker) GetConsensusBlockNumber() string {
return ct.client.Get(ct.ctx, ct.key()).Val()
}
func (ct *RedisConsensusTracker) SetConsensusBlockNumber(blockNumber string) {
ct.client.Set(ct.ctx, ct.key(), blockNumber, 0)
}
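Note: when several proxyd instances should agree on the same consensus block, the Redis-backed tracker can be swapped in through the WithTracker option. A brief sketch under stated assumptions; the Redis address and the "default" namespace are placeholders, and the package and function names are illustrative:

package example

import (
	"context"

	"github.com/ethereum-optimism/optimism/proxyd"
	"github.com/go-redis/redis/v8"
)

// newSharedConsensusPoller wires a ConsensusPoller to the Redis-backed
// tracker so multiple proxyd instances share one consensus block number.
func newSharedConsensusPoller(bg *proxyd.BackendGroup) *proxyd.ConsensusPoller {
	client := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
	tracker := proxyd.NewRedisConsensusTracker(context.Background(), client, "default")
	return proxyd.NewConsensusPoller(bg, proxyd.WithTracker(tracker))
}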

@@ -15,6 +15,7 @@ rpc_port = 8080
# Host for the proxyd WS server to listen on.
ws_host = "0.0.0.0"
# Port for the above
# Set the ws_port to 0 to disable WS
ws_port = 8085
# Maximum client body size, in bytes, that the server will accept.
max_body_size_bytes = 10485760

@@ -11,10 +11,12 @@ require (
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.1
github.com/rs/cors v1.8.2
github.com/stretchr/testify v1.7.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
gopkg.in/yaml.v2 v2.4.0
)
require (

@@ -22,7 +22,7 @@ func TestBatchTimeout(t *testing.T) {
config := ReadConfig("batch_timeout")
client := NewProxydClient("http://127.0.0.1:8545")
-shutdown, err := proxyd.Start(config)
+_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()

@@ -148,7 +148,7 @@ func TestBatching(t *testing.T) {
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", goodBackend.URL()))
client := NewProxydClient("http://127.0.0.1:8545")
-shutdown, err := proxyd.Start(config)
+_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()

@@ -35,7 +35,7 @@ func TestCaching(t *testing.T) {
require.NoError(t, os.Setenv("REDIS_URL", fmt.Sprintf("redis://127.0.0.1:%s", redis.Port())))
config := ReadConfig("caching")
client := NewProxydClient("http://127.0.0.1:8545")
-shutdown, err := proxyd.Start(config)
+_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
@@ -171,7 +171,7 @@ func TestBatchCaching(t *testing.T) {
config := ReadConfig("caching")
client := NewProxydClient("http://127.0.0.1:8545")
-shutdown, err := proxyd.Start(config)
+_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()

@@ -0,0 +1,309 @@
package integration_tests
import (
"context"
"fmt"
"net/http"
"os"
"path"
"testing"
"github.com/ethereum-optimism/optimism/proxyd"
ms "github.com/ethereum-optimism/optimism/proxyd/tools/mockserver/handler"
"github.com/stretchr/testify/require"
)
func TestConsensus(t *testing.T) {
node1 := NewMockBackend(nil)
defer node1.Close()
node2 := NewMockBackend(nil)
defer node2.Close()
dir, err := os.Getwd()
require.NoError(t, err)
responses := path.Join(dir, "testdata/consensus_responses.yml")
h1 := ms.MockedHandler{
Overrides: []*ms.MethodTemplate{},
Autoload: true,
AutoloadFile: responses,
}
h2 := ms.MockedHandler{
Overrides: []*ms.MethodTemplate{},
Autoload: true,
AutoloadFile: responses,
}
require.NoError(t, os.Setenv("NODE1_URL", node1.URL()))
require.NoError(t, os.Setenv("NODE2_URL", node2.URL()))
node1.SetHandler(http.HandlerFunc(h1.Handler))
node2.SetHandler(http.HandlerFunc(h2.Handler))
config := ReadConfig("consensus")
ctx := context.Background()
svr, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
bg := svr.BackendGroups["node"]
require.NotNil(t, bg)
require.NotNil(t, bg.Consensus)
t.Run("initial consensus", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
// unknown consensus at init
require.Equal(t, "", bg.Consensus.GetConsensusBlockNumber())
// first poll
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// consensus at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())
})
t.Run("advance consensus", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())
// advance latest on node2 to 0x2
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildResponse("0x2", "hash2"),
})
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// consensus should stick to 0x1, since node1 is still lagging there
bg.Consensus.UpdateBackendGroupConsensus(ctx)
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())
// advance latest on node1 to 0x2
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildResponse("0x2", "hash2"),
})
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// should stick to 0x2, since now all nodes are at 0x2
require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber())
})
t.Run("broken consensus", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())
// advance latest on both nodes to 0x2
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildResponse("0x2", "hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildResponse("0x2", "hash2"),
})
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// at 0x2
require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber())
// make node2 diverge on hash
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x2",
Response: buildResponse("0x2", "wrong_hash"),
})
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// should resolve to 0x1, since 0x2 is out of consensus at the moment
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())
// later, when impl events, listen to broken consensus event
})
t.Run("broken consensus with depth 2", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())
// advance latest on both nodes to 0x2
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildResponse("0x2", "hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildResponse("0x2", "hash2"),
})
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// at 0x2
require.Equal(t, "0x2", bg.Consensus.GetConsensusBlockNumber())
// advance latest on both nodes to 0x3
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildResponse("0x3", "hash3"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildResponse("0x3", "hash3"),
})
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// at 0x3
require.Equal(t, "0x3", bg.Consensus.GetConsensusBlockNumber())
// make node2 diverge on hash for blocks 0x2 and 0x3
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x2",
Response: buildResponse("0x2", "wrong_hash2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x3",
Response: buildResponse("0x3", "wrong_hash3"),
})
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// should resolve to 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())
})
t.Run("fork in advanced block", func(t *testing.T) {
h1.ResetOverrides()
h2.ResetOverrides()
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// all nodes start at block 0x1
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())
// make nodes 1 and 2 advance in forks
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x2",
Response: buildResponse("0x2", "node1_0x2"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x2",
Response: buildResponse("0x2", "node2_0x2"),
})
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x3",
Response: buildResponse("0x3", "node1_0x3"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "0x3",
Response: buildResponse("0x3", "node2_0x3"),
})
h1.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildResponse("0x3", "node1_0x3"),
})
h2.AddOverride(&ms.MethodTemplate{
Method: "eth_getBlockByNumber",
Block: "latest",
Response: buildResponse("0x3", "node2_0x3"),
})
// poll for group consensus
for _, be := range bg.Backends {
bg.Consensus.UpdateBackend(ctx, be)
}
bg.Consensus.UpdateBackendGroupConsensus(ctx)
// should resolve to 0x1, the highest common ancestor
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber())
})
}
func buildResponse(number string, hash string) string {
return fmt.Sprintf(`{
"jsonrpc": "2.0",
"id": 67,
"result": {
"number": "%s",
"hash": "%s"
}
}`, number, hash)
}

@@ -30,7 +30,7 @@ func TestFailover(t *testing.T) {
config := ReadConfig("failover")
client := NewProxydClient("http://127.0.0.1:8545")
-shutdown, err := proxyd.Start(config)
+_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
@@ -128,7 +128,7 @@ func TestRetries(t *testing.T) {
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))
config := ReadConfig("retries")
client := NewProxydClient("http://127.0.0.1:8545")
-shutdown, err := proxyd.Start(config)
+_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
@@ -171,7 +171,7 @@ func TestOutOfServiceInterval(t *testing.T) {
config := ReadConfig("out_of_service_interval")
client := NewProxydClient("http://127.0.0.1:8545")
-shutdown, err := proxyd.Start(config)
+_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
@@ -226,7 +226,7 @@ func TestBatchWithPartialFailover(t *testing.T) {
require.NoError(t, os.Setenv("BAD_BACKEND_RPC_URL", badBackend.URL()))
client := NewProxydClient("http://127.0.0.1:8545")
-shutdown, err := proxyd.Start(config)
+_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
@@ -273,7 +273,7 @@ func TestInfuraFailoverOnUnexpectedResponse(t *testing.T) {
require.NoError(t, os.Setenv("BAD_BACKEND_RPC_URL", badBackend.URL()))
client := NewProxydClient("http://127.0.0.1:8545")
-shutdown, err := proxyd.Start(config)
+_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()

@@ -41,7 +41,7 @@ func TestMaxConcurrentRPCs(t *testing.T) {
config := ReadConfig("max_rpc_conns")
client := NewProxydClient("http://127.0.0.1:8545")
-shutdown, err := proxyd.Start(config)
+_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()

@@ -29,7 +29,7 @@ func TestBackendMaxRPSLimit(t *testing.T) {
config := ReadConfig("backend_rate_limit")
client := NewProxydClient("http://127.0.0.1:8545")
-shutdown, err := proxyd.Start(config)
+_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
limitedRes, codes := spamReqs(t, client, ethChainID, 503, 3)
@@ -45,7 +45,7 @@ func TestFrontendMaxRPSLimit(t *testing.T) {
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", goodBackend.URL()))
config := ReadConfig("frontend_rate_limit")
-shutdown, err := proxyd.Start(config)
+_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()

@@ -43,7 +43,7 @@ func TestSenderRateLimitValidation(t *testing.T) {
// validation.
config.SenderRateLimit.Limit = math.MaxInt
client := NewProxydClient("http://127.0.0.1:8545")
-shutdown, err := proxyd.Start(config)
+_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
@@ -73,7 +73,7 @@ func TestSenderRateLimitLimiting(t *testing.T) {
config := ReadConfig("sender_rate_limit")
client := NewProxydClient("http://127.0.0.1:8545")
-shutdown, err := proxyd.Start(config)
+_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()

@@ -0,0 +1,24 @@
[server]
rpc_port = 8080
[backend]
response_timeout_seconds = 1
[backends]
[backends.node1]
rpc_url = "$NODE1_URL"
[backends.node2]
rpc_url = "$NODE2_URL"
[backend_groups]
[backend_groups.node]
backends = ["node1", "node2"]
consensus_aware = true
consensus_handler = "noop" # allow more control over the consensus poller for tests
[rpc_method_mappings]
eth_call = "node"
eth_chainId = "node"
eth_blockNumber = "node"
eth_getBlockByNumber = "node"

@@ -0,0 +1,44 @@
- method: eth_getBlockByNumber
block: latest
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash1",
"number": "0x1"
}
}
- method: eth_getBlockByNumber
block: 0x1
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash1",
"number": "0x1"
}
}
- method: eth_getBlockByNumber
block: 0x2
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash2",
"number": "0x2"
}
}
- method: eth_getBlockByNumber
block: 0x3
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash3",
"number": "0x3"
}
}

@@ -26,7 +26,7 @@ func TestSingleRPCValidation(t *testing.T) {
config := ReadConfig("whitelist")
client := NewProxydClient("http://127.0.0.1:8545")
-shutdown, err := proxyd.Start(config)
+_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
@@ -110,7 +110,7 @@ func TestBatchRPCValidation(t *testing.T) {
config := ReadConfig("whitelist")
client := NewProxydClient("http://127.0.0.1:8545")
-shutdown, err := proxyd.Start(config)
+_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()

@@ -38,7 +38,7 @@ func TestConcurrentWSPanic(t *testing.T) {
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))
config := ReadConfig("ws")
-shutdown, err := proxyd.Start(config)
+_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
client, err := NewProxydWSClient("ws://127.0.0.1:8546", nil, nil)
require.NoError(t, err)
@@ -147,7 +147,7 @@ func TestWS(t *testing.T) {
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))
config := ReadConfig("ws")
-shutdown, err := proxyd.Start(config)
+_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
client, err := NewProxydWSClient("ws://127.0.0.1:8546", func(msgType int, data []byte) {
clientHdlr.MsgCB(msgType, data)
@@ -238,7 +238,7 @@ func TestWSClientClosure(t *testing.T) {
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))
config := ReadConfig("ws")
-shutdown, err := proxyd.Start(config)
+_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
@@ -278,7 +278,7 @@ func TestWSClientMaxConns(t *testing.T) {
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))
config := ReadConfig("ws")
-shutdown, err := proxyd.Start(config)
+_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()

@@ -242,6 +242,20 @@ var (
Name: "rate_limit_take_errors",
Help: "Count of errors taking frontend rate limits",
})
consensusLatestBlock = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "consensus_latest_block",
Help: "Consensus latest block",
})
backendLatestBlockBackend = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Name: "backend_latest_block",
Help: "Current latest block observed per backend",
}, []string{
"backend_name",
})
)
func RecordRedisError(source string) {

@@ -18,20 +18,20 @@ import (
"golang.org/x/sync/semaphore"
)
-func Start(config *Config) (func(), error) {
+func Start(config *Config) (*Server, func(), error) {
if len(config.Backends) == 0 {
-return nil, errors.New("must define at least one backend")
+return nil, nil, errors.New("must define at least one backend")
}
if len(config.BackendGroups) == 0 {
-return nil, errors.New("must define at least one backend group")
+return nil, nil, errors.New("must define at least one backend group")
}
if len(config.RPCMethodMappings) == 0 {
-return nil, errors.New("must define at least one RPC method mapping")
+return nil, nil, errors.New("must define at least one RPC method mapping")
}
for authKey := range config.Authentication {
if authKey == "none" {
-return nil, errors.New("cannot use none as an auth key")
+return nil, nil, errors.New("cannot use none as an auth key")
}
}
@@ -39,16 +39,16 @@ func Start(config *Config) (func(), error) {
if config.Redis.URL != "" {
rURL, err := ReadFromEnvOrConfig(config.Redis.URL)
if err != nil {
-return nil, err
+return nil, nil, err
}
redisClient, err = NewRedisClient(rURL)
if err != nil {
-return nil, err
+return nil, nil, err
}
}
if redisClient == nil && config.RateLimit.UseRedis {
-return nil, errors.New("must specify a Redis URL if UseRedis is true in rate limit config")
+return nil, nil, errors.New("must specify a Redis URL if UseRedis is true in rate limit config")
}
var lim BackendRateLimiter
@@ -80,10 +80,10 @@ func Start(config *Config) (func(), error) {
if config.SenderRateLimit.Enabled {
if config.SenderRateLimit.Limit <= 0 {
-return nil, errors.New("limit in sender_rate_limit must be > 0")
+return nil, nil, errors.New("limit in sender_rate_limit must be > 0")
}
if time.Duration(config.SenderRateLimit.Interval) < time.Second {
-return nil, errors.New("interval in sender_rate_limit must be >= 1s")
+return nil, nil, errors.New("interval in sender_rate_limit must be >= 1s")
}
}
@@ -100,17 +100,14 @@ func Start(config *Config) (func(), error) {
rpcURL, err := ReadFromEnvOrConfig(cfg.RPCURL)
if err != nil {
-return nil, err
+return nil, nil, err
}
wsURL, err := ReadFromEnvOrConfig(cfg.WSURL)
if err != nil {
-return nil, err
+return nil, nil, err
}
if rpcURL == "" {
-return nil, fmt.Errorf("must define an RPC URL for backend %s", name)
+return nil, nil, fmt.Errorf("must define an RPC URL for backend %s", name)
}
-if wsURL == "" {
-return nil, fmt.Errorf("must define a WS URL for backend %s", name)
-}
if config.BackendOptions.ResponseTimeoutSeconds != 0 {
@@ -135,13 +132,13 @@ func Start(config *Config) (func(), error) {
if cfg.Password != "" {
passwordVal, err := ReadFromEnvOrConfig(cfg.Password)
if err != nil {
-return nil, err
+return nil, nil, err
}
opts = append(opts, WithBasicAuth(cfg.Username, passwordVal))
}
tlsConfig, err := configureBackendTLS(cfg)
if err != nil {
-return nil, err
+return nil, nil, err
}
if tlsConfig != nil {
log.Info("using custom TLS config for backend", "name", name)
@@ -162,7 +159,7 @@ func Start(config *Config) (func(), error) {
backends := make([]*Backend, 0)
for _, bName := range bg.Backends {
if backendsByName[bName] == nil {
-return nil, fmt.Errorf("backend %s is not defined", bName)
+return nil, nil, fmt.Errorf("backend %s is not defined", bName)
}
backends = append(backends, backendsByName[bName])
}
@@ -177,17 +174,17 @@ func Start(config *Config) (func(), error) {
if config.WSBackendGroup != "" {
wsBackendGroup = backendGroups[config.WSBackendGroup]
if wsBackendGroup == nil {
-return nil, fmt.Errorf("ws backend group %s does not exist", config.WSBackendGroup)
+return nil, nil, fmt.Errorf("ws backend group %s does not exist", config.WSBackendGroup)
}
}
if wsBackendGroup == nil && config.Server.WSPort != 0 {
-return nil, fmt.Errorf("a ws port was defined, but no ws group was defined")
+return nil, nil, fmt.Errorf("a ws port was defined, but no ws group was defined")
}
for _, bg := range config.RPCMethodMappings {
if backendGroups[bg] == nil {
-return nil, fmt.Errorf("undefined backend group %s", bg)
+return nil, nil, fmt.Errorf("undefined backend group %s", bg)
}
}
@@ -198,7 +195,7 @@ func Start(config *Config) (func(), error) {
for secret, alias := range config.Authentication {
resolvedSecret, err := ReadFromEnvOrConfig(secret)
if err != nil {
-return nil, err
+return nil, nil, err
}
resolvedAuth[resolvedSecret] = alias
}
@@ -217,11 +214,11 @@ func Start(config *Config) (func(), error) {
)
if config.Cache.BlockSyncRPCURL == "" {
-return nil, fmt.Errorf("block sync node required for caching")
+return nil, nil, fmt.Errorf("block sync node required for caching")
}
blockSyncRPCURL, err := ReadFromEnvOrConfig(config.Cache.BlockSyncRPCURL)
if err != nil {
-return nil, err
+return nil, nil, err
}
if redisClient == nil {
@@ -233,7 +230,7 @@ func Start(config *Config) (func(), error) {
// Ideally, the BlocKSyncRPCURL should be the sequencer or a HA replica that's not far behind
ethClient, err := ethclient.Dial(blockSyncRPCURL)
if err != nil {
-return nil, err
+return nil, nil, err
}
defer ethClient.Close()
@@ -260,7 +257,7 @@ func Start(config *Config) (func(), error) {
redisClient,
)
if err != nil {
-return nil, fmt.Errorf("error creating server: %w", err)
+return nil, nil, fmt.Errorf("error creating server: %w", err)
}
if config.Metrics.Enabled {
@@ -300,12 +297,28 @@ func Start(config *Config) (func(), error) {
log.Crit("error starting WS server", "err", err)
}
}()
} else {
log.Info("WS server not enabled (ws_port is set to 0)")
}
for bgName, bg := range backendGroups {
if config.BackendGroups[bgName].ConsensusAware {
log.Info("creating poller for consensus aware backend_group", "name", bgName)
copts := make([]ConsensusOpt, 0)
if config.BackendGroups[bgName].ConsensusAsyncHandler == "noop" {
copts = append(copts, WithAsyncHandler(NewNoopAsyncHandler()))
}
cp := NewConsensusPoller(bg, copts...)
bg.Consensus = cp
}
}
<-errTimer.C
log.Info("started proxyd")
-return func() {
+shutdownFunc := func() {
log.Info("shutting down proxyd")
if blockNumLVC != nil {
blockNumLVC.Stop()
@@ -318,7 +331,9 @@ func Start(config *Config) (func(), error) {
log.Error("error flushing backend ws conns", "err", err)
}
log.Info("goodbye")
-}, nil
+}
return srv, shutdownFunc, nil
}
func secondsToDuration(seconds int) time.Duration {
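Note: callers of Start now receive the Server as well, which is what lets the integration tests reach into BackendGroups and drive the consensus poller directly. A minimal sketch of the new call shape; the package, function name, and the "node" group name (taken from the test config) are illustrative, and config is assumed to be an already-loaded *proxyd.Config:

package example

import (
	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/proxyd"
)

// run shows the new three-value Start signature.
func run(config *proxyd.Config) {
	srv, shutdown, err := proxyd.Start(config)
	if err != nil {
		log.Crit("error starting proxyd", "err", err)
	}
	defer shutdown()

	// The returned Server exposes the backend groups, including the new
	// consensus poller when consensus_aware is enabled for a group.
	if bg, ok := srv.BackendGroups["node"]; ok && bg.Consensus != nil {
		log.Info("consensus block", "block", bg.Consensus.GetConsensusBlockNumber())
	}
}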

@@ -39,7 +39,7 @@ const (
var emptyArrayResponse = json.RawMessage("[]")
type Server struct {
-backendGroups map[string]*BackendGroup
+BackendGroups map[string]*BackendGroup
wsBackendGroup *BackendGroup
wsMethodWhitelist *StringSet
rpcMethodMappings map[string]string
@@ -152,7 +152,7 @@ func NewServer(
}
return &Server{
-backendGroups: backendGroups,
+BackendGroups: backendGroups,
wsBackendGroup: wsBackendGroup,
wsMethodWhitelist: wsMethodWhitelist,
rpcMethodMappings: rpcMethodMappings,
@@ -476,7 +476,7 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
start := i * s.maxUpstreamBatchSize
end := int(math.Min(float64(start+s.maxUpstreamBatchSize), float64(len(cacheMisses))))
elems := cacheMisses[start:end]
-res, err := s.backendGroups[group.backendGroup].Forward(ctx, createBatchRequest(elems), isBatch)
+res, err := s.BackendGroups[group.backendGroup].Forward(ctx, createBatchRequest(elems), isBatch)
if err != nil {
log.Error(
"error forwarding RPC batch",
@@ -559,7 +559,7 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context
}
ctx := context.WithValue(r.Context(), ContextKeyXForwardedFor, xff) // nolint:staticcheck
-if s.authenticatedPaths == nil {
+if len(s.authenticatedPaths) == 0 {
// handle the edge case where auth is disabled
// but someone sends in an auth key anyway
if authorization != "" {

@@ -0,0 +1,102 @@
package handler
import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"gopkg.in/yaml.v3"
)
type MethodTemplate struct {
Method string `yaml:"method"`
Block string `yaml:"block"`
Response string `yaml:"response"`
}
type MockedHandler struct {
Overrides []*MethodTemplate
Autoload bool
AutoloadFile string
}
func (mh *MockedHandler) Serve(port int) error {
r := mux.NewRouter()
r.HandleFunc("/", mh.Handler)
http.Handle("/", r)
fmt.Printf("starting server up on :%d serving MockedResponsesFile %s\n", port, mh.AutoloadFile)
err := http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
if errors.Is(err, http.ErrServerClosed) {
fmt.Printf("server closed\n")
} else if err != nil {
fmt.Printf("error starting server: %s\n", err)
return err
}
return nil
}
func (mh *MockedHandler) Handler(w http.ResponseWriter, req *http.Request) {
body, err := io.ReadAll(req.Body)
if err != nil {
fmt.Printf("error reading request: %v\n", err)
}
var j map[string]interface{}
err = json.Unmarshal(body, &j)
if err != nil {
fmt.Printf("error reading request: %v\n", err)
}
var template []*MethodTemplate
if mh.Autoload {
template = append(template, mh.LoadFromFile(mh.AutoloadFile)...)
}
if mh.Overrides != nil {
template = append(template, mh.Overrides...)
}
method := j["method"]
block := ""
if method == "eth_getBlockByNumber" {
block = (j["params"].([]interface{})[0]).(string)
}
var selectedResponse *string
for _, r := range template {
if r.Method == method && r.Block == block {
selectedResponse = &r.Response
}
}
if selectedResponse != nil {
_, err := fmt.Fprintf(w, *selectedResponse)
if err != nil {
fmt.Printf("error writing response: %v\n", err)
}
}
}
func (mh *MockedHandler) LoadFromFile(file string) []*MethodTemplate {
contents, err := os.ReadFile(file)
if err != nil {
fmt.Printf("error reading MockedResponsesFile: %v\n", err)
}
var template []*MethodTemplate
err = yaml.Unmarshal(contents, &template)
if err != nil {
fmt.Printf("error reading MockedResponsesFile: %v\n", err)
}
return template
}
func (mh *MockedHandler) AddOverride(template *MethodTemplate) {
mh.Overrides = append(mh.Overrides, template)
}
func (mh *MockedHandler) ResetOverrides() {
mh.Overrides = make([]*MethodTemplate, 0)
}
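Note: the handler is used with httptest in the consensus integration test above, and it can also serve standalone. A brief sketch of loading a responses file and overriding one entry; the file path, port, and function name are placeholders:

package example

import (
	"github.com/ethereum-optimism/optimism/proxyd/tools/mockserver/handler"
)

// serveMock autoloads canned responses from a YAML file and overrides the
// "latest" block before serving.
func serveMock() error {
	h := handler.MockedHandler{
		Autoload:     true,
		AutoloadFile: "testdata/consensus_responses.yml",
	}
	h.AddOverride(&handler.MethodTemplate{
		Method:   "eth_getBlockByNumber",
		Block:    "latest",
		Response: `{"jsonrpc":"2.0","id":67,"result":{"number":"0x2","hash":"hash2"}}`,
	})
	return h.Serve(8545)
}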

@@ -0,0 +1,30 @@
package main
import (
"fmt"
"os"
"path"
"strconv"
"github.com/ethereum-optimism/optimism/proxyd/tools/mockserver/handler"
)
func main() {
if len(os.Args) < 3 {
fmt.Printf("simply mock a response based on an external text MockedResponsesFile\n")
fmt.Printf("usage: mockserver <port> <MockedResponsesFile.yml>\n")
os.Exit(1)
}
port, _ := strconv.ParseInt(os.Args[1], 10, 32)
dir, _ := os.Getwd()
h := handler.MockedHandler{
Autoload: true,
AutoloadFile: path.Join(dir, os.Args[2]),
}
err := h.Serve(int(port))
if err != nil {
fmt.Printf("error starting mockserver: %v\n", err)
}
}

@@ -0,0 +1,44 @@
- method: eth_getBlockByNumber
block: latest
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash2",
"number": "0x2"
}
}
- method: eth_getBlockByNumber
block: 0x1
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash1",
"number": "0x1"
}
}
- method: eth_getBlockByNumber
block: 0x2
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash2",
"number": "0x2"
}
}
- method: eth_getBlockByNumber
block: 0x3
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash34",
"number": "0x3"
}
}

@@ -0,0 +1,44 @@
- method: eth_getBlockByNumber
block: latest
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash2",
"number": "0x2"
}
}
- method: eth_getBlockByNumber
block: 0x1
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash1",
"number": "0x1"
}
}
- method: eth_getBlockByNumber
block: 0x2
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash2",
"number": "0x2"
}
}
- method: eth_getBlockByNumber
block: 0x3
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"hash": "hash3",
"number": "0x3"
}
}