proxyd: integrate health checks
This commit is contained in:
parent
ba17174da5
commit
26f7d10e16
@ -17,6 +17,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
sw "github.com/ethereum-optimism/optimism/proxyd/pkg/avg-sliding-window"
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
@ -83,6 +85,11 @@ var (
|
||||
Message: "sender is over rate limit",
|
||||
HTTPErrorCode: 429,
|
||||
}
|
||||
ErrNotHealthy = &RPCErr{
|
||||
Code: JSONRPCErrorInternal - 18,
|
||||
Message: "backend is currently not healthy to serve traffic",
|
||||
HTTPErrorCode: 429,
|
||||
}
|
||||
|
||||
ErrBackendUnexpectedJSONRPC = errors.New("backend returned an unexpected JSON-RPC response")
|
||||
)
|
||||
@ -119,6 +126,14 @@ type Backend struct {
|
||||
outOfServiceInterval time.Duration
|
||||
stripTrailingXFF bool
|
||||
proxydIP string
|
||||
|
||||
maxDegradedLatencyThreshold time.Duration
|
||||
maxLatencyThreshold time.Duration
|
||||
maxErrorRateThreshold float64
|
||||
|
||||
latencySlidingWindow *sw.AvgSlidingWindow
|
||||
networkRequestsSlidingWindow *sw.AvgSlidingWindow
|
||||
networkErrorsSlidingWindow *sw.AvgSlidingWindow
|
||||
}
|
||||
|
||||
type BackendOpt func(b *Backend)
|
||||
@ -187,6 +202,18 @@ func WithProxydIP(ip string) BackendOpt {
|
||||
}
|
||||
}
|
||||
|
||||
func WithMaxLatencyThreshold(maxLatencyThreshold time.Duration) BackendOpt {
|
||||
return func(b *Backend) {
|
||||
b.maxLatencyThreshold = maxLatencyThreshold
|
||||
}
|
||||
}
|
||||
|
||||
func WithMaxErrorRateThreshold(maxErrorRateThreshold float64) BackendOpt {
|
||||
return func(b *Backend) {
|
||||
b.maxErrorRateThreshold = maxErrorRateThreshold
|
||||
}
|
||||
}
|
||||
|
||||
func NewBackend(
|
||||
name string,
|
||||
rpcURL string,
|
||||
@ -207,6 +234,14 @@ func NewBackend(
|
||||
backendName: name,
|
||||
},
|
||||
dialer: &websocket.Dialer{},
|
||||
|
||||
maxLatencyThreshold: 10 * time.Second,
|
||||
maxDegradedLatencyThreshold: 5 * time.Second,
|
||||
maxErrorRateThreshold: 0.5,
|
||||
|
||||
latencySlidingWindow: sw.NewSlidingWindow(),
|
||||
networkRequestsSlidingWindow: sw.NewSlidingWindow(),
|
||||
networkErrorsSlidingWindow: sw.NewSlidingWindow(),
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
@ -252,11 +287,11 @@ func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]
|
||||
case nil: // do nothing
|
||||
// ErrBackendUnexpectedJSONRPC occurs because infura responds with a single JSON-RPC object
|
||||
// to a batch request whenever any Request Object in the batch would induce a partial error.
|
||||
// We don't label the the backend offline in this case. But the error is still returned to
|
||||
// We don't label the backend offline in this case. But the error is still returned to
|
||||
// callers so failover can occur if needed.
|
||||
case ErrBackendUnexpectedJSONRPC:
|
||||
log.Debug(
|
||||
"Reecived unexpected JSON-RPC response",
|
||||
"Received unexpected JSON-RPC response",
|
||||
"name", b.Name,
|
||||
"req_id", GetReqID(ctx),
|
||||
"err", err,
|
||||
@ -396,6 +431,9 @@ func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method
|
||||
}
|
||||
|
||||
func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
|
||||
// we are concerned about network error rates, so we record 1 request independently of how many are in the batch
|
||||
b.networkRequestsSlidingWindow.Incr()
|
||||
|
||||
isSingleElementBatch := len(rpcReqs) == 1
|
||||
|
||||
// Single element batches are unwrapped before being sent
|
||||
@ -410,6 +448,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
|
||||
|
||||
httpReq, err := http.NewRequestWithContext(ctx, "POST", b.rpcURL, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
b.networkErrorsSlidingWindow.Incr()
|
||||
return nil, wrapErr(err, "error creating backend request")
|
||||
}
|
||||
|
||||
@ -427,8 +466,10 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
|
||||
httpReq.Header.Set("content-type", "application/json")
|
||||
httpReq.Header.Set("X-Forwarded-For", xForwardedFor)
|
||||
|
||||
start := time.Now()
|
||||
httpRes, err := b.client.DoLimited(httpReq)
|
||||
if err != nil {
|
||||
b.networkErrorsSlidingWindow.Incr()
|
||||
return nil, wrapErr(err, "error in backend request")
|
||||
}
|
||||
|
||||
@ -446,12 +487,14 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
|
||||
|
||||
// Alchemy returns a 400 on bad JSONs, so handle that case
|
||||
if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 {
|
||||
b.networkErrorsSlidingWindow.Incr()
|
||||
return nil, fmt.Errorf("response code %d", httpRes.StatusCode)
|
||||
}
|
||||
|
||||
defer httpRes.Body.Close()
|
||||
resB, err := io.ReadAll(io.LimitReader(httpRes.Body, b.maxResponseSize))
|
||||
if err != nil {
|
||||
b.networkErrorsSlidingWindow.Incr()
|
||||
return nil, wrapErr(err, "error reading response body")
|
||||
}
|
||||
|
||||
@ -468,13 +511,16 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
|
||||
if err := json.Unmarshal(resB, &res); err != nil {
|
||||
// Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method
|
||||
if responseIsNotBatched(resB) {
|
||||
b.networkErrorsSlidingWindow.Incr()
|
||||
return nil, ErrBackendUnexpectedJSONRPC
|
||||
}
|
||||
b.networkErrorsSlidingWindow.Incr()
|
||||
return nil, ErrBackendBadResponse
|
||||
}
|
||||
}
|
||||
|
||||
if len(rpcReqs) != len(res) {
|
||||
b.networkErrorsSlidingWindow.Incr()
|
||||
return nil, ErrBackendUnexpectedJSONRPC
|
||||
}
|
||||
|
||||
@ -485,11 +531,32 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
|
||||
res.Error.HTTPErrorCode = httpRes.StatusCode
|
||||
}
|
||||
}
|
||||
duration := time.Since(start)
|
||||
b.latencySlidingWindow.Add(float64(duration))
|
||||
|
||||
sortBatchRPCResponse(rpcReqs, res)
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// IsHealthy checks if the backend is able to serve traffic, based on dynamic parameters
|
||||
func (b *Backend) IsHealthy() bool {
|
||||
errorRate := b.networkErrorsSlidingWindow.Sum() / b.networkRequestsSlidingWindow.Sum()
|
||||
avgLatency := time.Duration(b.latencySlidingWindow.Avg())
|
||||
if errorRate >= b.maxErrorRateThreshold {
|
||||
return false
|
||||
}
|
||||
if avgLatency >= b.maxLatencyThreshold {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// IsDegraded checks if the backend is serving traffic in a degraded state (i.e. used as a last resource)
|
||||
func (b *Backend) IsDegraded() bool {
|
||||
avgLatency := time.Duration(b.latencySlidingWindow.Avg())
|
||||
return avgLatency >= b.maxDegradedLatencyThreshold
|
||||
}
|
||||
|
||||
func responseIsNotBatched(b []byte) bool {
|
||||
var r RPCRes
|
||||
return json.Unmarshal(b, &r) == nil
|
||||
|
@ -3,6 +3,7 @@ package proxyd
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@ -29,6 +30,11 @@ type ConsensusPoller struct {
|
||||
|
||||
tracker ConsensusTracker
|
||||
asyncHandler ConsensusAsyncHandler
|
||||
|
||||
minPeerCount uint64
|
||||
|
||||
banPeriod time.Duration
|
||||
maxUpdateThreshold time.Duration
|
||||
}
|
||||
|
||||
type backendState struct {
|
||||
@ -36,6 +42,7 @@ type backendState struct {
|
||||
|
||||
latestBlockNumber hexutil.Uint64
|
||||
latestBlockHash string
|
||||
peerCount uint64
|
||||
|
||||
lastUpdate time.Time
|
||||
|
||||
@ -47,7 +54,7 @@ func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
|
||||
defer cp.consensusGroupMux.Unlock()
|
||||
cp.consensusGroupMux.Lock()
|
||||
|
||||
g := make([]*Backend, len(cp.backendGroup.Backends))
|
||||
g := make([]*Backend, len(cp.consensusGroup))
|
||||
copy(g, cp.consensusGroup)
|
||||
|
||||
return g
|
||||
@ -141,6 +148,24 @@ func WithAsyncHandler(asyncHandler ConsensusAsyncHandler) ConsensusOpt {
|
||||
}
|
||||
}
|
||||
|
||||
func WithBanPeriod(banPeriod time.Duration) ConsensusOpt {
|
||||
return func(cp *ConsensusPoller) {
|
||||
cp.banPeriod = banPeriod
|
||||
}
|
||||
}
|
||||
|
||||
func WithMaxUpdateThreshold(maxUpdateThreshold time.Duration) ConsensusOpt {
|
||||
return func(cp *ConsensusPoller) {
|
||||
cp.maxUpdateThreshold = maxUpdateThreshold
|
||||
}
|
||||
}
|
||||
|
||||
func WithMinPeerCount(minPeerCount uint64) ConsensusOpt {
|
||||
return func(cp *ConsensusPoller) {
|
||||
cp.minPeerCount = minPeerCount
|
||||
}
|
||||
}
|
||||
|
||||
func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller {
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
|
||||
@ -153,6 +178,10 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
|
||||
cancelFunc: cancelFunc,
|
||||
backendGroup: bg,
|
||||
backendState: state,
|
||||
|
||||
banPeriod: 5 * time.Minute,
|
||||
maxUpdateThreshold: 30 * time.Second,
|
||||
minPeerCount: 3,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
@ -180,14 +209,29 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
|
||||
return
|
||||
}
|
||||
|
||||
if be.IsRateLimited() || !be.Online() {
|
||||
// if backend it not online or not in a health state we'll only resume checkin it after ban
|
||||
if !be.Online() || !be.IsHealthy() {
|
||||
log.Warn("backend banned - not online or not healthy", "backend", be.Name, "bannedUntil", bs.bannedUntil)
|
||||
bs.bannedUntil = time.Now().Add(cp.banPeriod)
|
||||
}
|
||||
|
||||
// if backend it not in sync we'll check again after ban
|
||||
inSync, err := cp.isInSync(ctx, be)
|
||||
if err != nil || !inSync {
|
||||
log.Warn("backend banned - not in sync", "backend", be.Name, "bannedUntil", bs.bannedUntil)
|
||||
bs.bannedUntil = time.Now().Add(cp.banPeriod)
|
||||
}
|
||||
|
||||
// if backend exhausted rate limit we'll skip it for now
|
||||
if be.IsRateLimited() {
|
||||
return
|
||||
}
|
||||
|
||||
// we'll introduce here checks to ban the backend
|
||||
// i.e. node is syncing the chain
|
||||
|
||||
// then update backend consensus
|
||||
peerCount, err := cp.getPeerCount(ctx, be)
|
||||
if err != nil {
|
||||
log.Warn("error updating backend", "name", be.Name, "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest")
|
||||
if err != nil {
|
||||
@ -195,7 +239,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
|
||||
return
|
||||
}
|
||||
|
||||
changed := cp.setBackendState(be, latestBlockNumber, latestBlockHash)
|
||||
changed := cp.setBackendState(be, peerCount, latestBlockNumber, latestBlockHash)
|
||||
|
||||
if changed {
|
||||
RecordBackendLatestBlock(be, latestBlockNumber)
|
||||
@ -211,7 +255,15 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
|
||||
currentConsensusBlockNumber := cp.GetConsensusBlockNumber()
|
||||
|
||||
for _, be := range cp.backendGroup.Backends {
|
||||
backendLatestBlockNumber, backendLatestBlockHash := cp.getBackendState(be)
|
||||
peerCount, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate := cp.getBackendState(be)
|
||||
|
||||
if peerCount < cp.minPeerCount {
|
||||
continue
|
||||
}
|
||||
if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
|
||||
continue
|
||||
}
|
||||
|
||||
if lowestBlock == 0 || backendLatestBlockNumber < lowestBlock {
|
||||
lowestBlock = backendLatestBlockNumber
|
||||
lowestBlockHash = backendLatestBlockHash
|
||||
@ -242,7 +294,20 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
|
||||
consensusBackends = consensusBackends[:0]
|
||||
filteredBackendsNames = filteredBackendsNames[:0]
|
||||
for _, be := range cp.backendGroup.Backends {
|
||||
if be.IsRateLimited() || !be.Online() || time.Now().Before(cp.backendState[be].bannedUntil) {
|
||||
/*
|
||||
a serving node needs to be:
|
||||
- healthy (network)
|
||||
- not rate limited
|
||||
- online
|
||||
- not banned
|
||||
- with minimum peer count
|
||||
- updated recently
|
||||
*/
|
||||
bs := cp.backendState[be]
|
||||
notUpdated := bs.lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now())
|
||||
isBanned := time.Now().Before(bs.bannedUntil)
|
||||
notEnoughPeers := bs.peerCount < cp.minPeerCount
|
||||
if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers {
|
||||
filteredBackendsNames = append(filteredBackendsNames, be.Name)
|
||||
continue
|
||||
}
|
||||
@ -291,6 +356,16 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
|
||||
log.Info("group state", "proposedBlock", proposedBlock, "consensusBackends", strings.Join(consensusBackendsNames, ", "), "filteredBackends", strings.Join(filteredBackendsNames, ", "))
|
||||
}
|
||||
|
||||
// Unban remove any bans from the backends
|
||||
func (cp *ConsensusPoller) Unban() {
|
||||
for _, be := range cp.backendGroup.Backends {
|
||||
bs := cp.backendState[be]
|
||||
bs.backendStateMux.Lock()
|
||||
bs.bannedUntil = time.Now().Add(-10 * time.Hour)
|
||||
bs.backendStateMux.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// fetchBlock Convenient wrapper to make a request to get a block directly from the backend
|
||||
func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (blockNumber hexutil.Uint64, blockHash string, err error) {
|
||||
var rpcRes RPCRes
|
||||
@ -301,7 +376,7 @@ func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block st
|
||||
|
||||
jsonMap, ok := rpcRes.Result.(map[string]interface{})
|
||||
if !ok {
|
||||
return 0, "", fmt.Errorf("unexpected response type checking consensus on backend %s", be.Name)
|
||||
return 0, "", fmt.Errorf("unexpected response to eth_getBlockByNumber on backend %s", be.Name)
|
||||
}
|
||||
blockNumber = hexutil.Uint64(hexutil.MustDecodeUint64(jsonMap["number"].(string)))
|
||||
blockHash = jsonMap["hash"].(string)
|
||||
@ -309,19 +384,67 @@ func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block st
|
||||
return
|
||||
}
|
||||
|
||||
func (cp *ConsensusPoller) getBackendState(be *Backend) (blockNumber hexutil.Uint64, blockHash string) {
|
||||
// isSyncing Convenient wrapper to check if the backend is syncing from the network
|
||||
func (cp *ConsensusPoller) getPeerCount(ctx context.Context, be *Backend) (count uint64, err error) {
|
||||
var rpcRes RPCRes
|
||||
err = be.ForwardRPC(ctx, &rpcRes, "67", "net_peerCount")
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
jsonMap, ok := rpcRes.Result.(string)
|
||||
if !ok {
|
||||
return 0, fmt.Errorf("unexpected response to net_peerCount on backend %s", be.Name)
|
||||
}
|
||||
|
||||
count = hexutil.MustDecodeUint64(jsonMap)
|
||||
|
||||
return count, nil
|
||||
}
|
||||
|
||||
// isInSync is a convenient wrapper to check if the backend is in sync from the network
|
||||
func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bool, err error) {
|
||||
var rpcRes RPCRes
|
||||
err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_syncing")
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
var res bool
|
||||
switch typed := rpcRes.Result.(type) {
|
||||
case bool:
|
||||
syncing := typed
|
||||
res = !syncing
|
||||
case string:
|
||||
syncing, err := strconv.ParseBool(typed)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
res = !syncing
|
||||
default:
|
||||
// result is a json when not in sync
|
||||
res = false
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time) {
|
||||
bs := cp.backendState[be]
|
||||
bs.backendStateMux.Lock()
|
||||
peerCount = bs.peerCount
|
||||
blockNumber = bs.latestBlockNumber
|
||||
blockHash = bs.latestBlockHash
|
||||
lastUpdate = bs.lastUpdate
|
||||
bs.backendStateMux.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
func (cp *ConsensusPoller) setBackendState(be *Backend, blockNumber hexutil.Uint64, blockHash string) (changed bool) {
|
||||
func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, blockNumber hexutil.Uint64, blockHash string) (changed bool) {
|
||||
bs := cp.backendState[be]
|
||||
bs.backendStateMux.Lock()
|
||||
changed = bs.latestBlockHash != blockHash
|
||||
bs.peerCount = peerCount
|
||||
bs.latestBlockNumber = blockNumber
|
||||
bs.latestBlockHash = blockHash
|
||||
bs.lastUpdate = time.Now()
|
||||
|
@ -2,12 +2,14 @@ package integration_tests
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
|
||||
"github.com/ethereum-optimism/optimism/proxyd"
|
||||
ms "github.com/ethereum-optimism/optimism/proxyd/tools/mockserver/handler"
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -54,6 +56,7 @@ func TestConsensus(t *testing.T) {
|
||||
t.Run("initial consensus", func(t *testing.T) {
|
||||
h1.ResetOverrides()
|
||||
h2.ResetOverrides()
|
||||
bg.Consensus.Unban()
|
||||
|
||||
// unknown consensus at init
|
||||
require.Equal(t, "0x0", bg.Consensus.GetConsensusBlockNumber().String())
|
||||
@ -68,9 +71,64 @@ func TestConsensus(t *testing.T) {
|
||||
require.Equal(t, "0x1", bg.Consensus.GetConsensusBlockNumber().String())
|
||||
})
|
||||
|
||||
t.Run("prevent using a backend with low peer count", func(t *testing.T) {
|
||||
h1.ResetOverrides()
|
||||
h2.ResetOverrides()
|
||||
bg.Consensus.Unban()
|
||||
|
||||
// advance latest on node2 to 0x2
|
||||
h1.AddOverride(&ms.MethodTemplate{
|
||||
Method: "net_peerCount",
|
||||
Block: "",
|
||||
Response: buildPeerCountResponse(1),
|
||||
})
|
||||
|
||||
be := backend(bg, "node1")
|
||||
require.NotNil(t, be)
|
||||
|
||||
for _, be := range bg.Backends {
|
||||
bg.Consensus.UpdateBackend(ctx, be)
|
||||
}
|
||||
bg.Consensus.UpdateBackendGroupConsensus(ctx)
|
||||
consensusGroup := bg.Consensus.GetConsensusGroup()
|
||||
|
||||
require.NotContains(t, consensusGroup, be)
|
||||
require.Equal(t, 1, len(consensusGroup))
|
||||
})
|
||||
|
||||
t.Run("prevent using a backend not in sync", func(t *testing.T) {
|
||||
h1.ResetOverrides()
|
||||
h2.ResetOverrides()
|
||||
bg.Consensus.Unban()
|
||||
|
||||
// advance latest on node2 to 0x2
|
||||
h1.AddOverride(&ms.MethodTemplate{
|
||||
Method: "eth_syncing",
|
||||
Block: "",
|
||||
Response: buildResponse(map[string]string{
|
||||
"startingblock": "0x0",
|
||||
"currentblock": "0x0",
|
||||
"highestblock": "0x100",
|
||||
}),
|
||||
})
|
||||
|
||||
be := backend(bg, "node1")
|
||||
require.NotNil(t, be)
|
||||
|
||||
for _, be := range bg.Backends {
|
||||
bg.Consensus.UpdateBackend(ctx, be)
|
||||
}
|
||||
bg.Consensus.UpdateBackendGroupConsensus(ctx)
|
||||
consensusGroup := bg.Consensus.GetConsensusGroup()
|
||||
|
||||
require.NotContains(t, consensusGroup, be)
|
||||
require.Equal(t, 1, len(consensusGroup))
|
||||
})
|
||||
|
||||
t.Run("advance consensus", func(t *testing.T) {
|
||||
h1.ResetOverrides()
|
||||
h2.ResetOverrides()
|
||||
bg.Consensus.Unban()
|
||||
|
||||
for _, be := range bg.Backends {
|
||||
bg.Consensus.UpdateBackend(ctx, be)
|
||||
@ -84,14 +142,13 @@ func TestConsensus(t *testing.T) {
|
||||
h2.AddOverride(&ms.MethodTemplate{
|
||||
Method: "eth_getBlockByNumber",
|
||||
Block: "latest",
|
||||
Response: buildResponse("0x2", "hash2"),
|
||||
Response: buildGetBlockResponse("0x2", "hash2"),
|
||||
})
|
||||
|
||||
// poll for group consensus
|
||||
for _, be := range bg.Backends {
|
||||
bg.Consensus.UpdateBackend(ctx, be)
|
||||
}
|
||||
bg.Consensus.UpdateBackendGroupConsensus(ctx)
|
||||
|
||||
// consensus should stick to 0x1, since node1 is still lagging there
|
||||
bg.Consensus.UpdateBackendGroupConsensus(ctx)
|
||||
@ -101,7 +158,7 @@ func TestConsensus(t *testing.T) {
|
||||
h1.AddOverride(&ms.MethodTemplate{
|
||||
Method: "eth_getBlockByNumber",
|
||||
Block: "latest",
|
||||
Response: buildResponse("0x2", "hash2"),
|
||||
Response: buildGetBlockResponse("0x2", "hash2"),
|
||||
})
|
||||
|
||||
// poll for group consensus
|
||||
@ -117,6 +174,7 @@ func TestConsensus(t *testing.T) {
|
||||
t.Run("broken consensus", func(t *testing.T) {
|
||||
h1.ResetOverrides()
|
||||
h2.ResetOverrides()
|
||||
bg.Consensus.Unban()
|
||||
|
||||
for _, be := range bg.Backends {
|
||||
bg.Consensus.UpdateBackend(ctx, be)
|
||||
@ -130,12 +188,12 @@ func TestConsensus(t *testing.T) {
|
||||
h1.AddOverride(&ms.MethodTemplate{
|
||||
Method: "eth_getBlockByNumber",
|
||||
Block: "latest",
|
||||
Response: buildResponse("0x2", "hash2"),
|
||||
Response: buildGetBlockResponse("0x2", "hash2"),
|
||||
})
|
||||
h2.AddOverride(&ms.MethodTemplate{
|
||||
Method: "eth_getBlockByNumber",
|
||||
Block: "latest",
|
||||
Response: buildResponse("0x2", "hash2"),
|
||||
Response: buildGetBlockResponse("0x2", "hash2"),
|
||||
})
|
||||
|
||||
// poll for group consensus
|
||||
@ -151,7 +209,7 @@ func TestConsensus(t *testing.T) {
|
||||
h2.AddOverride(&ms.MethodTemplate{
|
||||
Method: "eth_getBlockByNumber",
|
||||
Block: "0x2",
|
||||
Response: buildResponse("0x2", "wrong_hash"),
|
||||
Response: buildGetBlockResponse("0x2", "wrong_hash"),
|
||||
})
|
||||
|
||||
// poll for group consensus
|
||||
@ -169,6 +227,7 @@ func TestConsensus(t *testing.T) {
|
||||
t.Run("broken consensus with depth 2", func(t *testing.T) {
|
||||
h1.ResetOverrides()
|
||||
h2.ResetOverrides()
|
||||
bg.Consensus.Unban()
|
||||
|
||||
for _, be := range bg.Backends {
|
||||
bg.Consensus.UpdateBackend(ctx, be)
|
||||
@ -182,12 +241,12 @@ func TestConsensus(t *testing.T) {
|
||||
h1.AddOverride(&ms.MethodTemplate{
|
||||
Method: "eth_getBlockByNumber",
|
||||
Block: "latest",
|
||||
Response: buildResponse("0x2", "hash2"),
|
||||
Response: buildGetBlockResponse("0x2", "hash2"),
|
||||
})
|
||||
h2.AddOverride(&ms.MethodTemplate{
|
||||
Method: "eth_getBlockByNumber",
|
||||
Block: "latest",
|
||||
Response: buildResponse("0x2", "hash2"),
|
||||
Response: buildGetBlockResponse("0x2", "hash2"),
|
||||
})
|
||||
|
||||
// poll for group consensus
|
||||
@ -203,12 +262,12 @@ func TestConsensus(t *testing.T) {
|
||||
h1.AddOverride(&ms.MethodTemplate{
|
||||
Method: "eth_getBlockByNumber",
|
||||
Block: "latest",
|
||||
Response: buildResponse("0x3", "hash3"),
|
||||
Response: buildGetBlockResponse("0x3", "hash3"),
|
||||
})
|
||||
h2.AddOverride(&ms.MethodTemplate{
|
||||
Method: "eth_getBlockByNumber",
|
||||
Block: "latest",
|
||||
Response: buildResponse("0x3", "hash3"),
|
||||
Response: buildGetBlockResponse("0x3", "hash3"),
|
||||
})
|
||||
|
||||
// poll for group consensus
|
||||
@ -224,12 +283,12 @@ func TestConsensus(t *testing.T) {
|
||||
h2.AddOverride(&ms.MethodTemplate{
|
||||
Method: "eth_getBlockByNumber",
|
||||
Block: "0x2",
|
||||
Response: buildResponse("0x2", "wrong_hash2"),
|
||||
Response: buildGetBlockResponse("0x2", "wrong_hash2"),
|
||||
})
|
||||
h2.AddOverride(&ms.MethodTemplate{
|
||||
Method: "eth_getBlockByNumber",
|
||||
Block: "0x3",
|
||||
Response: buildResponse("0x3", "wrong_hash3"),
|
||||
Response: buildGetBlockResponse("0x3", "wrong_hash3"),
|
||||
})
|
||||
|
||||
// poll for group consensus
|
||||
@ -245,6 +304,7 @@ func TestConsensus(t *testing.T) {
|
||||
t.Run("fork in advanced block", func(t *testing.T) {
|
||||
h1.ResetOverrides()
|
||||
h2.ResetOverrides()
|
||||
bg.Consensus.Unban()
|
||||
|
||||
for _, be := range bg.Backends {
|
||||
bg.Consensus.UpdateBackend(ctx, be)
|
||||
@ -258,32 +318,32 @@ func TestConsensus(t *testing.T) {
|
||||
h1.AddOverride(&ms.MethodTemplate{
|
||||
Method: "eth_getBlockByNumber",
|
||||
Block: "0x2",
|
||||
Response: buildResponse("0x2", "node1_0x2"),
|
||||
Response: buildGetBlockResponse("0x2", "node1_0x2"),
|
||||
})
|
||||
h2.AddOverride(&ms.MethodTemplate{
|
||||
Method: "eth_getBlockByNumber",
|
||||
Block: "0x2",
|
||||
Response: buildResponse("0x2", "node2_0x2"),
|
||||
Response: buildGetBlockResponse("0x2", "node2_0x2"),
|
||||
})
|
||||
h1.AddOverride(&ms.MethodTemplate{
|
||||
Method: "eth_getBlockByNumber",
|
||||
Block: "0x3",
|
||||
Response: buildResponse("0x3", "node1_0x3"),
|
||||
Response: buildGetBlockResponse("0x3", "node1_0x3"),
|
||||
})
|
||||
h2.AddOverride(&ms.MethodTemplate{
|
||||
Method: "eth_getBlockByNumber",
|
||||
Block: "0x3",
|
||||
Response: buildResponse("0x3", "node2_0x3"),
|
||||
Response: buildGetBlockResponse("0x3", "node2_0x3"),
|
||||
})
|
||||
h1.AddOverride(&ms.MethodTemplate{
|
||||
Method: "eth_getBlockByNumber",
|
||||
Block: "latest",
|
||||
Response: buildResponse("0x3", "node1_0x3"),
|
||||
Response: buildGetBlockResponse("0x3", "node1_0x3"),
|
||||
})
|
||||
h2.AddOverride(&ms.MethodTemplate{
|
||||
Method: "eth_getBlockByNumber",
|
||||
Block: "latest",
|
||||
Response: buildResponse("0x3", "node2_0x3"),
|
||||
Response: buildGetBlockResponse("0x3", "node2_0x3"),
|
||||
})
|
||||
|
||||
// poll for group consensus
|
||||
@ -297,13 +357,31 @@ func TestConsensus(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func buildResponse(number string, hash string) string {
|
||||
return fmt.Sprintf(`{
|
||||
"jsonrpc": "2.0",
|
||||
"id": 67,
|
||||
"result": {
|
||||
"number": "%s",
|
||||
"hash": "%s"
|
||||
}
|
||||
}`, number, hash)
|
||||
func backend(bg *proxyd.BackendGroup, name string) *proxyd.Backend {
|
||||
for _, be := range bg.Backends {
|
||||
if be.Name == name {
|
||||
return be
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func buildPeerCountResponse(count uint64) string {
|
||||
return buildResponse(hexutil.Uint64(count).String())
|
||||
}
|
||||
func buildGetBlockResponse(number string, hash string) string {
|
||||
return buildResponse(map[string]string{
|
||||
"number": number,
|
||||
"hash": hash,
|
||||
})
|
||||
}
|
||||
|
||||
func buildResponse(result interface{}) string {
|
||||
res, err := json.Marshal(proxyd.RPCRes{
|
||||
Result: result,
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return string(res)
|
||||
}
|
||||
|
@ -1,3 +1,17 @@
|
||||
- method: net_peerCount
|
||||
response: >
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
"id": 67,
|
||||
"result": "0x10"
|
||||
}
|
||||
- method: eth_syncing
|
||||
response: >
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
"id": 67,
|
||||
"result": false
|
||||
}
|
||||
- method: eth_getBlockByNumber
|
||||
block: latest
|
||||
response: >
|
||||
|
@ -97,12 +97,17 @@ func (sw *AvgSlidingWindow) inWindow(t time.Time) bool {
|
||||
return windowStart.Before(t) && !t.After(now)
|
||||
}
|
||||
|
||||
// Add inserts a new data point into the window, with value `val` with the current time
|
||||
// Add inserts a new data point into the window, with value `val` and the current time
|
||||
func (sw *AvgSlidingWindow) Add(val float64) {
|
||||
t := sw.clock.Now()
|
||||
sw.AddWithTime(t, val)
|
||||
}
|
||||
|
||||
// Incr is an alias to insert a data point with value float64(1) and the current time
|
||||
func (sw *AvgSlidingWindow) Incr() {
|
||||
sw.Add(1)
|
||||
}
|
||||
|
||||
// AddWithTime inserts a new data point into the window, with value `val` and time `t`
|
||||
func (sw *AvgSlidingWindow) AddWithTime(t time.Time, val float64) {
|
||||
sw.advance()
|
||||
|
Loading…
Reference in New Issue
Block a user