diff --git a/beacon/light/api/light_api.go b/beacon/light/api/light_api.go index 1bba220d31..8689877d8b 100755 --- a/beacon/light/api/light_api.go +++ b/beacon/light/api/light_api.go @@ -17,11 +17,13 @@ package api import ( + "context" "encoding/json" "errors" "fmt" "io" "net/http" + "sync" "time" "github.com/donovanhide/eventsource" @@ -416,39 +418,34 @@ type HeadEventListener struct { // The callbacks are also called for the current head and optimistic head at startup. // They are never called concurrently. func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func() { - closeCh := make(chan struct{}) // initiate closing the stream - closedCh := make(chan struct{}) // stream closed (or failed to create) - stoppedCh := make(chan struct{}) // sync loop stopped - streamCh := make(chan *eventsource.Stream, 1) + var ( + ctx, closeCtx = context.WithCancel(context.Background()) + streamCh = make(chan *eventsource.Stream, 1) + wg sync.WaitGroup + ) + + // When connected to a Lodestar node the subscription blocks until the first actual + // event arrives; therefore we create the subscription in a separate goroutine while + // letting the main goroutine sync up to the current head. + wg.Add(1) go func() { - defer close(closedCh) - // when connected to a Lodestar node the subscription blocks until the - // first actual event arrives; therefore we create the subscription in - // a separate goroutine while letting the main goroutine sync up to the - // current head - req, err := http.NewRequest("GET", api.url+ - "/eth/v1/events?topics=head&topics=light_client_optimistic_update&topics=light_client_finality_update", nil) - if err != nil { - listener.OnError(fmt.Errorf("error creating event subscription request: %v", err)) - return - } - for k, v := range api.customHeaders { - req.Header.Set(k, v) - } - stream, err := eventsource.SubscribeWithRequest("", req) - if err != nil { - listener.OnError(fmt.Errorf("error creating event subscription: %v", err)) - close(streamCh) + defer wg.Done() + stream := api.startEventStream(ctx, &listener) + if stream == nil { + // This case happens when the context was closed. return } + // Stream was opened, wait for close signal. streamCh <- stream - <-closeCh + <-ctx.Done() stream.Close() }() + wg.Add(1) go func() { - defer close(stoppedCh) + defer wg.Done() + // Request initial data. if head, err := api.GetHeader(common.Hash{}); err == nil { listener.OnNewHead(head.Slot, head.Hash()) } @@ -458,32 +455,42 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func() if finalityUpdate, err := api.GetFinalityUpdate(); err == nil { listener.OnFinality(finalityUpdate) } - stream := <-streamCh - if stream == nil { + + // Receive the stream. + var stream *eventsource.Stream + select { + case stream = <-streamCh: + case <-ctx.Done(): return } for { select { + case <-ctx.Done(): + stream.Close() + case event, ok := <-stream.Events: if !ok { return } switch event.Event() { case "head": - if slot, blockRoot, err := decodeHeadEvent([]byte(event.Data())); err == nil { + slot, blockRoot, err := decodeHeadEvent([]byte(event.Data())) + if err == nil { listener.OnNewHead(slot, blockRoot) } else { listener.OnError(fmt.Errorf("error decoding head event: %v", err)) } case "light_client_optimistic_update": - if signedHead, err := decodeOptimisticHeadUpdate([]byte(event.Data())); err == nil { + signedHead, err := decodeOptimisticHeadUpdate([]byte(event.Data())) + if err == nil { listener.OnSignedHead(signedHead) } else { listener.OnError(fmt.Errorf("error decoding optimistic update event: %v", err)) } case "light_client_finality_update": - if finalityUpdate, err := decodeFinalityUpdate([]byte(event.Data())); err == nil { + finalityUpdate, err := decodeFinalityUpdate([]byte(event.Data())) + if err == nil { listener.OnFinality(finalityUpdate) } else { listener.OnError(fmt.Errorf("error decoding finality update event: %v", err)) @@ -491,6 +498,7 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func() default: listener.OnError(fmt.Errorf("unexpected event: %s", event.Event())) } + case err, ok := <-stream.Errors: if !ok { return @@ -499,9 +507,43 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func() } } }() + return func() { - close(closeCh) - <-closedCh - <-stoppedCh + closeCtx() + wg.Wait() + } +} + +// startEventStream establishes an event stream. This will keep retrying until the stream has been +// established. It can only return nil when the context is canceled. +func (api *BeaconLightApi) startEventStream(ctx context.Context, listener *HeadEventListener) *eventsource.Stream { + for retry := true; retry; retry = ctxSleep(ctx, 5*time.Second) { + path := "/eth/v1/events?topics=head&topics=light_client_optimistic_update&topics=light_client_finality_update" + req, err := http.NewRequestWithContext(ctx, "GET", api.url+path, nil) + if err != nil { + listener.OnError(fmt.Errorf("error creating event subscription request: %v", err)) + continue + } + for k, v := range api.customHeaders { + req.Header.Set(k, v) + } + stream, err := eventsource.SubscribeWithRequest("", req) + if err != nil { + listener.OnError(fmt.Errorf("error creating event subscription: %v", err)) + continue + } + return stream + } + return nil +} + +func ctxSleep(ctx context.Context, timeout time.Duration) (ok bool) { + timer := time.NewTimer(timeout) + defer timer.Stop() + select { + case <-timer.C: + return true + case <-ctx.Done(): + return false } }