diff --git a/event/feed.go b/event/feed.go index 4568304df5..b1b597f17b 100644 --- a/event/feed.go +++ b/event/feed.go @@ -33,9 +33,8 @@ var errBadChannel = errors.New("event: Subscribe argument does not have sendable // // The zero value is ready to use. type Feed struct { - // sendLock has a one-element buffer and is empty when held. - // It protects sendCases. - sendLock chan struct{} + once sync.Once // ensures that init only runs once + sendLock chan struct{} // sendLock has a one-element buffer and is empty when held.It protects sendCases. removeSub chan interface{} // interrupts Send sendCases caseList // the active set of select cases used by Send @@ -60,9 +59,6 @@ func (e feedTypeError) Error() string { } func (f *Feed) init() { - if f.sendLock != nil { - return - } f.removeSub = make(chan interface{}) f.sendLock = make(chan struct{}, 1) f.sendLock <- struct{}{} @@ -75,6 +71,8 @@ func (f *Feed) init() { // The channel should have ample buffer space to avoid blocking other subscribers. // Slow subscribers are not dropped. func (f *Feed) Subscribe(channel interface{}) Subscription { + f.once.Do(f.init) + chanval := reflect.ValueOf(channel) chantyp := chanval.Type() if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 { @@ -84,7 +82,6 @@ func (f *Feed) Subscribe(channel interface{}) Subscription { f.mu.Lock() defer f.mu.Unlock() - f.init() if !f.typecheck(chantyp.Elem()) { panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)}) } @@ -130,10 +127,7 @@ func (f *Feed) remove(sub *feedSub) { // Send delivers to all subscribed channels simultaneously. // It returns the number of subscribers that the value was sent to. func (f *Feed) Send(value interface{}) (nsent int) { - f.mu.Lock() - f.init() - f.mu.Unlock() - + f.once.Do(f.init) <-f.sendLock // Add new cases from the inbox after taking the send lock.