go-ethereum/core/txpool/txpool.go

500 lines
17 KiB
Go

// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package txpool
import (
"errors"
"fmt"
"math/big"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)
// TxStatus is the current status of a transaction as seen by the pool.
type TxStatus uint
const (
TxStatusUnknown TxStatus = iota
TxStatusQueued
TxStatusPending
TxStatusIncluded
)
var (
// reservationsGaugeName is the prefix of a per-subpool address reservation
// metric.
//
// This is mostly a sanity metric to ensure there's no bug that would make
// some subpool hog all the reservations due to mis-accounting.
reservationsGaugeName = "txpool/reservations"
)
// BlockChain defines the minimal set of methods needed to back a tx pool with
// a chain. Exists to allow mocking the live chain out of tests.
type BlockChain interface {
// CurrentBlock returns the current head of the chain.
CurrentBlock() *types.Header
// SubscribeChainHeadEvent subscribes to new blocks being added to the chain.
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
}
// TxPool is an aggregator for various transaction specific pools, collectively
// tracking all the transactions deemed interesting by the node. Transactions
// enter the pool when they are received from the network or submitted locally.
// They exit the pool when they are included in the blockchain or evicted due to
// resource constraints.
type TxPool struct {
subpools []SubPool // List of subpools for specialized transaction handling
reservations map[common.Address]SubPool // Map with the account to pool reservations
reserveLock sync.Mutex // Lock protecting the account reservations
subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown
quit chan chan error // Quit channel to tear down the head updater
term chan struct{} // Termination channel to detect a closed pool
sync chan chan error // Testing / simulator channel to block until internal reset is done
}
// New creates a new transaction pool to gather, sort and filter inbound
// transactions from the network.
func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) {
// Retrieve the current head so that all subpools and this main coordinator
// pool will have the same starting state, even if the chain moves forward
// during initialization.
head := chain.CurrentBlock()
pool := &TxPool{
subpools: subpools,
reservations: make(map[common.Address]SubPool),
quit: make(chan chan error),
term: make(chan struct{}),
sync: make(chan chan error),
}
for i, subpool := range subpools {
if err := subpool.Init(gasTip, head, pool.reserver(i, subpool)); err != nil {
for j := i - 1; j >= 0; j-- {
subpools[j].Close()
}
return nil, err
}
}
go pool.loop(head, chain)
return pool, nil
}
// reserver is a method to create an address reservation callback to exclusively
// assign/deassign addresses to/from subpools. This can ensure that at any point
// in time, only a single subpool is able to manage an account, avoiding cross
// subpool eviction issues and nonce conflicts.
func (p *TxPool) reserver(id int, subpool SubPool) AddressReserver {
return func(addr common.Address, reserve bool) error {
p.reserveLock.Lock()
defer p.reserveLock.Unlock()
owner, exists := p.reservations[addr]
if reserve {
// Double reservations are forbidden even from the same pool to
// avoid subtle bugs in the long term.
if exists {
if owner == subpool {
log.Error("pool attempted to reserve already-owned address", "address", addr)
return nil // Ignore fault to give the pool a chance to recover while the bug gets fixed
}
return ErrAlreadyReserved
}
p.reservations[addr] = subpool
if metrics.Enabled {
m := fmt.Sprintf("%s/%d", reservationsGaugeName, id)
metrics.GetOrRegisterGauge(m, nil).Inc(1)
}
return nil
}
// Ensure subpools only attempt to unreserve their own owned addresses,
// otherwise flag as a programming error.
if !exists {
log.Error("pool attempted to unreserve non-reserved address", "address", addr)
return errors.New("address not reserved")
}
if subpool != owner {
log.Error("pool attempted to unreserve non-owned address", "address", addr)
return errors.New("address not owned")
}
delete(p.reservations, addr)
if metrics.Enabled {
m := fmt.Sprintf("%s/%d", reservationsGaugeName, id)
metrics.GetOrRegisterGauge(m, nil).Dec(1)
}
return nil
}
}
// Close terminates the transaction pool and all its subpools.
func (p *TxPool) Close() error {
var errs []error
// Terminate the reset loop and wait for it to finish
errc := make(chan error)
p.quit <- errc
if err := <-errc; err != nil {
errs = append(errs, err)
}
// Terminate each subpool
for _, subpool := range p.subpools {
if err := subpool.Close(); err != nil {
errs = append(errs, err)
}
}
// Unsubscribe anyone still listening for tx events
p.subs.Close()
if len(errs) > 0 {
return fmt.Errorf("subpool close errors: %v", errs)
}
return nil
}
// loop is the transaction pool's main event loop, waiting for and reacting to
// outside blockchain events as well as for various reporting and transaction
// eviction events.
func (p *TxPool) loop(head *types.Header, chain BlockChain) {
// Close the termination marker when the pool stops
defer close(p.term)
// Subscribe to chain head events to trigger subpool resets
var (
newHeadCh = make(chan core.ChainHeadEvent)
newHeadSub = chain.SubscribeChainHeadEvent(newHeadCh)
)
defer newHeadSub.Unsubscribe()
// Track the previous and current head to feed to an idle reset
var (
oldHead = head
newHead = oldHead
)
// Consume chain head events and start resets when none is running
var (
resetBusy = make(chan struct{}, 1) // Allow 1 reset to run concurrently
resetDone = make(chan *types.Header)
resetForced bool // Whether a forced reset was requested, only used in simulator mode
resetWaiter chan error // Channel waiting on a forced reset, only used in simulator mode
)
// Notify the live reset waiter to not block if the txpool is closed.
defer func() {
if resetWaiter != nil {
resetWaiter <- errors.New("pool already terminated")
resetWaiter = nil
}
}()
var errc chan error
for errc == nil {
// Something interesting might have happened, run a reset if there is
// one needed but none is running. The resetter will run on its own
// goroutine to allow chain head events to be consumed contiguously.
if newHead != oldHead || resetForced {
// Try to inject a busy marker and start a reset if successful
select {
case resetBusy <- struct{}{}:
// Busy marker injected, start a new subpool reset
go func(oldHead, newHead *types.Header) {
for _, subpool := range p.subpools {
subpool.Reset(oldHead, newHead)
}
resetDone <- newHead
}(oldHead, newHead)
// If the reset operation was explicitly requested, consider it
// being fulfilled and drop the request marker. If it was not,
// this is a noop.
resetForced = false
default:
// Reset already running, wait until it finishes.
//
// Note, this will not drop any forced reset request. If a forced
// reset was requested, but we were busy, then when the currently
// running reset finishes, a new one will be spun up.
}
}
// Wait for the next chain head event or a previous reset finish
select {
case event := <-newHeadCh:
// Chain moved forward, store the head for later consumption
newHead = event.Header
case head := <-resetDone:
// Previous reset finished, update the old head and allow a new reset
oldHead = head
<-resetBusy
// If someone is waiting for a reset to finish, notify them, unless
// the forced op is still pending. In that case, wait another round
// of resets.
if resetWaiter != nil && !resetForced {
resetWaiter <- nil
resetWaiter = nil
}
case errc = <-p.quit:
// Termination requested, break out on the next loop round
case syncc := <-p.sync:
// Transaction pool is running inside a simulator, and we are about
// to create a new block. Request a forced sync operation to ensure
// that any running reset operation finishes to make block imports
// deterministic. On top of that, run a new reset operation to make
// transaction insertions deterministic instead of being stuck in a
// queue waiting for a reset.
resetForced = true
resetWaiter = syncc
}
}
// Notify the closer of termination (no error possible for now)
errc <- nil
}
// SetGasTip updates the minimum gas tip required by the transaction pool for a
// new transaction, and drops all transactions below this threshold.
func (p *TxPool) SetGasTip(tip *big.Int) {
for _, subpool := range p.subpools {
subpool.SetGasTip(tip)
}
}
// Has returns an indicator whether the pool has a transaction cached with the
// given hash.
func (p *TxPool) Has(hash common.Hash) bool {
for _, subpool := range p.subpools {
if subpool.Has(hash) {
return true
}
}
return false
}
// Get returns a transaction if it is contained in the pool, or nil otherwise.
func (p *TxPool) Get(hash common.Hash) *types.Transaction {
for _, subpool := range p.subpools {
if tx := subpool.Get(hash); tx != nil {
return tx
}
}
return nil
}
// GetBlobs returns a number of blobs are proofs for the given versioned hashes.
// This is a utility method for the engine API, enabling consensus clients to
// retrieve blobs from the pools directly instead of the network.
func (p *TxPool) GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844.Proof) {
for _, subpool := range p.subpools {
// It's an ugly to assume that only one pool will be capable of returning
// anything meaningful for this call, but anythingh else requires merging
// partial responses and that's too annoying to do until we get a second
// blobpool (probably never).
if blobs, proofs := subpool.GetBlobs(vhashes); blobs != nil {
return blobs, proofs
}
}
return nil, nil
}
// Add enqueues a batch of transactions into the pool if they are valid. Due
// to the large transaction churn, add may postpone fully integrating the tx
// to a later point to batch multiple ones together.
func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
// Split the input transactions between the subpools. It shouldn't really
// happen that we receive merged batches, but better graceful than strange
// errors.
//
// We also need to track how the transactions were split across the subpools,
// so we can piece back the returned errors into the original order.
txsets := make([][]*types.Transaction, len(p.subpools))
splits := make([]int, len(txs))
for i, tx := range txs {
// Mark this transaction belonging to no-subpool
splits[i] = -1
// Try to find a subpool that accepts the transaction
for j, subpool := range p.subpools {
if subpool.Filter(tx) {
txsets[j] = append(txsets[j], tx)
splits[i] = j
break
}
}
}
// Add the transactions split apart to the individual subpools and piece
// back the errors into the original sort order.
errsets := make([][]error, len(p.subpools))
for i := 0; i < len(p.subpools); i++ {
errsets[i] = p.subpools[i].Add(txsets[i], local, sync)
}
errs := make([]error, len(txs))
for i, split := range splits {
// If the transaction was rejected by all subpools, mark it unsupported
if split == -1 {
errs[i] = core.ErrTxTypeNotSupported
continue
}
// Find which subpool handled it and pull in the corresponding error
errs[i] = errsets[split][0]
errsets[split] = errsets[split][1:]
}
return errs
}
// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce.
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (p *TxPool) Pending(filter PendingFilter) map[common.Address][]*LazyTransaction {
txs := make(map[common.Address][]*LazyTransaction)
for _, subpool := range p.subpools {
for addr, set := range subpool.Pending(filter) {
txs[addr] = set
}
}
return txs
}
// SubscribeTransactions registers a subscription for new transaction events,
// supporting feeding only newly seen or also resurrected transactions.
func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
subs := make([]event.Subscription, len(p.subpools))
for i, subpool := range p.subpools {
subs[i] = subpool.SubscribeTransactions(ch, reorgs)
}
return p.subs.Track(event.JoinSubscriptions(subs...))
}
// Nonce returns the next nonce of an account, with all transactions executable
// by the pool already applied on top.
func (p *TxPool) Nonce(addr common.Address) uint64 {
// Since (for now) accounts are unique to subpools, only one pool will have
// (at max) a non-state nonce. To avoid stateful lookups, just return the
// highest nonce for now.
var nonce uint64
for _, subpool := range p.subpools {
if next := subpool.Nonce(addr); nonce < next {
nonce = next
}
}
return nonce
}
// Stats retrieves the current pool stats, namely the number of pending and the
// number of queued (non-executable) transactions.
func (p *TxPool) Stats() (int, int) {
var runnable, blocked int
for _, subpool := range p.subpools {
run, block := subpool.Stats()
runnable += run
blocked += block
}
return runnable, blocked
}
// Content retrieves the data content of the transaction pool, returning all the
// pending as well as queued transactions, grouped by account and sorted by nonce.
func (p *TxPool) Content() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) {
var (
runnable = make(map[common.Address][]*types.Transaction)
blocked = make(map[common.Address][]*types.Transaction)
)
for _, subpool := range p.subpools {
run, block := subpool.Content()
for addr, txs := range run {
runnable[addr] = txs
}
for addr, txs := range block {
blocked[addr] = txs
}
}
return runnable, blocked
}
// ContentFrom retrieves the data content of the transaction pool, returning the
// pending as well as queued transactions of this address, grouped by nonce.
func (p *TxPool) ContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) {
for _, subpool := range p.subpools {
run, block := subpool.ContentFrom(addr)
if len(run) != 0 || len(block) != 0 {
return run, block
}
}
return []*types.Transaction{}, []*types.Transaction{}
}
// Locals retrieves the accounts currently considered local by the pool.
func (p *TxPool) Locals() []common.Address {
// Retrieve the locals from each subpool and deduplicate them
locals := make(map[common.Address]struct{})
for _, subpool := range p.subpools {
for _, local := range subpool.Locals() {
locals[local] = struct{}{}
}
}
// Flatten and return the deduplicated local set
flat := make([]common.Address, 0, len(locals))
for local := range locals {
flat = append(flat, local)
}
return flat
}
// Status returns the known status (unknown/pending/queued) of a transaction
// identified by its hash.
func (p *TxPool) Status(hash common.Hash) TxStatus {
for _, subpool := range p.subpools {
if status := subpool.Status(hash); status != TxStatusUnknown {
return status
}
}
return TxStatusUnknown
}
// Sync is a helper method for unit tests or simulator runs where the chain events
// are arriving in quick succession, without any time in between them to run the
// internal background reset operations. This method will run an explicit reset
// operation to ensure the pool stabilises, thus avoiding flakey behavior.
//
// Note, do not use this in production / live code. In live code, the pool is
// meant to reset on a separate thread to avoid DoS vectors.
func (p *TxPool) Sync() error {
sync := make(chan error)
select {
case p.sync <- sync:
return <-sync
case <-p.term:
return errors.New("pool already terminated")
}
}