diff --git a/event/feedof.go b/event/feedof.go
new file mode 100644
index 0000000000..598038a19e
--- /dev/null
+++ b/event/feedof.go
@@ -0,0 +1,167 @@
+// Copyright 2022 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+//go:build go1.18
+// +build go1.18
+
+package event
+
+import (
+ "reflect"
+ "sync"
+)
+
+// FeedOf implements one-to-many subscriptions where the carrier of events is a channel.
+// Values sent to a Feed are delivered to all subscribed channels simultaneously.
+//
+// The zero value is ready to use.
+type FeedOf[T any] struct {
+ once sync.Once // ensures that init only runs once
+ sendLock chan struct{} // sendLock has a one-element buffer and is empty when held.It protects sendCases.
+ removeSub chan chan<- T // interrupts Send
+ sendCases caseList // the active set of select cases used by Send
+
+ // The inbox holds newly subscribed channels until they are added to sendCases.
+ mu sync.Mutex
+ inbox caseList
+}
+
+func (f *FeedOf[T]) init() {
+ f.removeSub = make(chan chan<- T)
+ f.sendLock = make(chan struct{}, 1)
+ f.sendLock <- struct{}{}
+ f.sendCases = caseList{{Chan: reflect.ValueOf(f.removeSub), Dir: reflect.SelectRecv}}
+}
+
+// Subscribe adds a channel to the feed. Future sends will be delivered on the channel
+// until the subscription is canceled.
+//
+// The channel should have ample buffer space to avoid blocking other subscribers. Slow
+// subscribers are not dropped.
+func (f *FeedOf[T]) Subscribe(channel chan<- T) Subscription {
+ f.once.Do(f.init)
+
+ chanval := reflect.ValueOf(channel)
+ sub := &feedOfSub[T]{feed: f, channel: channel, err: make(chan error, 1)}
+
+ // Add the select case to the inbox.
+ // The next Send will add it to f.sendCases.
+ f.mu.Lock()
+ defer f.mu.Unlock()
+ cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}
+ f.inbox = append(f.inbox, cas)
+ return sub
+}
+
+func (f *FeedOf[T]) remove(sub *feedOfSub[T]) {
+ // Delete from inbox first, which covers channels
+ // that have not been added to f.sendCases yet.
+ f.mu.Lock()
+ index := f.inbox.find(sub.channel)
+ if index != -1 {
+ f.inbox = f.inbox.delete(index)
+ f.mu.Unlock()
+ return
+ }
+ f.mu.Unlock()
+
+ select {
+ case f.removeSub <- sub.channel:
+ // Send will remove the channel from f.sendCases.
+ case <-f.sendLock:
+ // No Send is in progress, delete the channel now that we have the send lock.
+ f.sendCases = f.sendCases.delete(f.sendCases.find(sub.channel))
+ f.sendLock <- struct{}{}
+ }
+}
+
+// Send delivers to all subscribed channels simultaneously.
+// It returns the number of subscribers that the value was sent to.
+func (f *FeedOf[T]) Send(value T) (nsent int) {
+ rvalue := reflect.ValueOf(value)
+
+ f.once.Do(f.init)
+ <-f.sendLock
+
+ // Add new cases from the inbox after taking the send lock.
+ f.mu.Lock()
+ f.sendCases = append(f.sendCases, f.inbox...)
+ f.inbox = nil
+ f.mu.Unlock()
+
+ // Set the sent value on all channels.
+ for i := firstSubSendCase; i < len(f.sendCases); i++ {
+ f.sendCases[i].Send = rvalue
+ }
+
+ // Send until all channels except removeSub have been chosen. 'cases' tracks a prefix
+ // of sendCases. When a send succeeds, the corresponding case moves to the end of
+ // 'cases' and it shrinks by one element.
+ cases := f.sendCases
+ for {
+ // Fast path: try sending without blocking before adding to the select set.
+ // This should usually succeed if subscribers are fast enough and have free
+ // buffer space.
+ for i := firstSubSendCase; i < len(cases); i++ {
+ if cases[i].Chan.TrySend(rvalue) {
+ nsent++
+ cases = cases.deactivate(i)
+ i--
+ }
+ }
+ if len(cases) == firstSubSendCase {
+ break
+ }
+ // Select on all the receivers, waiting for them to unblock.
+ chosen, recv, _ := reflect.Select(cases)
+ if chosen == 0 /* <-f.removeSub */ {
+ index := f.sendCases.find(recv.Interface())
+ f.sendCases = f.sendCases.delete(index)
+ if index >= 0 && index < len(cases) {
+ // Shrink 'cases' too because the removed case was still active.
+ cases = f.sendCases[:len(cases)-1]
+ }
+ } else {
+ cases = cases.deactivate(chosen)
+ nsent++
+ }
+ }
+
+ // Forget about the sent value and hand off the send lock.
+ for i := firstSubSendCase; i < len(f.sendCases); i++ {
+ f.sendCases[i].Send = reflect.Value{}
+ }
+ f.sendLock <- struct{}{}
+ return nsent
+}
+
+type feedOfSub[T any] struct {
+ feed *FeedOf[T]
+ channel chan<- T
+ errOnce sync.Once
+ err chan error
+}
+
+func (sub *feedOfSub[T]) Unsubscribe() {
+ sub.errOnce.Do(func() {
+ sub.feed.remove(sub)
+ close(sub.err)
+ })
+}
+
+func (sub *feedOfSub[T]) Err() <-chan error {
+ return sub.err
+}
diff --git a/event/feedof_test.go b/event/feedof_test.go
new file mode 100644
index 0000000000..8478eeffb1
--- /dev/null
+++ b/event/feedof_test.go
@@ -0,0 +1,282 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+//go:build go1.18
+// +build go1.18
+
+package event
+
+import (
+ "sync"
+ "testing"
+ "time"
+)
+
+func TestFeedOf(t *testing.T) {
+ var feed FeedOf[int]
+ var done, subscribed sync.WaitGroup
+ subscriber := func(i int) {
+ defer done.Done()
+
+ subchan := make(chan int)
+ sub := feed.Subscribe(subchan)
+ timeout := time.NewTimer(2 * time.Second)
+ defer timeout.Stop()
+ subscribed.Done()
+
+ select {
+ case v := <-subchan:
+ if v != 1 {
+ t.Errorf("%d: received value %d, want 1", i, v)
+ }
+ case <-timeout.C:
+ t.Errorf("%d: receive timeout", i)
+ }
+
+ sub.Unsubscribe()
+ select {
+ case _, ok := <-sub.Err():
+ if ok {
+ t.Errorf("%d: error channel not closed after unsubscribe", i)
+ }
+ case <-timeout.C:
+ t.Errorf("%d: unsubscribe timeout", i)
+ }
+ }
+
+ const n = 1000
+ done.Add(n)
+ subscribed.Add(n)
+ for i := 0; i < n; i++ {
+ go subscriber(i)
+ }
+ subscribed.Wait()
+ if nsent := feed.Send(1); nsent != n {
+ t.Errorf("first send delivered %d times, want %d", nsent, n)
+ }
+ if nsent := feed.Send(2); nsent != 0 {
+ t.Errorf("second send delivered %d times, want 0", nsent)
+ }
+ done.Wait()
+}
+
+func TestFeedOfSubscribeSameChannel(t *testing.T) {
+ var (
+ feed FeedOf[int]
+ done sync.WaitGroup
+ ch = make(chan int)
+ sub1 = feed.Subscribe(ch)
+ sub2 = feed.Subscribe(ch)
+ _ = feed.Subscribe(ch)
+ )
+ expectSends := func(value, n int) {
+ if nsent := feed.Send(value); nsent != n {
+ t.Errorf("send delivered %d times, want %d", nsent, n)
+ }
+ done.Done()
+ }
+ expectRecv := func(wantValue, n int) {
+ for i := 0; i < n; i++ {
+ if v := <-ch; v != wantValue {
+ t.Errorf("received %d, want %d", v, wantValue)
+ }
+ }
+ }
+
+ done.Add(1)
+ go expectSends(1, 3)
+ expectRecv(1, 3)
+ done.Wait()
+
+ sub1.Unsubscribe()
+
+ done.Add(1)
+ go expectSends(2, 2)
+ expectRecv(2, 2)
+ done.Wait()
+
+ sub2.Unsubscribe()
+
+ done.Add(1)
+ go expectSends(3, 1)
+ expectRecv(3, 1)
+ done.Wait()
+}
+
+func TestFeedOfSubscribeBlockedPost(t *testing.T) {
+ var (
+ feed FeedOf[int]
+ nsends = 2000
+ ch1 = make(chan int)
+ ch2 = make(chan int)
+ wg sync.WaitGroup
+ )
+ defer wg.Wait()
+
+ feed.Subscribe(ch1)
+ wg.Add(nsends)
+ for i := 0; i < nsends; i++ {
+ go func() {
+ feed.Send(99)
+ wg.Done()
+ }()
+ }
+
+ sub2 := feed.Subscribe(ch2)
+ defer sub2.Unsubscribe()
+
+ // We're done when ch1 has received N times.
+ // The number of receives on ch2 depends on scheduling.
+ for i := 0; i < nsends; {
+ select {
+ case <-ch1:
+ i++
+ case <-ch2:
+ }
+ }
+}
+
+func TestFeedOfUnsubscribeBlockedPost(t *testing.T) {
+ var (
+ feed FeedOf[int]
+ nsends = 200
+ chans = make([]chan int, 2000)
+ subs = make([]Subscription, len(chans))
+ bchan = make(chan int)
+ bsub = feed.Subscribe(bchan)
+ wg sync.WaitGroup
+ )
+ for i := range chans {
+ chans[i] = make(chan int, nsends)
+ }
+
+ // Queue up some Sends. None of these can make progress while bchan isn't read.
+ wg.Add(nsends)
+ for i := 0; i < nsends; i++ {
+ go func() {
+ feed.Send(99)
+ wg.Done()
+ }()
+ }
+ // Subscribe the other channels.
+ for i, ch := range chans {
+ subs[i] = feed.Subscribe(ch)
+ }
+ // Unsubscribe them again.
+ for _, sub := range subs {
+ sub.Unsubscribe()
+ }
+ // Unblock the Sends.
+ bsub.Unsubscribe()
+ wg.Wait()
+}
+
+// Checks that unsubscribing a channel during Send works even if that
+// channel has already been sent on.
+func TestFeedOfUnsubscribeSentChan(t *testing.T) {
+ var (
+ feed FeedOf[int]
+ ch1 = make(chan int)
+ ch2 = make(chan int)
+ sub1 = feed.Subscribe(ch1)
+ sub2 = feed.Subscribe(ch2)
+ wg sync.WaitGroup
+ )
+ defer sub2.Unsubscribe()
+
+ wg.Add(1)
+ go func() {
+ feed.Send(0)
+ wg.Done()
+ }()
+
+ // Wait for the value on ch1.
+ <-ch1
+ // Unsubscribe ch1, removing it from the send cases.
+ sub1.Unsubscribe()
+
+ // Receive ch2, finishing Send.
+ <-ch2
+ wg.Wait()
+
+ // Send again. This should send to ch2 only, so the wait group will unblock
+ // as soon as a value is received on ch2.
+ wg.Add(1)
+ go func() {
+ feed.Send(0)
+ wg.Done()
+ }()
+ <-ch2
+ wg.Wait()
+}
+
+func TestFeedOfUnsubscribeFromInbox(t *testing.T) {
+ var (
+ feed FeedOf[int]
+ ch1 = make(chan int)
+ ch2 = make(chan int)
+ sub1 = feed.Subscribe(ch1)
+ sub2 = feed.Subscribe(ch1)
+ sub3 = feed.Subscribe(ch2)
+ )
+ if len(feed.inbox) != 3 {
+ t.Errorf("inbox length != 3 after subscribe")
+ }
+ if len(feed.sendCases) != 1 {
+ t.Errorf("sendCases is non-empty after unsubscribe")
+ }
+
+ sub1.Unsubscribe()
+ sub2.Unsubscribe()
+ sub3.Unsubscribe()
+ if len(feed.inbox) != 0 {
+ t.Errorf("inbox is non-empty after unsubscribe")
+ }
+ if len(feed.sendCases) != 1 {
+ t.Errorf("sendCases is non-empty after unsubscribe")
+ }
+}
+
+func BenchmarkFeedOfSend1000(b *testing.B) {
+ var (
+ done sync.WaitGroup
+ feed FeedOf[int]
+ nsubs = 1000
+ )
+ subscriber := func(ch <-chan int) {
+ for i := 0; i < b.N; i++ {
+ <-ch
+ }
+ done.Done()
+ }
+ done.Add(nsubs)
+ for i := 0; i < nsubs; i++ {
+ ch := make(chan int, 200)
+ feed.Subscribe(ch)
+ go subscriber(ch)
+ }
+
+ // The actual benchmark.
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ if feed.Send(i) != nsubs {
+ panic("wrong number of sends")
+ }
+ }
+
+ b.StopTimer()
+ done.Wait()
+}