go-ethereum/core/bloombits/matcher.go
Bin 0bbd88bda0
all: use timer instead of time.After in loops, to avoid memleaks (#29241)
time.After is equivalent to NewTimer(d).C, and does not call Stop if the timer is no longer needed. This can cause memory leaks. This change changes many such occations to use NewTimer instead, and calling Stop once the timer is no longer needed.
2024-04-09 08:51:54 +02:00

650 lines
20 KiB
Go

// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package bloombits
import (
"bytes"
"context"
"errors"
"math"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/crypto"
)
// bloomIndexes represents the bit indexes inside the bloom filter that belong
// to some key.
type bloomIndexes [3]uint
// calcBloomIndexes returns the bloom filter bit indexes belonging to the given key.
func calcBloomIndexes(b []byte) bloomIndexes {
b = crypto.Keccak256(b)
var idxs bloomIndexes
for i := 0; i < len(idxs); i++ {
idxs[i] = (uint(b[2*i])<<8)&2047 + uint(b[2*i+1])
}
return idxs
}
// partialMatches with a non-nil vector represents a section in which some sub-
// matchers have already found potential matches. Subsequent sub-matchers will
// binary AND their matches with this vector. If vector is nil, it represents a
// section to be processed by the first sub-matcher.
type partialMatches struct {
section uint64
bitset []byte
}
// Retrieval represents a request for retrieval task assignments for a given
// bit with the given number of fetch elements, or a response for such a request.
// It can also have the actual results set to be used as a delivery data struct.
//
// The context and error fields are used by the light client to terminate matching
// early if an error is encountered on some path of the pipeline.
type Retrieval struct {
Bit uint
Sections []uint64
Bitsets [][]byte
Context context.Context
Error error
}
// Matcher is a pipelined system of schedulers and logic matchers which perform
// binary AND/OR operations on the bit-streams, creating a stream of potential
// blocks to inspect for data content.
type Matcher struct {
sectionSize uint64 // Size of the data batches to filter on
filters [][]bloomIndexes // Filter the system is matching for
schedulers map[uint]*scheduler // Retrieval schedulers for loading bloom bits
retrievers chan chan uint // Retriever processes waiting for bit allocations
counters chan chan uint // Retriever processes waiting for task count reports
retrievals chan chan *Retrieval // Retriever processes waiting for task allocations
deliveries chan *Retrieval // Retriever processes waiting for task response deliveries
running atomic.Bool // Atomic flag whether a session is live or not
}
// NewMatcher creates a new pipeline for retrieving bloom bit streams and doing
// address and topic filtering on them. Setting a filter component to `nil` is
// allowed and will result in that filter rule being skipped (OR 0x11...1).
func NewMatcher(sectionSize uint64, filters [][][]byte) *Matcher {
// Create the matcher instance
m := &Matcher{
sectionSize: sectionSize,
schedulers: make(map[uint]*scheduler),
retrievers: make(chan chan uint),
counters: make(chan chan uint),
retrievals: make(chan chan *Retrieval),
deliveries: make(chan *Retrieval),
}
// Calculate the bloom bit indexes for the groups we're interested in
m.filters = nil
for _, filter := range filters {
// Gather the bit indexes of the filter rule, special casing the nil filter
if len(filter) == 0 {
continue
}
bloomBits := make([]bloomIndexes, len(filter))
for i, clause := range filter {
if clause == nil {
bloomBits = nil
break
}
bloomBits[i] = calcBloomIndexes(clause)
}
// Accumulate the filter rules if no nil rule was within
if bloomBits != nil {
m.filters = append(m.filters, bloomBits)
}
}
// For every bit, create a scheduler to load/download the bit vectors
for _, bloomIndexLists := range m.filters {
for _, bloomIndexList := range bloomIndexLists {
for _, bloomIndex := range bloomIndexList {
m.addScheduler(bloomIndex)
}
}
}
return m
}
// addScheduler adds a bit stream retrieval scheduler for the given bit index if
// it has not existed before. If the bit is already selected for filtering, the
// existing scheduler can be used.
func (m *Matcher) addScheduler(idx uint) {
if _, ok := m.schedulers[idx]; ok {
return
}
m.schedulers[idx] = newScheduler(idx)
}
// Start starts the matching process and returns a stream of bloom matches in
// a given range of blocks. If there are no more matches in the range, the result
// channel is closed.
func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) {
// Make sure we're not creating concurrent sessions
if m.running.Swap(true) {
return nil, errors.New("matcher already running")
}
defer m.running.Store(false)
// Initiate a new matching round
session := &MatcherSession{
matcher: m,
quit: make(chan struct{}),
ctx: ctx,
}
for _, scheduler := range m.schedulers {
scheduler.reset()
}
sink := m.run(begin, end, cap(results), session)
// Read the output from the result sink and deliver to the user
session.pend.Add(1)
go func() {
defer session.pend.Done()
defer close(results)
for {
select {
case <-session.quit:
return
case res, ok := <-sink:
// New match result found
if !ok {
return
}
// Calculate the first and last blocks of the section
sectionStart := res.section * m.sectionSize
first := sectionStart
if begin > first {
first = begin
}
last := sectionStart + m.sectionSize - 1
if end < last {
last = end
}
// Iterate over all the blocks in the section and return the matching ones
for i := first; i <= last; i++ {
// Skip the entire byte if no matches are found inside (and we're processing an entire byte!)
next := res.bitset[(i-sectionStart)/8]
if next == 0 {
if i%8 == 0 {
i += 7
}
continue
}
// Some bit it set, do the actual submatching
if bit := 7 - i%8; next&(1<<bit) != 0 {
select {
case <-session.quit:
return
case results <- i:
}
}
}
}
}
}()
return session, nil
}
// run creates a daisy-chain of sub-matchers, one for the address set and one
// for each topic set, each sub-matcher receiving a section only if the previous
// ones have all found a potential match in one of the blocks of the section,
// then binary AND-ing its own matches and forwarding the result to the next one.
//
// The method starts feeding the section indexes into the first sub-matcher on a
// new goroutine and returns a sink channel receiving the results.
func (m *Matcher) run(begin, end uint64, buffer int, session *MatcherSession) chan *partialMatches {
// Create the source channel and feed section indexes into
source := make(chan *partialMatches, buffer)
session.pend.Add(1)
go func() {
defer session.pend.Done()
defer close(source)
for i := begin / m.sectionSize; i <= end/m.sectionSize; i++ {
select {
case <-session.quit:
return
case source <- &partialMatches{i, bytes.Repeat([]byte{0xff}, int(m.sectionSize/8))}:
}
}
}()
// Assemble the daisy-chained filtering pipeline
next := source
dist := make(chan *request, buffer)
for _, bloom := range m.filters {
next = m.subMatch(next, dist, bloom, session)
}
// Start the request distribution
session.pend.Add(1)
go m.distributor(dist, session)
return next
}
// subMatch creates a sub-matcher that filters for a set of addresses or topics, binary OR-s those matches, then
// binary AND-s the result to the daisy-chain input (source) and forwards it to the daisy-chain output.
// The matches of each address/topic are calculated by fetching the given sections of the three bloom bit indexes belonging to
// that address/topic, and binary AND-ing those vectors together.
func (m *Matcher) subMatch(source chan *partialMatches, dist chan *request, bloom []bloomIndexes, session *MatcherSession) chan *partialMatches {
// Start the concurrent schedulers for each bit required by the bloom filter
sectionSources := make([][3]chan uint64, len(bloom))
sectionSinks := make([][3]chan []byte, len(bloom))
for i, bits := range bloom {
for j, bit := range bits {
sectionSources[i][j] = make(chan uint64, cap(source))
sectionSinks[i][j] = make(chan []byte, cap(source))
m.schedulers[bit].run(sectionSources[i][j], dist, sectionSinks[i][j], session.quit, &session.pend)
}
}
process := make(chan *partialMatches, cap(source)) // entries from source are forwarded here after fetches have been initiated
results := make(chan *partialMatches, cap(source))
session.pend.Add(2)
go func() {
// Tear down the goroutine and terminate all source channels
defer session.pend.Done()
defer close(process)
defer func() {
for _, bloomSources := range sectionSources {
for _, bitSource := range bloomSources {
close(bitSource)
}
}
}()
// Read sections from the source channel and multiplex into all bit-schedulers
for {
select {
case <-session.quit:
return
case subres, ok := <-source:
// New subresult from previous link
if !ok {
return
}
// Multiplex the section index to all bit-schedulers
for _, bloomSources := range sectionSources {
for _, bitSource := range bloomSources {
select {
case <-session.quit:
return
case bitSource <- subres.section:
}
}
}
// Notify the processor that this section will become available
select {
case <-session.quit:
return
case process <- subres:
}
}
}
}()
go func() {
// Tear down the goroutine and terminate the final sink channel
defer session.pend.Done()
defer close(results)
// Read the source notifications and collect the delivered results
for {
select {
case <-session.quit:
return
case subres, ok := <-process:
// Notified of a section being retrieved
if !ok {
return
}
// Gather all the sub-results and merge them together
var orVector []byte
for _, bloomSinks := range sectionSinks {
var andVector []byte
for _, bitSink := range bloomSinks {
var data []byte
select {
case <-session.quit:
return
case data = <-bitSink:
}
if andVector == nil {
andVector = make([]byte, int(m.sectionSize/8))
copy(andVector, data)
} else {
bitutil.ANDBytes(andVector, andVector, data)
}
}
if orVector == nil {
orVector = andVector
} else {
bitutil.ORBytes(orVector, orVector, andVector)
}
}
if orVector == nil {
orVector = make([]byte, int(m.sectionSize/8))
}
if subres.bitset != nil {
bitutil.ANDBytes(orVector, orVector, subres.bitset)
}
if bitutil.TestBytes(orVector) {
select {
case <-session.quit:
return
case results <- &partialMatches{subres.section, orVector}:
}
}
}
}
}()
return results
}
// distributor receives requests from the schedulers and queues them into a set
// of pending requests, which are assigned to retrievers wanting to fulfil them.
func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
defer session.pend.Done()
var (
requests = make(map[uint][]uint64) // Per-bit list of section requests, ordered by section number
unallocs = make(map[uint]struct{}) // Bits with pending requests but not allocated to any retriever
retrievers chan chan uint // Waiting retrievers (toggled to nil if unallocs is empty)
allocs int // Number of active allocations to handle graceful shutdown requests
shutdown = session.quit // Shutdown request channel, will gracefully wait for pending requests
)
// assign is a helper method to try to assign a pending bit an actively
// listening servicer, or schedule it up for later when one arrives.
assign := func(bit uint) {
select {
case fetcher := <-m.retrievers:
allocs++
fetcher <- bit
default:
// No retrievers active, start listening for new ones
retrievers = m.retrievers
unallocs[bit] = struct{}{}
}
}
for {
select {
case <-shutdown:
// Shutdown requested. No more retrievers can be allocated,
// but we still need to wait until all pending requests have returned.
shutdown = nil
if allocs == 0 {
return
}
case req := <-dist:
// New retrieval request arrived to be distributed to some fetcher process
queue := requests[req.bit]
index := sort.Search(len(queue), func(i int) bool { return queue[i] >= req.section })
requests[req.bit] = append(queue[:index], append([]uint64{req.section}, queue[index:]...)...)
// If it's a new bit and we have waiting fetchers, allocate to them
if len(queue) == 0 {
assign(req.bit)
}
case fetcher := <-retrievers:
// New retriever arrived, find the lowest section-ed bit to assign
bit, best := uint(0), uint64(math.MaxUint64)
for idx := range unallocs {
if requests[idx][0] < best {
bit, best = idx, requests[idx][0]
}
}
// Stop tracking this bit (and alloc notifications if no more work is available)
delete(unallocs, bit)
if len(unallocs) == 0 {
retrievers = nil
}
allocs++
fetcher <- bit
case fetcher := <-m.counters:
// New task count request arrives, return number of items
fetcher <- uint(len(requests[<-fetcher]))
case fetcher := <-m.retrievals:
// New fetcher waiting for tasks to retrieve, assign
task := <-fetcher
if want := len(task.Sections); want >= len(requests[task.Bit]) {
task.Sections = requests[task.Bit]
delete(requests, task.Bit)
} else {
task.Sections = append(task.Sections[:0], requests[task.Bit][:want]...)
requests[task.Bit] = append(requests[task.Bit][:0], requests[task.Bit][want:]...)
}
fetcher <- task
// If anything was left unallocated, try to assign to someone else
if len(requests[task.Bit]) > 0 {
assign(task.Bit)
}
case result := <-m.deliveries:
// New retrieval task response from fetcher, split out missing sections and
// deliver complete ones
var (
sections = make([]uint64, 0, len(result.Sections))
bitsets = make([][]byte, 0, len(result.Bitsets))
missing = make([]uint64, 0, len(result.Sections))
)
for i, bitset := range result.Bitsets {
if len(bitset) == 0 {
missing = append(missing, result.Sections[i])
continue
}
sections = append(sections, result.Sections[i])
bitsets = append(bitsets, bitset)
}
m.schedulers[result.Bit].deliver(sections, bitsets)
allocs--
// Reschedule missing sections and allocate bit if newly available
if len(missing) > 0 {
queue := requests[result.Bit]
for _, section := range missing {
index := sort.Search(len(queue), func(i int) bool { return queue[i] >= section })
queue = append(queue[:index], append([]uint64{section}, queue[index:]...)...)
}
requests[result.Bit] = queue
if len(queue) == len(missing) {
assign(result.Bit)
}
}
// End the session when all pending deliveries have arrived.
if shutdown == nil && allocs == 0 {
return
}
}
}
}
// MatcherSession is returned by a started matcher to be used as a terminator
// for the actively running matching operation.
type MatcherSession struct {
matcher *Matcher
closer sync.Once // Sync object to ensure we only ever close once
quit chan struct{} // Quit channel to request pipeline termination
ctx context.Context // Context used by the light client to abort filtering
err error // Global error to track retrieval failures deep in the chain
errLock sync.Mutex
pend sync.WaitGroup
}
// Close stops the matching process and waits for all subprocesses to terminate
// before returning. The timeout may be used for graceful shutdown, allowing the
// currently running retrievals to complete before this time.
func (s *MatcherSession) Close() {
s.closer.Do(func() {
// Signal termination and wait for all goroutines to tear down
close(s.quit)
s.pend.Wait()
})
}
// Error returns any failure encountered during the matching session.
func (s *MatcherSession) Error() error {
s.errLock.Lock()
defer s.errLock.Unlock()
return s.err
}
// allocateRetrieval assigns a bloom bit index to a client process that can either
// immediately request and fetch the section contents assigned to this bit or wait
// a little while for more sections to be requested.
func (s *MatcherSession) allocateRetrieval() (uint, bool) {
fetcher := make(chan uint)
select {
case <-s.quit:
return 0, false
case s.matcher.retrievers <- fetcher:
bit, ok := <-fetcher
return bit, ok
}
}
// pendingSections returns the number of pending section retrievals belonging to
// the given bloom bit index.
func (s *MatcherSession) pendingSections(bit uint) int {
fetcher := make(chan uint)
select {
case <-s.quit:
return 0
case s.matcher.counters <- fetcher:
fetcher <- bit
return int(<-fetcher)
}
}
// allocateSections assigns all or part of an already allocated bit-task queue
// to the requesting process.
func (s *MatcherSession) allocateSections(bit uint, count int) []uint64 {
fetcher := make(chan *Retrieval)
select {
case <-s.quit:
return nil
case s.matcher.retrievals <- fetcher:
task := &Retrieval{
Bit: bit,
Sections: make([]uint64, count),
}
fetcher <- task
return (<-fetcher).Sections
}
}
// deliverSections delivers a batch of section bit-vectors for a specific bloom
// bit index to be injected into the processing pipeline.
func (s *MatcherSession) deliverSections(bit uint, sections []uint64, bitsets [][]byte) {
s.matcher.deliveries <- &Retrieval{Bit: bit, Sections: sections, Bitsets: bitsets}
}
// Multiplex polls the matcher session for retrieval tasks and multiplexes it into
// the requested retrieval queue to be serviced together with other sessions.
//
// This method will block for the lifetime of the session. Even after termination
// of the session, any request in-flight need to be responded to! Empty responses
// are fine though in that case.
func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) {
waitTimer := time.NewTimer(wait)
defer waitTimer.Stop()
for {
// Allocate a new bloom bit index to retrieve data for, stopping when done
bit, ok := s.allocateRetrieval()
if !ok {
return
}
// Bit allocated, throttle a bit if we're below our batch limit
if s.pendingSections(bit) < batch {
waitTimer.Reset(wait)
select {
case <-s.quit:
// Session terminating, we can't meaningfully service, abort
s.allocateSections(bit, 0)
s.deliverSections(bit, []uint64{}, [][]byte{})
return
case <-waitTimer.C:
// Throttling up, fetch whatever is available
}
}
// Allocate as much as we can handle and request servicing
sections := s.allocateSections(bit, batch)
request := make(chan *Retrieval)
select {
case <-s.quit:
// Session terminating, we can't meaningfully service, abort
s.deliverSections(bit, sections, make([][]byte, len(sections)))
return
case mux <- request:
// Retrieval accepted, something must arrive before we're aborting
request <- &Retrieval{Bit: bit, Sections: sections, Context: s.ctx}
result := <-request
// Deliver a result before s.Close() to avoid a deadlock
s.deliverSections(result.Bit, result.Sections, result.Bitsets)
if result.Error != nil {
s.errLock.Lock()
s.err = result.Error
s.errLock.Unlock()
s.Close()
}
}
}
}