// Copyright 2016 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . package event import ( "context" "sync" "time" "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/common/mclock" ) // Subscription represents a stream of events. The carrier of the events is typically a // channel, but isn't part of the interface. // // Subscriptions can fail while established. Failures are reported through an error // channel. It receives a value if there is an issue with the subscription (e.g. the // network connection delivering the events has been closed). Only one value will ever be // sent. // // The error channel is closed when the subscription ends successfully (i.e. when the // source of events is closed). It is also closed when Unsubscribe is called. // // The Unsubscribe method cancels the sending of events. You must call Unsubscribe in all // cases to ensure that resources related to the subscription are released. It can be // called any number of times. type Subscription interface { Err() <-chan error // returns the error channel Unsubscribe() // cancels sending of events, closing the error channel } // NewSubscription runs a producer function as a subscription in a new goroutine. The // channel given to the producer is closed when Unsubscribe is called. If fn returns an // error, it is sent on the subscription's error channel. func NewSubscription(producer func(<-chan struct{}) error) Subscription { s := &funcSub{unsub: make(chan struct{}), err: make(chan error, 1)} gopool.Submit(func() { defer close(s.err) err := producer(s.unsub) s.mu.Lock() defer s.mu.Unlock() if !s.unsubscribed { if err != nil { s.err <- err } s.unsubscribed = true } }) return s } type funcSub struct { unsub chan struct{} err chan error mu sync.Mutex unsubscribed bool } func (s *funcSub) Unsubscribe() { s.mu.Lock() if s.unsubscribed { s.mu.Unlock() return } s.unsubscribed = true close(s.unsub) s.mu.Unlock() // Wait for producer shutdown. <-s.err } func (s *funcSub) Err() <-chan error { return s.err } // Resubscribe calls fn repeatedly to keep a subscription established. When the // subscription is established, Resubscribe waits for it to fail and calls fn again. This // process repeats until Unsubscribe is called or the active subscription ends // successfully. // // Resubscribe applies backoff between calls to fn. The time between calls is adapted // based on the error rate, but will never exceed backoffMax. func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription { return ResubscribeErr(backoffMax, func(ctx context.Context, _ error) (Subscription, error) { return fn(ctx) }) } // A ResubscribeFunc attempts to establish a subscription. type ResubscribeFunc func(context.Context) (Subscription, error) // ResubscribeErr calls fn repeatedly to keep a subscription established. When the // subscription is established, ResubscribeErr waits for it to fail and calls fn again. This // process repeats until Unsubscribe is called or the active subscription ends // successfully. // // The difference between Resubscribe and ResubscribeErr is that with ResubscribeErr, // the error of the failing subscription is available to the callback for logging // purposes. // // ResubscribeErr applies backoff between calls to fn. The time between calls is adapted // based on the error rate, but will never exceed backoffMax. func ResubscribeErr(backoffMax time.Duration, fn ResubscribeErrFunc) Subscription { s := &resubscribeSub{ waitTime: backoffMax / 10, backoffMax: backoffMax, fn: fn, err: make(chan error), unsub: make(chan struct{}, 1), } go s.loop() return s } // A ResubscribeErrFunc attempts to establish a subscription. // For every call but the first, the second argument to this function is // the error that occurred with the previous subscription. type ResubscribeErrFunc func(context.Context, error) (Subscription, error) type resubscribeSub struct { fn ResubscribeErrFunc err chan error unsub chan struct{} unsubOnce sync.Once lastTry mclock.AbsTime lastSubErr error waitTime, backoffMax time.Duration } func (s *resubscribeSub) Unsubscribe() { s.unsubOnce.Do(func() { s.unsub <- struct{}{} <-s.err }) } func (s *resubscribeSub) Err() <-chan error { return s.err } func (s *resubscribeSub) loop() { defer close(s.err) var done bool for !done { sub := s.subscribe() if sub == nil { break } done = s.waitForError(sub) sub.Unsubscribe() } } func (s *resubscribeSub) subscribe() Subscription { subscribed := make(chan error) var sub Subscription for { s.lastTry = mclock.Now() ctx, cancel := context.WithCancel(context.Background()) gopool.Submit(func() { rsub, err := s.fn(ctx, s.lastSubErr) sub = rsub subscribed <- err }) select { case err := <-subscribed: cancel() if err == nil { if sub == nil { panic("event: ResubscribeFunc returned nil subscription and no error") } return sub } // Subscribing failed, wait before launching the next try. if s.backoffWait() { return nil // unsubscribed during wait } case <-s.unsub: cancel() <-subscribed // avoid leaking the s.fn goroutine. return nil } } } func (s *resubscribeSub) waitForError(sub Subscription) bool { defer sub.Unsubscribe() select { case err := <-sub.Err(): s.lastSubErr = err return err == nil case <-s.unsub: return true } } func (s *resubscribeSub) backoffWait() bool { if time.Duration(mclock.Now()-s.lastTry) > s.backoffMax { s.waitTime = s.backoffMax / 10 } else { s.waitTime *= 2 if s.waitTime > s.backoffMax { s.waitTime = s.backoffMax } } t := time.NewTimer(s.waitTime) defer t.Stop() select { case <-t.C: return false case <-s.unsub: return true } } // SubscriptionScope provides a facility to unsubscribe multiple subscriptions at once. // // For code that handle more than one subscription, a scope can be used to conveniently // unsubscribe all of them with a single call. The example demonstrates a typical use in a // larger program. // // The zero value is ready to use. type SubscriptionScope struct { mu sync.Mutex subs map[*scopeSub]struct{} closed bool } type scopeSub struct { sc *SubscriptionScope s Subscription } // Track starts tracking a subscription. If the scope is closed, Track returns nil. The // returned subscription is a wrapper. Unsubscribing the wrapper removes it from the // scope. func (sc *SubscriptionScope) Track(s Subscription) Subscription { sc.mu.Lock() defer sc.mu.Unlock() if sc.closed { return nil } if sc.subs == nil { sc.subs = make(map[*scopeSub]struct{}) } ss := &scopeSub{sc, s} sc.subs[ss] = struct{}{} return ss } // Close calls Unsubscribe on all tracked subscriptions and prevents further additions to // the tracked set. Calls to Track after Close return nil. func (sc *SubscriptionScope) Close() { sc.mu.Lock() defer sc.mu.Unlock() if sc.closed { return } sc.closed = true for s := range sc.subs { s.s.Unsubscribe() } sc.subs = nil } // Count returns the number of tracked subscriptions. // It is meant to be used for debugging. func (sc *SubscriptionScope) Count() int { sc.mu.Lock() defer sc.mu.Unlock() return len(sc.subs) } func (s *scopeSub) Unsubscribe() { s.s.Unsubscribe() s.sc.mu.Lock() defer s.sc.mu.Unlock() delete(s.sc.subs, s) } func (s *scopeSub) Err() <-chan error { return s.s.Err() }