diff --git a/core/bloombits/matcher.go b/core/bloombits/matcher.go index 3ec0d5ae9..927232be0 100644 --- a/core/bloombits/matcher.go +++ b/core/bloombits/matcher.go @@ -155,7 +155,6 @@ func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uin session := &MatcherSession{ matcher: m, quit: make(chan struct{}), - kill: make(chan struct{}), ctx: ctx, } for _, scheduler := range m.schedulers { @@ -386,10 +385,8 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) { requests = make(map[uint][]uint64) // Per-bit list of section requests, ordered by section number unallocs = make(map[uint]struct{}) // Bits with pending requests but not allocated to any retriever retrievers chan chan uint // Waiting retrievers (toggled to nil if unallocs is empty) - ) - var ( - allocs int // Number of active allocations to handle graceful shutdown requests - shutdown = session.quit // Shutdown request channel, will gracefully wait for pending requests + allocs int // Number of active allocations to handle graceful shutdown requests + shutdown = session.quit // Shutdown request channel, will gracefully wait for pending requests ) // assign is a helper method fo try to assign a pending bit an actively @@ -409,15 +406,12 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) { for { select { case <-shutdown: - // Graceful shutdown requested, wait until all pending requests are honoured + // Shutdown requested. No more retrievers can be allocated, + // but we still need to wait until all pending requests have returned. + shutdown = nil if allocs == 0 { return } - shutdown = nil - - case <-session.kill: - // Pending requests not honoured in time, hard terminate - return case req := <-dist: // New retrieval request arrived to be distributed to some fetcher process @@ -499,8 +493,9 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) { assign(result.Bit) } } - // If we're in the process of shutting down, terminate - if allocs == 0 && shutdown == nil { + + // End the session when all pending deliveries have arrived. + if shutdown == nil && allocs == 0 { return } } @@ -514,7 +509,6 @@ type MatcherSession struct { closer sync.Once // Sync object to ensure we only ever close once quit chan struct{} // Quit channel to request pipeline termination - kill chan struct{} // Term channel to signal non-graceful forced shutdown ctx context.Context // Context used by the light client to abort filtering err atomic.Value // Global error to track retrieval failures deep in the chain @@ -529,7 +523,6 @@ func (s *MatcherSession) Close() { s.closer.Do(func() { // Signal termination and wait for all goroutines to tear down close(s.quit) - time.AfterFunc(time.Second, func() { close(s.kill) }) s.pend.Wait() }) } @@ -542,10 +535,10 @@ func (s *MatcherSession) Error() error { return nil } -// AllocateRetrieval assigns a bloom bit index to a client process that can either +// allocateRetrieval assigns a bloom bit index to a client process that can either // immediately request and fetch the section contents assigned to this bit or wait // a little while for more sections to be requested. -func (s *MatcherSession) AllocateRetrieval() (uint, bool) { +func (s *MatcherSession) allocateRetrieval() (uint, bool) { fetcher := make(chan uint) select { @@ -557,9 +550,9 @@ func (s *MatcherSession) AllocateRetrieval() (uint, bool) { } } -// PendingSections returns the number of pending section retrievals belonging to +// pendingSections returns the number of pending section retrievals belonging to // the given bloom bit index. -func (s *MatcherSession) PendingSections(bit uint) int { +func (s *MatcherSession) pendingSections(bit uint) int { fetcher := make(chan uint) select { @@ -571,9 +564,9 @@ func (s *MatcherSession) PendingSections(bit uint) int { } } -// AllocateSections assigns all or part of an already allocated bit-task queue +// allocateSections assigns all or part of an already allocated bit-task queue // to the requesting process. -func (s *MatcherSession) AllocateSections(bit uint, count int) []uint64 { +func (s *MatcherSession) allocateSections(bit uint, count int) []uint64 { fetcher := make(chan *Retrieval) select { @@ -589,14 +582,10 @@ func (s *MatcherSession) AllocateSections(bit uint, count int) []uint64 { } } -// DeliverSections delivers a batch of section bit-vectors for a specific bloom +// deliverSections delivers a batch of section bit-vectors for a specific bloom // bit index to be injected into the processing pipeline. -func (s *MatcherSession) DeliverSections(bit uint, sections []uint64, bitsets [][]byte) { - select { - case <-s.kill: - return - case s.matcher.deliveries <- &Retrieval{Bit: bit, Sections: sections, Bitsets: bitsets}: - } +func (s *MatcherSession) deliverSections(bit uint, sections []uint64, bitsets [][]byte) { + s.matcher.deliveries <- &Retrieval{Bit: bit, Sections: sections, Bitsets: bitsets} } // Multiplex polls the matcher session for retrieval tasks and multiplexes it into @@ -608,17 +597,17 @@ func (s *MatcherSession) DeliverSections(bit uint, sections []uint64, bitsets [] func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) { for { // Allocate a new bloom bit index to retrieve data for, stopping when done - bit, ok := s.AllocateRetrieval() + bit, ok := s.allocateRetrieval() if !ok { return } // Bit allocated, throttle a bit if we're below our batch limit - if s.PendingSections(bit) < batch { + if s.pendingSections(bit) < batch { select { case <-s.quit: // Session terminating, we can't meaningfully service, abort - s.AllocateSections(bit, 0) - s.DeliverSections(bit, []uint64{}, [][]byte{}) + s.allocateSections(bit, 0) + s.deliverSections(bit, []uint64{}, [][]byte{}) return case <-time.After(wait): @@ -626,13 +615,13 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan } } // Allocate as much as we can handle and request servicing - sections := s.AllocateSections(bit, batch) + sections := s.allocateSections(bit, batch) request := make(chan *Retrieval) select { case <-s.quit: // Session terminating, we can't meaningfully service, abort - s.DeliverSections(bit, sections, make([][]byte, len(sections))) + s.deliverSections(bit, sections, make([][]byte, len(sections))) return case mux <- request: @@ -644,7 +633,7 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan s.err.Store(result.Error) s.Close() } - s.DeliverSections(result.Bit, result.Sections, result.Bitsets) + s.deliverSections(result.Bit, result.Sections, result.Bitsets) } } }