go-ethereum/beacon/light/request/server.go
Felföldi Zsolt aadcb88675
cmd/blsync, beacon/light: beacon chain light client (#28822)
Here we add a beacon chain light client for use by geth.

Geth can now be configured to run against a beacon chain API endpoint,
without a consensus layer (CL) client driving it. To set this up, use the `--beacon.api` flag. Information
provided by the beacon chain is verified, i.e. geth does not blindly trust the beacon
API endpoint in this mode. The root of trust is the beacon chain 'sync committees'.
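For illustration, assuming a light-client-capable beacon node is reachable at the placeholder URL below:

    geth --beacon.api https://beacon-api.example.org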

The configured beacon API endpoint must provide light client data. At this time, only
Lodestar and Nimbus provide the necessary APIs.

There is also a standalone tool, cmd/blsync, which uses the beacon chain light client
to drive any EL implementation via its engine API.
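As a rough sketch (the engine API flag names below are assumptions; check the blsync help output for the exact ones), blsync connects to a beacon API on one side and an EL engine API on the other:

    blsync --beacon.api <beacon-node-url> --blsync.engine.api <engine-api-url> --blsync.jwtsecret <jwt-secret-file>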

---------

Co-authored-by: Felix Lange <fjl@twurst.com>
2024-03-06 17:50:22 +01:00

// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package request
import (
"math"
"sync"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/log"
)
var (
// request events
EvResponse = &EventType{Name: "response", requestEvent: true} // data: RequestResponse; sent by requestServer
EvFail = &EventType{Name: "fail", requestEvent: true} // data: RequestResponse; sent by requestServer
EvTimeout = &EventType{Name: "timeout", requestEvent: true} // data: RequestResponse; sent by serverWithTimeout
// server events
EvRegistered = &EventType{Name: "registered"} // data: nil; sent by Scheduler
EvUnregistered = &EventType{Name: "unregistered"} // data: nil; sent by Scheduler
EvCanRequestAgain = &EventType{Name: "canRequestAgain"} // data: nil; sent by serverWithLimits
)
const (
softRequestTimeout = time.Second // allow resending request to a different server but do not cancel yet
hardRequestTimeout = time.Second * 10 // cancel request
)
const (
// serverWithLimits parameters
parallelAdjustUp = 0.1 // adjust parallelLimit up in case of success under full load
parallelAdjustDown = 1 // adjust parallelLimit down in case of timeout/failure
minParallelLimit = 1 // parallelLimit lower bound
defaultParallelLimit = 3 // parallelLimit initial value
minFailureDelay = time.Millisecond * 100 // minimum disable time in case of request failure
maxFailureDelay = time.Minute // maximum disable time in case of request failure
maxServerEventBuffer = 5 // server event allowance buffer limit
maxServerEventRate = time.Second // server event allowance buffer recharge rate
)
// requestServer can send requests in a non-blocking way and feed back events
// through the event callback. After each request it should send back either
// EvResponse or EvFail. Additionally, it may also send application-defined
// events that the Modules can interpret.
type requestServer interface {
Subscribe(eventCallback func(Event))
SendRequest(ID, Request)
Unsubscribe()
}
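// For illustration, a minimal sketch of a requestServer implementation backed by
// a hypothetical doRequest transport function; the contract is that every
// SendRequest is eventually answered by exactly one EvResponse or EvFail:
//
//	type apiServer struct {
//		eventCallback func(Event)
//	}
//
//	func (s *apiServer) Subscribe(eventCallback func(Event)) {
//		s.eventCallback = eventCallback
//	}
//
//	func (s *apiServer) SendRequest(id ID, request Request) {
//		go func() {
//			response, err := doRequest(request) // hypothetical blocking transport call
//			if err != nil {
//				s.eventCallback(Event{Type: EvFail, Data: RequestResponse{ID: id, Request: request}})
//				return
//			}
//			s.eventCallback(Event{Type: EvResponse, Data: RequestResponse{ID: id, Request: request, Response: response}})
//		}()
//	}
//
//	func (s *apiServer) Unsubscribe() {}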
// server is implemented by a requestServer wrapped into serverWithTimeout and
// serverWithLimits and is used by Scheduler.
// In addition to requestServer functionality, server can also handle timeouts,
// limit the number of parallel in-flight requests and temporarily disable
// new requests based on timeouts and response failures.
type server interface {
subscribe(eventCallback func(Event))
canRequestNow() bool
sendRequest(Request) ID
fail(string)
unsubscribe()
}
// NewServer wraps a requestServer and returns a server
func NewServer(rs requestServer, clock mclock.Clock) server {
s := &serverWithLimits{}
s.parent = rs
s.serverWithTimeout.init(clock)
s.init()
return s
}
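// For illustration, a rough usage sketch (in practice the Scheduler is the
// caller); rs is a hypothetical requestServer and req a hypothetical Request:
//
//	s := NewServer(rs, mclock.System{})
//	s.subscribe(func(ev Event) {
//		// EvResponse, EvFail, EvTimeout and EvCanRequestAgain arrive here
//	})
//	id := s.sendRequest(req)
//	_ = id          // used to match the request against later request events
//	s.unsubscribe() // stop timers and drop the callback when done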
// EventType identifies an event type, either related to a request or the server
// in general. Server events can also be externally defined.
type EventType struct {
Name string
requestEvent bool // all request events are pre-defined in request package
}
// Event describes an event where the type of Data depends on Type.
// Server field is not required when sent through the event callback; it is filled
// out when processed by the Scheduler. Note that the Scheduler can also create
// and send events (EvRegistered, EvUnregistered) directly.
type Event struct {
Type *EventType
Server Server // filled by Scheduler
Data any
}
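// For illustration, application-defined server events can be declared outside
// this package and emitted by a requestServer through its event callback; the
// event type and payload below are hypothetical:
//
//	var EvNewHead = &EventType{Name: "newHead"}
//
//	// inside a requestServer implementation:
//	s.eventCallback(Event{Type: EvNewHead, Data: newHead})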
// IsRequestEvent returns true if the event is a request event
func (e *Event) IsRequestEvent() bool {
return e.Type.requestEvent
}
// RequestInfo assumes that the event is a request event and returns its contents
// in a convenient form.
func (e *Event) RequestInfo() (ServerAndID, Request, Response) {
data := e.Data.(RequestResponse)
return ServerAndID{Server: e.Server, ID: data.ID}, data.Request, data.Response
}
// RequestResponse is the Data type of request events.
type RequestResponse struct {
ID ID
Request Request
Response Response
}
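// For illustration, a typical event handler first filters for request events
// and unpacks them with RequestInfo, then switches on Type for the rest:
//
//	func handleEvent(ev Event) {
//		if ev.IsRequestEvent() {
//			sid, req, resp := ev.RequestInfo()
//			_ = sid  // identifies the (server, request ID) pair
//			_ = req  // the original request
//			_ = resp // typically nil for EvFail and EvTimeout
//			return
//		}
//		switch ev.Type {
//		case EvRegistered, EvUnregistered, EvCanRequestAgain:
//			// server availability changed
//		}
//	}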
// serverWithTimeout wraps a requestServer and introduces timeouts.
// The request's lifecycle is concluded when EvResponse or EvFail is emitted by the
// parent requestServer. If neither happens within softRequestTimeout then
// EvTimeout is emitted, after which the final EvResponse or EvFail is still
// guaranteed to follow.
// If the parent fails to send this final event for hardRequestTimeout then
// serverWithTimeout emits EvFail and discards any further events from the
// parent related to the given request.
type serverWithTimeout struct {
parent requestServer
lock sync.Mutex
clock mclock.Clock
childEventCb func(event Event)
timeouts map[ID]mclock.Timer
lastID ID
}
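// For illustration, with the timeout constants above a request answered after
// three seconds produces the sequence
//
//	t=0s   sendRequest
//	t=1s   EvTimeout  (softRequestTimeout; the caller may try another server)
//	t=3s   EvResponse (the late answer is still delivered)
//
// while a request that is never answered gets EvTimeout at t=1s and a
// synthesized EvFail at t=10s (hardRequestTimeout), after which any late parent
// events for that request ID are discarded.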
// init initializes serverWithTimeout
func (s *serverWithTimeout) init(clock mclock.Clock) {
s.clock = clock
s.timeouts = make(map[ID]mclock.Timer)
}
// subscribe subscribes to events which include parent (requestServer) events
// plus EvTimeout.
func (s *serverWithTimeout) subscribe(eventCallback func(event Event)) {
s.lock.Lock()
defer s.lock.Unlock()
s.childEventCb = eventCallback
s.parent.Subscribe(s.eventCallback)
}
// sendRequest generates a new request ID, sets up the timeout timer, then sends
// the request through the parent (requestServer).
func (s *serverWithTimeout) sendRequest(request Request) (reqId ID) {
s.lock.Lock()
s.lastID++
id := s.lastID
s.startTimeout(RequestResponse{ID: id, Request: request})
s.lock.Unlock()
s.parent.SendRequest(id, request)
return id
}
// eventCallback is called by parent (requestServer) event subscription.
func (s *serverWithTimeout) eventCallback(event Event) {
s.lock.Lock()
defer s.lock.Unlock()
switch event.Type {
case EvResponse, EvFail:
id := event.Data.(RequestResponse).ID
if timer, ok := s.timeouts[id]; ok {
// Note: if stopping the timer is unsuccessful then the resulting AfterFunc
// call will just do nothing
timer.Stop()
delete(s.timeouts, id)
s.childEventCb(event)
}
default:
s.childEventCb(event)
}
}
// startTimeout starts a timeout timer for the given request.
func (s *serverWithTimeout) startTimeout(reqData RequestResponse) {
id := reqData.ID
s.timeouts[id] = s.clock.AfterFunc(softRequestTimeout, func() {
s.lock.Lock()
if _, ok := s.timeouts[id]; !ok {
s.lock.Unlock()
return
}
s.timeouts[id] = s.clock.AfterFunc(hardRequestTimeout-softRequestTimeout, func() {
s.lock.Lock()
if _, ok := s.timeouts[id]; !ok {
s.lock.Unlock()
return
}
delete(s.timeouts, id)
childEventCb := s.childEventCb
s.lock.Unlock()
childEventCb(Event{Type: EvFail, Data: reqData})
})
childEventCb := s.childEventCb
s.lock.Unlock()
childEventCb(Event{Type: EvTimeout, Data: reqData})
})
}
// unsubscribe stops all running timeout timers, removes the event callback and
// unsubscribes from the parent (requestServer).
func (s *serverWithTimeout) unsubscribe() {
s.lock.Lock()
defer s.lock.Unlock()
for _, timer := range s.timeouts {
if timer != nil {
timer.Stop()
}
}
s.childEventCb = nil
s.parent.Unsubscribe()
}
// serverWithLimits wraps serverWithTimeout and implements server. It limits the
// number of parallel in-flight requests and prevents sending new requests when a
// pending one has already timed out. Server events are also rate limited.
// It also implements a failure delay mechanism that adds an exponentially growing
// delay each time a request fails (wrong answer or hard timeout). This makes the
// syncing mechanism less brittle as temporary failures of the server might happen
// sometimes, but still avoids hammering a non-functional server with requests.
type serverWithLimits struct {
serverWithTimeout
lock sync.Mutex
childEventCb func(event Event)
softTimeouts map[ID]struct{}
pendingCount, timeoutCount int
parallelLimit float32
sendEvent bool
delayTimer mclock.Timer
delayCounter int
failureDelayEnd mclock.AbsTime
failureDelay float64
serverEventBuffer int
eventBufferUpdated mclock.AbsTime
}
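// For illustration, with the parameters above every soft timeout lowers
// parallelLimit by 1 (never below minParallelLimit), while roughly ten
// successful responses received under full load are needed to raise it by 1
// again, so the limit recovers much more slowly than it drops.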
// init initializes serverWithLimits
func (s *serverWithLimits) init() {
s.softTimeouts = make(map[ID]struct{})
s.parallelLimit = defaultParallelLimit
s.serverEventBuffer = maxServerEventBuffer
}
// subscribe subscribes to events which include parent (serverWithTimeout) events
// plus EvCanRequestAgain.
func (s *serverWithLimits) subscribe(eventCallback func(event Event)) {
s.lock.Lock()
defer s.lock.Unlock()
s.childEventCb = eventCallback
s.serverWithTimeout.subscribe(s.eventCallback)
}
// eventCallback is called by parent (serverWithTimeout) event subscription.
func (s *serverWithLimits) eventCallback(event Event) {
s.lock.Lock()
var sendCanRequestAgain bool
passEvent := true
switch event.Type {
case EvTimeout:
id := event.Data.(RequestResponse).ID
s.softTimeouts[id] = struct{}{}
s.timeoutCount++
s.parallelLimit -= parallelAdjustDown
if s.parallelLimit < minParallelLimit {
s.parallelLimit = minParallelLimit
}
log.Debug("Server timeout", "count", s.timeoutCount, "parallelLimit", s.parallelLimit)
case EvResponse, EvFail:
id := event.Data.(RequestResponse).ID
if _, ok := s.softTimeouts[id]; ok {
delete(s.softTimeouts, id)
s.timeoutCount--
log.Debug("Server timeout finalized", "count", s.timeoutCount, "parallelLimit", s.parallelLimit)
}
if event.Type == EvResponse && s.pendingCount >= int(s.parallelLimit) {
s.parallelLimit += parallelAdjustUp
}
s.pendingCount--
if s.canRequest() {
sendCanRequestAgain = s.sendEvent
s.sendEvent = false
}
if event.Type == EvFail {
s.failLocked("failed request")
}
default:
// server event; check rate limit
if s.serverEventBuffer < maxServerEventBuffer {
now := s.clock.Now()
sinceUpdate := time.Duration(now - s.eventBufferUpdated)
if sinceUpdate >= maxServerEventRate*time.Duration(maxServerEventBuffer-s.serverEventBuffer) {
s.serverEventBuffer = maxServerEventBuffer
s.eventBufferUpdated = now
} else {
addBuffer := int(sinceUpdate / maxServerEventRate)
s.serverEventBuffer += addBuffer
s.eventBufferUpdated += mclock.AbsTime(maxServerEventRate * time.Duration(addBuffer))
}
}
if s.serverEventBuffer > 0 {
s.serverEventBuffer--
} else {
passEvent = false
}
}
childEventCb := s.childEventCb
s.lock.Unlock()
if passEvent {
childEventCb(event)
}
if sendCanRequestAgain {
childEventCb(Event{Type: EvCanRequestAgain})
}
}
// sendRequest sends a request through the parent (serverWithTimeout).
func (s *serverWithLimits) sendRequest(request Request) (reqId ID) {
s.lock.Lock()
s.pendingCount++
s.lock.Unlock()
return s.serverWithTimeout.sendRequest(request)
}
// unsubscribe stops the failure delay timer, removes the event callback and
// unsubscribes from the parent (serverWithTimeout).
func (s *serverWithLimits) unsubscribe() {
s.lock.Lock()
defer s.lock.Unlock()
if s.delayTimer != nil {
s.delayTimer.Stop()
s.delayTimer = nil
}
s.childEventCb = nil
s.serverWithTimeout.unsubscribe()
}
// canRequest checks whether a new request can be started.
func (s *serverWithLimits) canRequest() bool {
if s.delayTimer != nil || s.pendingCount >= int(s.parallelLimit) || s.timeoutCount > 0 {
return false
}
if s.parallelLimit < minParallelLimit {
s.parallelLimit = minParallelLimit
}
return true
}
// canRequestNow checks whether a new request can be started, according to the
// current in-flight request count and parallelLimit, and also the failure delay
// timer.
// If it returns false then it is guaranteed that an EvCanRequestAgain will be
// sent whenever the server becomes available for requesting again.
func (s *serverWithLimits) canRequestNow() bool {
var sendCanRequestAgain bool
s.lock.Lock()
canRequest := s.canRequest()
if canRequest {
sendCanRequestAgain = s.sendEvent
s.sendEvent = false
}
childEventCb := s.childEventCb
s.lock.Unlock()
if sendCanRequestAgain {
childEventCb(Event{Type: EvCanRequestAgain})
}
return canRequest
}
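// For illustration, callers are expected to gate each send on canRequestNow and
// fall back to waiting for EvCanRequestAgain when it returns false:
//
//	if s.canRequestNow() {
//		s.sendRequest(req)
//	} else {
//		// wait for EvCanRequestAgain before trying again
//	}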
// delay sets the delay timer to the given duration, disabling new requests for
// the given period.
func (s *serverWithLimits) delay(delay time.Duration) {
if s.delayTimer != nil {
// Note: if stopping the timer is unsuccessful then the resulting AfterFunc
// call will just do nothing
s.delayTimer.Stop()
s.delayTimer = nil
}
s.delayCounter++
delayCounter := s.delayCounter
log.Debug("Server delay started", "length", delay)
s.delayTimer = s.clock.AfterFunc(delay, func() {
log.Debug("Server delay ended", "length", delay)
var sendCanRequestAgain bool
s.lock.Lock()
if s.delayTimer != nil && s.delayCounter == delayCounter { // do nothing if there is a new timer now
s.delayTimer = nil
if s.canRequest() {
sendCanRequestAgain = s.sendEvent
s.sendEvent = false
}
}
childEventCb := s.childEventCb
s.lock.Unlock()
if sendCanRequestAgain {
childEventCb(Event{Type: EvCanRequestAgain})
}
})
}
// fail reports that a response from the server was found invalid by the processing
// Module, disabling new requests for a dynamically adjusted time period.
func (s *serverWithLimits) fail(desc string) {
s.lock.Lock()
defer s.lock.Unlock()
s.failLocked(desc)
}
// failLocked calculates the dynamic failure delay and applies it.
func (s *serverWithLimits) failLocked(desc string) {
log.Debug("Server error", "description", desc)
s.failureDelay *= 2
now := s.clock.Now()
if now > s.failureDelayEnd {
s.failureDelay *= math.Pow(2, -float64(now-s.failureDelayEnd)/float64(maxFailureDelay))
}
if s.failureDelay < float64(minFailureDelay) {
s.failureDelay = float64(minFailureDelay)
}
s.failureDelayEnd = now + mclock.AbsTime(s.failureDelay)
s.delay(time.Duration(s.failureDelay))
}
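// For illustration, the first failure sets failureDelay to minFailureDelay
// (100ms); each further failure roughly doubles it (200ms, 400ms, 800ms, ...),
// while every maxFailureDelay (one minute) spent past failureDelayEnd without a
// new failure halves it again, so a briefly misbehaving server is retried
// quickly while a persistently failing one is backed off for increasingly long
// periods.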