bb9897f11b
Here we move the method that drops all transactions by temporarily increasing the fee into the TxPool itself. It's better to have it there because we can set it back to the configured value afterwards. This resolves a TODO in the simulated backend.
1670 lines
63 KiB
Go
1670 lines
63 KiB
Go
// Copyright 2022 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
// Package blobpool implements the EIP-4844 blob transaction pool.
|
|
package blobpool
|
|
|
|
import (
|
|
"container/heap"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"math/big"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/consensus/misc/eip1559"
|
|
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
|
|
"github.com/ethereum/go-ethereum/core"
|
|
"github.com/ethereum/go-ethereum/core/state"
|
|
"github.com/ethereum/go-ethereum/core/txpool"
|
|
"github.com/ethereum/go-ethereum/core/types"
|
|
"github.com/ethereum/go-ethereum/event"
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/ethereum/go-ethereum/metrics"
|
|
"github.com/ethereum/go-ethereum/params"
|
|
"github.com/ethereum/go-ethereum/rlp"
|
|
"github.com/holiman/billy"
|
|
"github.com/holiman/uint256"
|
|
)
|
|
|
|
const (
|
|
// blobSize is the protocol constrained byte size of a single blob in a
|
|
// transaction. There can be multiple of these embedded into a single tx.
|
|
blobSize = params.BlobTxFieldElementsPerBlob * params.BlobTxBytesPerFieldElement
|
|
|
|
// maxBlobsPerTransaction is the maximum number of blobs a single transaction
|
|
// is allowed to contain. Whilst the spec states it's unlimited, the block
|
|
// data slots are protocol bound, which implicitly also limit this.
|
|
maxBlobsPerTransaction = params.MaxBlobGasPerBlock / params.BlobTxBlobGasPerBlob
|
|
|
|
// txAvgSize is an approximate byte size of a transaction metadata to avoid
|
|
// tiny overflows causing all txs to move a shelf higher, wasting disk space.
|
|
txAvgSize = 4 * 1024
|
|
|
|
// txMaxSize is the maximum size a single transaction can have, outside
|
|
// the included blobs. Since blob transactions are pulled instead of pushed,
|
|
// and only a small metadata is kept in ram, the rest is on disk, there is
|
|
// no critical limit that should be enforced. Still, capping it to some sane
|
|
// limit can never hurt.
|
|
txMaxSize = 1024 * 1024
|
|
|
|
// maxTxsPerAccount is the maximum number of blob transactions admitted from
|
|
// a single account. The limit is enforced to minimize the DoS potential of
|
|
// a private tx cancelling publicly propagated blobs.
|
|
//
|
|
// Note, transactions resurrected by a reorg are also subject to this limit,
|
|
// so pushing it down too aggressively might make resurrections non-functional.
|
|
maxTxsPerAccount = 16
|
|
|
|
// pendingTransactionStore is the subfolder containing the currently queued
|
|
// blob transactions.
|
|
pendingTransactionStore = "queue"
|
|
|
|
// limboedTransactionStore is the subfolder containing the currently included
|
|
// but not yet finalized transaction blobs.
|
|
limboedTransactionStore = "limbo"
|
|
)
|
|
|
|
// blobTxMeta is the minimal subset of types.BlobTx necessary to validate and
|
|
// schedule the blob transactions into the following blocks. Only ever add the
|
|
// bare minimum needed fields to keep the size down (and thus number of entries
|
|
// larger with the same memory consumption).
|
|
type blobTxMeta struct {
|
|
hash common.Hash // Transaction hash to maintain the lookup table
|
|
id uint64 // Storage ID in the pool's persistent store
|
|
size uint32 // Byte size in the pool's persistent store
|
|
|
|
nonce uint64 // Needed to prioritize inclusion order within an account
|
|
costCap *uint256.Int // Needed to validate cumulative balance sufficiency
|
|
execTipCap *uint256.Int // Needed to prioritize inclusion order across accounts and validate replacement price bump
|
|
execFeeCap *uint256.Int // Needed to validate replacement price bump
|
|
blobFeeCap *uint256.Int // Needed to validate replacement price bump
|
|
execGas uint64 // Needed to check inclusion validity before reading the blob
|
|
blobGas uint64 // Needed to check inclusion validity before reading the blob
|
|
|
|
basefeeJumps float64 // Absolute number of 1559 fee adjustments needed to reach the tx's fee cap
|
|
blobfeeJumps float64 // Absolute number of 4844 fee adjustments needed to reach the tx's blob fee cap
|
|
|
|
evictionExecTip *uint256.Int // Worst gas tip across all previous nonces
|
|
evictionExecFeeJumps float64 // Worst base fee (converted to fee jumps) across all previous nonces
|
|
evictionBlobFeeJumps float64 // Worse blob fee (converted to fee jumps) across all previous nonces
|
|
}
|
|
|
|
// newBlobTxMeta retrieves the indexed metadata fields from a blob transaction
|
|
// and assembles a helper struct to track in memory.
|
|
func newBlobTxMeta(id uint64, size uint32, tx *types.Transaction) *blobTxMeta {
|
|
meta := &blobTxMeta{
|
|
hash: tx.Hash(),
|
|
id: id,
|
|
size: size,
|
|
nonce: tx.Nonce(),
|
|
costCap: uint256.MustFromBig(tx.Cost()),
|
|
execTipCap: uint256.MustFromBig(tx.GasTipCap()),
|
|
execFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
|
|
blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()),
|
|
execGas: tx.Gas(),
|
|
blobGas: tx.BlobGas(),
|
|
}
|
|
meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap)
|
|
meta.blobfeeJumps = dynamicFeeJumps(meta.blobFeeCap)
|
|
|
|
return meta
|
|
}
|
|
|
|
// BlobPool is the transaction pool dedicated to EIP-4844 blob transactions.
|
|
//
|
|
// Blob transactions are special snowflakes that are designed for a very specific
|
|
// purpose (rollups) and are expected to adhere to that specific use case. These
|
|
// behavioural expectations allow us to design a transaction pool that is more robust
|
|
// (i.e. resending issues) and more resilient to DoS attacks (e.g. replace-flush
|
|
// attacks) than the generic tx pool. These improvements will also mean, however,
|
|
// that we enforce a significantly more aggressive strategy on entering and exiting
|
|
// the pool:
|
|
//
|
|
// - Blob transactions are large. With the initial design aiming for 128KB blobs,
|
|
// we must ensure that these only traverse the network the absolute minimum
|
|
// number of times. Broadcasting to sqrt(peers) is out of the question, rather
|
|
// these should only ever be announced and the remote side should request it if
|
|
// it wants to.
|
|
//
|
|
// - Block blob-space is limited. With blocks being capped to a few blob txs, we
|
|
// can make use of the very low expected churn rate within the pool. Notably,
|
|
// we should be able to use a persistent disk backend for the pool, solving
|
|
// the tx resend issue that plagues the generic tx pool, as long as there's no
|
|
// artificial churn (i.e. pool wars).
|
|
//
|
|
// - Purpose of blobs are layer-2s. Layer-2s are meant to use blob transactions to
|
|
// commit to their own current state, which is independent of Ethereum mainnet
|
|
// (state, txs). This means that there's no reason for blob tx cancellation or
|
|
// replacement, apart from a potential basefee / miner tip adjustment.
|
|
//
|
|
// - Replacements are expensive. Given their size, propagating a replacement
|
|
// blob transaction to an existing one should be aggressively discouraged.
|
|
// Whilst generic transactions can start at 1 Wei gas cost and require a 10%
|
|
// fee bump to replace, we suggest requiring a higher min cost (e.g. 1 gwei)
|
|
// and a more aggressive bump (100%).
|
|
//
|
|
// - Cancellation is prohibitive. Evicting an already propagated blob tx is a huge
|
|
// DoS vector. As such, a) replacement (higher-fee) blob txs mustn't invalidate
|
|
// already propagated (future) blob txs (cumulative fee); b) nonce-gapped blob
|
|
// txs are disallowed; c) the presence of blob transactions exclude non-blob
|
|
// transactions.
|
|
//
|
|
// - Malicious cancellations are possible. Although the pool might prevent txs
|
|
// that cancel blobs, blocks might contain such transaction (malicious miner
|
|
// or flashbotter). The pool should cap the total number of blob transactions
|
|
// per account as to prevent propagating too much data before cancelling it
|
|
// via a normal transaction. It should nonetheless be high enough to support
|
|
// resurrecting reorged transactions. Perhaps 4-16.
|
|
//
|
|
// - Local txs are meaningless. Mining pools historically used local transactions
|
|
// for payouts or for backdoor deals. With 1559 in place, the basefee usually
|
|
// dominates the final price, so 0 or non-0 tip doesn't change much. Blob txs
|
|
// retain the 1559 2D gas pricing (and introduce on top a dynamic blob gas fee),
|
|
// so locality is moot. With a disk backed blob pool avoiding the resend issue,
|
|
// there's also no need to save own transactions for later.
|
|
//
|
|
// - No-blob blob-txs are bad. Theoretically there's no strong reason to disallow
|
|
// blob txs containing 0 blobs. In practice, admitting such txs into the pool
|
|
// breaks the low-churn invariant as blob constraints don't apply anymore. Even
|
|
// though we could accept blocks containing such txs, a reorg would require moving
|
|
// them back into the blob pool, which can break invariants.
|
|
//
|
|
// - Dropping blobs needs delay. When normal transactions are included, they
|
|
// are immediately evicted from the pool since they are contained in the
|
|
// including block. Blobs however are not included in the execution chain,
|
|
// so a mini reorg cannot re-pool "lost" blob transactions. To support reorgs,
|
|
// blobs are retained on disk until they are finalised.
|
|
//
|
|
// - Blobs can arrive via flashbots. Blocks might contain blob transactions we
|
|
// have never seen on the network. Since we cannot recover them from blocks
|
|
// either, the engine_newPayload needs to give them to us, and we cache them
|
|
// until finality to support reorgs without tx losses.
|
|
//
|
|
// Whilst some constraints above might sound overly aggressive, the general idea is
|
|
// that the blob pool should work robustly for its intended use case and whilst
|
|
// anyone is free to use blob transactions for arbitrary non-rollup use cases,
|
|
// they should not be allowed to run amok the network.
|
|
//
|
|
// Implementation wise there are a few interesting design choices:
|
|
//
|
|
// - Adding a transaction to the pool blocks until persisted to disk. This is
|
|
// viable because TPS is low (2-4 blobs per block initially, maybe 8-16 at
|
|
// peak), so natural churn is a couple MB per block. Replacements doing O(n)
|
|
// updates are forbidden and transaction propagation is pull based (i.e. no
|
|
// pileup of pending data).
|
|
//
|
|
// - When transactions are chosen for inclusion, the primary criteria is the
|
|
// signer tip (and having a basefee/data fee high enough of course). However,
|
|
// same-tip transactions will be split by their basefee/datafee, preferring
|
|
// those that are closer to the current network limits. The idea being that
|
|
// very relaxed ones can be included even if the fees go up, when the closer
|
|
// ones could already be invalid.
|
|
//
|
|
// When the pool eventually reaches saturation, some old transactions - that may
|
|
// never execute - will need to be evicted in favor of newer ones. The eviction
|
|
// strategy is quite complex:
|
|
//
|
|
// - Exceeding capacity evicts the highest-nonce of the account with the lowest
|
|
// paying blob transaction anywhere in the pooled nonce-sequence, as that tx
|
|
// would be executed the furthest in the future and is thus blocking anything
|
|
// after it. The smallest is deliberately not evicted to avoid a nonce-gap.
|
|
//
|
|
// - Analogously, if the pool is full, the consideration price of a new tx for
|
|
// evicting an old one is the smallest price in the entire nonce-sequence of
|
|
// the account. This avoids malicious users DoSing the pool with seemingly
|
|
// high paying transactions hidden behind a low-paying blocked one.
|
|
//
|
|
// - Since blob transactions have 3 price parameters: execution tip, execution
|
|
// fee cap and data fee cap, there's no singular parameter to create a total
|
|
// price ordering on. What's more, since the base fee and blob fee can move
|
|
// independently of one another, there's no pre-defined way to combine them
|
|
// into a stable order either. This leads to a multi-dimensional problem to
|
|
// solve after every block.
|
|
//
|
|
// - The first observation is that comparing 1559 base fees or 4844 blob fees
|
|
// needs to happen in the context of their dynamism. Since these fees jump
|
|
// up or down in ~1.125 multipliers (at max) across blocks, comparing fees
|
|
// in two transactions should be based on log1.125(fee) to eliminate noise.
|
|
//
|
|
// - The second observation is that the basefee and blobfee move independently,
|
|
// so there's no way to split mixed txs on their own (A has higher base fee,
|
|
// B has higher blob fee). Rather than look at the absolute fees, the useful
|
|
// metric is the max time it can take to exceed the transaction's fee caps.
|
|
// Specifically, we're interested in the number of jumps needed to go from
|
|
// the current fee to the transaction's cap:
|
|
//
|
|
// jumps = log1.125(txfee) - log1.125(basefee)
|
|
//
|
|
// - The third observation is that the base fee tends to hover around rather
|
|
// than swing wildly. The number of jumps needed from the current fee starts
|
|
// to get less relevant the higher it is. To remove the noise here too, the
|
|
// pool will use log(jumps) as the delta for comparing transactions.
|
|
//
|
|
// delta = sign(jumps) * log(abs(jumps))
|
|
//
|
|
// - To establish a total order, we need to reduce the dimensionality of the
|
|
// two base fees (log jumps) to a single value. The interesting aspect from
|
|
// the pool's perspective is how fast will a tx get executable (fees going
|
|
// down, crossing the smaller negative jump counter) or non-executable (fees
|
|
// going up, crossing the smaller positive jump counter). As such, the pool
|
|
// cares only about the min of the two delta values for eviction priority.
|
|
//
|
|
// priority = min(deltaBasefee, deltaBlobfee)
|
|
//
|
|
// - The above very aggressive dimensionality and noise reduction should result
|
|
// in transaction being grouped into a small number of buckets, the further
|
|
// the fees the larger the buckets. This is good because it allows us to use
|
|
// the miner tip meaningfully as a splitter.
|
|
//
|
|
// - For the scenario where the pool does not contain non-executable blob txs
|
|
// anymore, it does not make sense to grant a later eviction priority to txs
|
|
// with high fee caps since it could enable pool wars. As such, any positive
|
|
// priority will be grouped together.
|
|
//
|
|
// priority = min(deltaBasefee, deltaBlobfee, 0)
|
|
//
|
|
// Optimisation tradeoffs:
|
|
//
|
|
// - Eviction relies on 3 fee minimums per account (exec tip, exec cap and blob
|
|
// cap). Maintaining these values across all transactions from the account is
|
|
// problematic as each transaction replacement or inclusion would require a
|
|
// rescan of all other transactions to recalculate the minimum. Instead, the
|
|
// pool maintains a rolling minimum across the nonce range. Updating all the
|
|
// minimums will need to be done only starting at the swapped in/out nonce
|
|
// and leading up to the first no-change.
|
|
type BlobPool struct {
|
|
config Config // Pool configuration
|
|
reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools
|
|
|
|
store billy.Database // Persistent data store for the tx metadata and blobs
|
|
stored uint64 // Useful data size of all transactions on disk
|
|
limbo *limbo // Persistent data store for the non-finalized blobs
|
|
|
|
signer types.Signer // Transaction signer to use for sender recovery
|
|
chain BlockChain // Chain object to access the state through
|
|
|
|
head *types.Header // Current head of the chain
|
|
state *state.StateDB // Current state at the head of the chain
|
|
gasTip *uint256.Int // Currently accepted minimum gas tip
|
|
|
|
lookup map[common.Hash]uint64 // Lookup table mapping hashes to tx billy entries
|
|
index map[common.Address][]*blobTxMeta // Blob transactions grouped by accounts, sorted by nonce
|
|
spent map[common.Address]*uint256.Int // Expenditure tracking for individual accounts
|
|
evict *evictHeap // Heap of cheapest accounts for eviction when full
|
|
|
|
discoverFeed event.Feed // Event feed to send out new tx events on pool discovery (reorg excluded)
|
|
insertFeed event.Feed // Event feed to send out new tx events on pool inclusion (reorg included)
|
|
|
|
lock sync.RWMutex // Mutex protecting the pool during reorg handling
|
|
}
|
|
|
|
// New creates a new blob transaction pool to gather, sort and filter inbound
|
|
// blob transactions from the network.
|
|
func New(config Config, chain BlockChain) *BlobPool {
|
|
// Sanitize the input to ensure no vulnerable gas prices are set
|
|
config = (&config).sanitize()
|
|
|
|
// Create the transaction pool with its initial settings
|
|
return &BlobPool{
|
|
config: config,
|
|
signer: types.LatestSigner(chain.Config()),
|
|
chain: chain,
|
|
lookup: make(map[common.Hash]uint64),
|
|
index: make(map[common.Address][]*blobTxMeta),
|
|
spent: make(map[common.Address]*uint256.Int),
|
|
}
|
|
}
|
|
|
|
// Filter returns whether the given transaction can be consumed by the blob pool.
|
|
func (p *BlobPool) Filter(tx *types.Transaction) bool {
|
|
return tx.Type() == types.BlobTxType
|
|
}
|
|
|
|
// Init sets the gas price needed to keep a transaction in the pool and the chain
|
|
// head to allow balance / nonce checks. The transaction journal will be loaded
|
|
// from disk and filtered based on the provided starting settings.
|
|
func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserve txpool.AddressReserver) error {
|
|
p.reserve = reserve
|
|
|
|
var (
|
|
queuedir string
|
|
limbodir string
|
|
)
|
|
if p.config.Datadir != "" {
|
|
queuedir = filepath.Join(p.config.Datadir, pendingTransactionStore)
|
|
if err := os.MkdirAll(queuedir, 0700); err != nil {
|
|
return err
|
|
}
|
|
limbodir = filepath.Join(p.config.Datadir, limboedTransactionStore)
|
|
if err := os.MkdirAll(limbodir, 0700); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
// Initialize the state with head block, or fallback to empty one in
|
|
// case the head state is not available (might occur when node is not
|
|
// fully synced).
|
|
state, err := p.chain.StateAt(head.Root)
|
|
if err != nil {
|
|
state, err = p.chain.StateAt(types.EmptyRootHash)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
p.head, p.state = head, state
|
|
|
|
// Index all transactions on disk and delete anything unprocessable
|
|
var fails []uint64
|
|
index := func(id uint64, size uint32, blob []byte) {
|
|
if p.parseTransaction(id, size, blob) != nil {
|
|
fails = append(fails, id)
|
|
}
|
|
}
|
|
store, err := billy.Open(billy.Options{Path: queuedir, Repair: true}, newSlotter(), index)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
p.store = store
|
|
|
|
if len(fails) > 0 {
|
|
log.Warn("Dropping invalidated blob transactions", "ids", fails)
|
|
dropInvalidMeter.Mark(int64(len(fails)))
|
|
|
|
for _, id := range fails {
|
|
if err := p.store.Delete(id); err != nil {
|
|
p.Close()
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
// Sort the indexed transactions by nonce and delete anything gapped, create
|
|
// the eviction heap of anyone still standing
|
|
for addr := range p.index {
|
|
p.recheck(addr, nil)
|
|
}
|
|
var (
|
|
basefee = uint256.MustFromBig(eip1559.CalcBaseFee(p.chain.Config(), p.head))
|
|
blobfee = uint256.NewInt(params.BlobTxMinBlobGasprice)
|
|
)
|
|
if p.head.ExcessBlobGas != nil {
|
|
blobfee = uint256.MustFromBig(eip4844.CalcBlobFee(*p.head.ExcessBlobGas))
|
|
}
|
|
p.evict = newPriceHeap(basefee, blobfee, p.index)
|
|
|
|
// Pool initialized, attach the blob limbo to it to track blobs included
|
|
// recently but not yet finalized
|
|
p.limbo, err = newLimbo(limbodir)
|
|
if err != nil {
|
|
p.Close()
|
|
return err
|
|
}
|
|
// Set the configured gas tip, triggering a filtering of anything just loaded
|
|
basefeeGauge.Update(int64(basefee.Uint64()))
|
|
blobfeeGauge.Update(int64(blobfee.Uint64()))
|
|
|
|
p.SetGasTip(new(big.Int).SetUint64(gasTip))
|
|
|
|
// Since the user might have modified their pool's capacity, evict anything
|
|
// above the current allowance
|
|
for p.stored > p.config.Datacap {
|
|
p.drop()
|
|
}
|
|
// Update the metrics and return the constructed pool
|
|
datacapGauge.Update(int64(p.config.Datacap))
|
|
p.updateStorageMetrics()
|
|
return nil
|
|
}
|
|
|
|
// Close closes down the underlying persistent store.
|
|
func (p *BlobPool) Close() error {
|
|
var errs []error
|
|
if p.limbo != nil { // Close might be invoked due to error in constructor, before p,limbo is set
|
|
if err := p.limbo.Close(); err != nil {
|
|
errs = append(errs, err)
|
|
}
|
|
}
|
|
if err := p.store.Close(); err != nil {
|
|
errs = append(errs, err)
|
|
}
|
|
switch {
|
|
case errs == nil:
|
|
return nil
|
|
case len(errs) == 1:
|
|
return errs[0]
|
|
default:
|
|
return fmt.Errorf("%v", errs)
|
|
}
|
|
}
|
|
|
|
// parseTransaction is a callback method on pool creation that gets called for
|
|
// each transaction on disk to create the in-memory metadata index.
|
|
func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
|
|
tx := new(types.Transaction)
|
|
if err := rlp.DecodeBytes(blob, tx); err != nil {
|
|
// This path is impossible unless the disk data representation changes
|
|
// across restarts. For that ever improbable case, recover gracefully
|
|
// by ignoring this data entry.
|
|
log.Error("Failed to decode blob pool entry", "id", id, "err", err)
|
|
return err
|
|
}
|
|
if tx.BlobTxSidecar() == nil {
|
|
log.Error("Missing sidecar in blob pool entry", "id", id, "hash", tx.Hash())
|
|
return errors.New("missing blob sidecar")
|
|
}
|
|
|
|
meta := newBlobTxMeta(id, size, tx)
|
|
if _, exists := p.lookup[meta.hash]; exists {
|
|
// This path is only possible after a crash, where deleted items are not
|
|
// removed via the normal shutdown-startup procedure and thus may get
|
|
// partially resurrected.
|
|
log.Error("Rejecting duplicate blob pool entry", "id", id, "hash", tx.Hash())
|
|
return errors.New("duplicate blob entry")
|
|
}
|
|
sender, err := p.signer.Sender(tx)
|
|
if err != nil {
|
|
// This path is impossible unless the signature validity changes across
|
|
// restarts. For that ever improbable case, recover gracefully by ignoring
|
|
// this data entry.
|
|
log.Error("Failed to recover blob tx sender", "id", id, "hash", tx.Hash(), "err", err)
|
|
return err
|
|
}
|
|
if _, ok := p.index[sender]; !ok {
|
|
if err := p.reserve(sender, true); err != nil {
|
|
return err
|
|
}
|
|
p.index[sender] = []*blobTxMeta{}
|
|
p.spent[sender] = new(uint256.Int)
|
|
}
|
|
p.index[sender] = append(p.index[sender], meta)
|
|
p.spent[sender] = new(uint256.Int).Add(p.spent[sender], meta.costCap)
|
|
|
|
p.lookup[meta.hash] = meta.id
|
|
p.stored += uint64(meta.size)
|
|
|
|
return nil
|
|
}
|
|
|
|
// recheck verifies the pool's content for a specific account and drops anything
|
|
// that does not fit anymore (dangling or filled nonce, overdraft).
|
|
func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint64) {
|
|
// Sort the transactions belonging to the account so reinjects can be simpler
|
|
txs := p.index[addr]
|
|
if inclusions != nil && txs == nil { // during reorgs, we might find new accounts
|
|
return
|
|
}
|
|
sort.Slice(txs, func(i, j int) bool {
|
|
return txs[i].nonce < txs[j].nonce
|
|
})
|
|
// If there is a gap between the chain state and the blob pool, drop
|
|
// all the transactions as they are non-executable. Similarly, if the
|
|
// entire tx range was included, drop all.
|
|
var (
|
|
next = p.state.GetNonce(addr)
|
|
gapped = txs[0].nonce > next
|
|
filled = txs[len(txs)-1].nonce < next
|
|
)
|
|
if gapped || filled {
|
|
var (
|
|
ids []uint64
|
|
nonces []uint64
|
|
)
|
|
for i := 0; i < len(txs); i++ {
|
|
ids = append(ids, txs[i].id)
|
|
nonces = append(nonces, txs[i].nonce)
|
|
|
|
p.stored -= uint64(txs[i].size)
|
|
delete(p.lookup, txs[i].hash)
|
|
|
|
// Included transactions blobs need to be moved to the limbo
|
|
if filled && inclusions != nil {
|
|
p.offload(addr, txs[i].nonce, txs[i].id, inclusions)
|
|
}
|
|
}
|
|
delete(p.index, addr)
|
|
delete(p.spent, addr)
|
|
if inclusions != nil { // only during reorgs will the heap be initialized
|
|
heap.Remove(p.evict, p.evict.index[addr])
|
|
}
|
|
p.reserve(addr, false)
|
|
|
|
if gapped {
|
|
log.Warn("Dropping dangling blob transactions", "from", addr, "missing", next, "drop", nonces, "ids", ids)
|
|
dropDanglingMeter.Mark(int64(len(ids)))
|
|
} else {
|
|
log.Trace("Dropping filled blob transactions", "from", addr, "filled", nonces, "ids", ids)
|
|
dropFilledMeter.Mark(int64(len(ids)))
|
|
}
|
|
for _, id := range ids {
|
|
if err := p.store.Delete(id); err != nil {
|
|
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
// If there is overlap between the chain state and the blob pool, drop
|
|
// anything below the current state
|
|
if txs[0].nonce < next {
|
|
var (
|
|
ids []uint64
|
|
nonces []uint64
|
|
)
|
|
for len(txs) > 0 && txs[0].nonce < next {
|
|
ids = append(ids, txs[0].id)
|
|
nonces = append(nonces, txs[0].nonce)
|
|
|
|
p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[0].costCap)
|
|
p.stored -= uint64(txs[0].size)
|
|
delete(p.lookup, txs[0].hash)
|
|
|
|
// Included transactions blobs need to be moved to the limbo
|
|
if inclusions != nil {
|
|
p.offload(addr, txs[0].nonce, txs[0].id, inclusions)
|
|
}
|
|
txs = txs[1:]
|
|
}
|
|
log.Trace("Dropping overlapped blob transactions", "from", addr, "overlapped", nonces, "ids", ids, "left", len(txs))
|
|
dropOverlappedMeter.Mark(int64(len(ids)))
|
|
|
|
for _, id := range ids {
|
|
if err := p.store.Delete(id); err != nil {
|
|
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
|
|
}
|
|
}
|
|
p.index[addr] = txs
|
|
}
|
|
// Iterate over the transactions to initialize their eviction thresholds
|
|
// and to detect any nonce gaps
|
|
txs[0].evictionExecTip = txs[0].execTipCap
|
|
txs[0].evictionExecFeeJumps = txs[0].basefeeJumps
|
|
txs[0].evictionBlobFeeJumps = txs[0].blobfeeJumps
|
|
|
|
for i := 1; i < len(txs); i++ {
|
|
// If there's no nonce gap, initialize the eviction thresholds as the
|
|
// minimum between the cumulative thresholds and the current tx fees
|
|
if txs[i].nonce == txs[i-1].nonce+1 {
|
|
txs[i].evictionExecTip = txs[i-1].evictionExecTip
|
|
if txs[i].evictionExecTip.Cmp(txs[i].execTipCap) > 0 {
|
|
txs[i].evictionExecTip = txs[i].execTipCap
|
|
}
|
|
txs[i].evictionExecFeeJumps = txs[i-1].evictionExecFeeJumps
|
|
if txs[i].evictionExecFeeJumps > txs[i].basefeeJumps {
|
|
txs[i].evictionExecFeeJumps = txs[i].basefeeJumps
|
|
}
|
|
txs[i].evictionBlobFeeJumps = txs[i-1].evictionBlobFeeJumps
|
|
if txs[i].evictionBlobFeeJumps > txs[i].blobfeeJumps {
|
|
txs[i].evictionBlobFeeJumps = txs[i].blobfeeJumps
|
|
}
|
|
continue
|
|
}
|
|
// Sanity check that there's no double nonce. This case would generally
|
|
// be a coding error, so better know about it.
|
|
//
|
|
// Also, Billy behind the blobpool does not journal deletes. A process
|
|
// crash would result in previously deleted entities being resurrected.
|
|
// That could potentially cause a duplicate nonce to appear.
|
|
if txs[i].nonce == txs[i-1].nonce {
|
|
id := p.lookup[txs[i].hash]
|
|
|
|
log.Error("Dropping repeat nonce blob transaction", "from", addr, "nonce", txs[i].nonce, "id", id)
|
|
dropRepeatedMeter.Mark(1)
|
|
|
|
p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[i].costCap)
|
|
p.stored -= uint64(txs[i].size)
|
|
delete(p.lookup, txs[i].hash)
|
|
|
|
if err := p.store.Delete(id); err != nil {
|
|
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
|
|
}
|
|
txs = append(txs[:i], txs[i+1:]...)
|
|
p.index[addr] = txs
|
|
|
|
i--
|
|
continue
|
|
}
|
|
// Otherwise if there's a nonce gap evict all later transactions
|
|
var (
|
|
ids []uint64
|
|
nonces []uint64
|
|
)
|
|
for j := i; j < len(txs); j++ {
|
|
ids = append(ids, txs[j].id)
|
|
nonces = append(nonces, txs[j].nonce)
|
|
|
|
p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[j].costCap)
|
|
p.stored -= uint64(txs[j].size)
|
|
delete(p.lookup, txs[j].hash)
|
|
}
|
|
txs = txs[:i]
|
|
|
|
log.Error("Dropping gapped blob transactions", "from", addr, "missing", txs[i-1].nonce+1, "drop", nonces, "ids", ids)
|
|
dropGappedMeter.Mark(int64(len(ids)))
|
|
|
|
for _, id := range ids {
|
|
if err := p.store.Delete(id); err != nil {
|
|
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
|
|
}
|
|
}
|
|
p.index[addr] = txs
|
|
break
|
|
}
|
|
// Ensure that there's no over-draft, this is expected to happen when some
|
|
// transactions get included without publishing on the network
|
|
var (
|
|
balance = p.state.GetBalance(addr)
|
|
spent = p.spent[addr]
|
|
)
|
|
if spent.Cmp(balance) > 0 {
|
|
// Evict the highest nonce transactions until the pending set falls under
|
|
// the account's available balance
|
|
var (
|
|
ids []uint64
|
|
nonces []uint64
|
|
)
|
|
for p.spent[addr].Cmp(balance) > 0 {
|
|
last := txs[len(txs)-1]
|
|
txs[len(txs)-1] = nil
|
|
txs = txs[:len(txs)-1]
|
|
|
|
ids = append(ids, last.id)
|
|
nonces = append(nonces, last.nonce)
|
|
|
|
p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], last.costCap)
|
|
p.stored -= uint64(last.size)
|
|
delete(p.lookup, last.hash)
|
|
}
|
|
if len(txs) == 0 {
|
|
delete(p.index, addr)
|
|
delete(p.spent, addr)
|
|
if inclusions != nil { // only during reorgs will the heap be initialized
|
|
heap.Remove(p.evict, p.evict.index[addr])
|
|
}
|
|
p.reserve(addr, false)
|
|
} else {
|
|
p.index[addr] = txs
|
|
}
|
|
log.Warn("Dropping overdrafted blob transactions", "from", addr, "balance", balance, "spent", spent, "drop", nonces, "ids", ids)
|
|
dropOverdraftedMeter.Mark(int64(len(ids)))
|
|
|
|
for _, id := range ids {
|
|
if err := p.store.Delete(id); err != nil {
|
|
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
|
|
}
|
|
}
|
|
}
|
|
// Sanity check that no account can have more queued transactions than the
|
|
// DoS protection threshold.
|
|
if len(txs) > maxTxsPerAccount {
|
|
// Evict the highest nonce transactions until the pending set falls under
|
|
// the account's transaction cap
|
|
var (
|
|
ids []uint64
|
|
nonces []uint64
|
|
)
|
|
for len(txs) > maxTxsPerAccount {
|
|
last := txs[len(txs)-1]
|
|
txs[len(txs)-1] = nil
|
|
txs = txs[:len(txs)-1]
|
|
|
|
ids = append(ids, last.id)
|
|
nonces = append(nonces, last.nonce)
|
|
|
|
p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], last.costCap)
|
|
p.stored -= uint64(last.size)
|
|
delete(p.lookup, last.hash)
|
|
}
|
|
p.index[addr] = txs
|
|
|
|
log.Warn("Dropping overcapped blob transactions", "from", addr, "kept", len(txs), "drop", nonces, "ids", ids)
|
|
dropOvercappedMeter.Mark(int64(len(ids)))
|
|
|
|
for _, id := range ids {
|
|
if err := p.store.Delete(id); err != nil {
|
|
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
|
|
}
|
|
}
|
|
}
|
|
// Included cheap transactions might have left the remaining ones better from
|
|
// an eviction point, fix any potential issues in the heap.
|
|
if _, ok := p.index[addr]; ok && inclusions != nil {
|
|
heap.Fix(p.evict, p.evict.index[addr])
|
|
}
|
|
}
|
|
|
|
// offload removes a tracked blob transaction from the pool and moves it into the
|
|
// limbo for tracking until finality.
|
|
//
|
|
// The method may log errors for various unexpected scenarios but will not return
|
|
// any of it since there's no clear error case. Some errors may be due to coding
|
|
// issues, others caused by signers mining MEV stuff or swapping transactions. In
|
|
// all cases, the pool needs to continue operating.
|
|
func (p *BlobPool) offload(addr common.Address, nonce uint64, id uint64, inclusions map[common.Hash]uint64) {
|
|
data, err := p.store.Get(id)
|
|
if err != nil {
|
|
log.Error("Blobs missing for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
|
|
return
|
|
}
|
|
var tx types.Transaction
|
|
if err = rlp.DecodeBytes(data, &tx); err != nil {
|
|
log.Error("Blobs corrupted for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
|
|
return
|
|
}
|
|
block, ok := inclusions[tx.Hash()]
|
|
if !ok {
|
|
log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", nonce, "id", id)
|
|
return
|
|
}
|
|
if err := p.limbo.push(&tx, block); err != nil {
|
|
log.Warn("Failed to offload blob tx into limbo", "err", err)
|
|
return
|
|
}
|
|
}
|
|
|
|
// Reset implements txpool.SubPool, allowing the blob pool's internal state to be
|
|
// kept in sync with the main transaction pool's internal state.
|
|
func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
|
|
waitStart := time.Now()
|
|
p.lock.Lock()
|
|
resetwaitHist.Update(time.Since(waitStart).Nanoseconds())
|
|
defer p.lock.Unlock()
|
|
|
|
defer func(start time.Time) {
|
|
resettimeHist.Update(time.Since(start).Nanoseconds())
|
|
}(time.Now())
|
|
|
|
statedb, err := p.chain.StateAt(newHead.Root)
|
|
if err != nil {
|
|
log.Error("Failed to reset blobpool state", "err", err)
|
|
return
|
|
}
|
|
p.head = newHead
|
|
p.state = statedb
|
|
|
|
// Run the reorg between the old and new head and figure out which accounts
|
|
// need to be rechecked and which transactions need to be readded
|
|
if reinject, inclusions := p.reorg(oldHead, newHead); reinject != nil {
|
|
var adds []*types.Transaction
|
|
for addr, txs := range reinject {
|
|
// Blindly push all the lost transactions back into the pool
|
|
for _, tx := range txs {
|
|
if err := p.reinject(addr, tx.Hash()); err == nil {
|
|
adds = append(adds, tx.WithoutBlobTxSidecar())
|
|
}
|
|
}
|
|
// Recheck the account's pooled transactions to drop included and
|
|
// invalidated ones
|
|
p.recheck(addr, inclusions)
|
|
}
|
|
if len(adds) > 0 {
|
|
p.insertFeed.Send(core.NewTxsEvent{Txs: adds})
|
|
}
|
|
}
|
|
// Flush out any blobs from limbo that are older than the latest finality
|
|
if p.chain.Config().IsCancun(p.head.Number, p.head.Time) {
|
|
p.limbo.finalize(p.chain.CurrentFinalBlock())
|
|
}
|
|
// Reset the price heap for the new set of basefee/blobfee pairs
|
|
var (
|
|
basefee = uint256.MustFromBig(eip1559.CalcBaseFee(p.chain.Config(), newHead))
|
|
blobfee = uint256.MustFromBig(big.NewInt(params.BlobTxMinBlobGasprice))
|
|
)
|
|
if newHead.ExcessBlobGas != nil {
|
|
blobfee = uint256.MustFromBig(eip4844.CalcBlobFee(*newHead.ExcessBlobGas))
|
|
}
|
|
p.evict.reinit(basefee, blobfee, false)
|
|
|
|
basefeeGauge.Update(int64(basefee.Uint64()))
|
|
blobfeeGauge.Update(int64(blobfee.Uint64()))
|
|
p.updateStorageMetrics()
|
|
}
|
|
|
|
// reorg assembles all the transactors and missing transactions between an old
|
|
// and new head to figure out which account's tx set needs to be rechecked and
|
|
// which transactions need to be requeued.
|
|
//
|
|
// The transactionblock inclusion infos are also returned to allow tracking any
|
|
// just-included blocks by block number in the limbo.
|
|
func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]*types.Transaction, map[common.Hash]uint64) {
|
|
// If the pool was not yet initialized, don't do anything
|
|
if oldHead == nil {
|
|
return nil, nil
|
|
}
|
|
// If the reorg is too deep, avoid doing it (will happen during snap sync)
|
|
oldNum := oldHead.Number.Uint64()
|
|
newNum := newHead.Number.Uint64()
|
|
|
|
if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
|
|
return nil, nil
|
|
}
|
|
// Reorg seems shallow enough to pull in all transactions into memory
|
|
var (
|
|
transactors = make(map[common.Address]struct{})
|
|
discarded = make(map[common.Address][]*types.Transaction)
|
|
included = make(map[common.Address][]*types.Transaction)
|
|
inclusions = make(map[common.Hash]uint64)
|
|
|
|
rem = p.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
|
|
add = p.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
|
|
)
|
|
if add == nil {
|
|
// if the new head is nil, it means that something happened between
|
|
// the firing of newhead-event and _now_: most likely a
|
|
// reorg caused by sync-reversion or explicit sethead back to an
|
|
// earlier block.
|
|
log.Warn("Blobpool reset with missing new head", "number", newHead.Number, "hash", newHead.Hash())
|
|
return nil, nil
|
|
}
|
|
if rem == nil {
|
|
// This can happen if a setHead is performed, where we simply discard
|
|
// the old head from the chain. If that is the case, we don't have the
|
|
// lost transactions anymore, and there's nothing to add.
|
|
if newNum >= oldNum {
|
|
// If we reorged to a same or higher number, then it's not a case
|
|
// of setHead
|
|
log.Warn("Blobpool reset with missing old head",
|
|
"old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
|
|
return nil, nil
|
|
}
|
|
// If the reorg ended up on a lower number, it's indicative of setHead
|
|
// being the cause
|
|
log.Debug("Skipping blobpool reset caused by setHead",
|
|
"old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
|
|
return nil, nil
|
|
}
|
|
// Both old and new blocks exist, traverse through the progression chain
|
|
// and accumulate the transactors and transactions
|
|
for rem.NumberU64() > add.NumberU64() {
|
|
for _, tx := range rem.Transactions() {
|
|
from, _ := p.signer.Sender(tx)
|
|
|
|
discarded[from] = append(discarded[from], tx)
|
|
transactors[from] = struct{}{}
|
|
}
|
|
if rem = p.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
|
|
log.Error("Unrooted old chain seen by blobpool", "block", oldHead.Number, "hash", oldHead.Hash())
|
|
return nil, nil
|
|
}
|
|
}
|
|
for add.NumberU64() > rem.NumberU64() {
|
|
for _, tx := range add.Transactions() {
|
|
from, _ := p.signer.Sender(tx)
|
|
|
|
included[from] = append(included[from], tx)
|
|
inclusions[tx.Hash()] = add.NumberU64()
|
|
transactors[from] = struct{}{}
|
|
}
|
|
if add = p.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
|
|
log.Error("Unrooted new chain seen by blobpool", "block", newHead.Number, "hash", newHead.Hash())
|
|
return nil, nil
|
|
}
|
|
}
|
|
for rem.Hash() != add.Hash() {
|
|
for _, tx := range rem.Transactions() {
|
|
from, _ := p.signer.Sender(tx)
|
|
|
|
discarded[from] = append(discarded[from], tx)
|
|
transactors[from] = struct{}{}
|
|
}
|
|
if rem = p.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
|
|
log.Error("Unrooted old chain seen by blobpool", "block", oldHead.Number, "hash", oldHead.Hash())
|
|
return nil, nil
|
|
}
|
|
for _, tx := range add.Transactions() {
|
|
from, _ := p.signer.Sender(tx)
|
|
|
|
included[from] = append(included[from], tx)
|
|
inclusions[tx.Hash()] = add.NumberU64()
|
|
transactors[from] = struct{}{}
|
|
}
|
|
if add = p.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
|
|
log.Error("Unrooted new chain seen by blobpool", "block", newHead.Number, "hash", newHead.Hash())
|
|
return nil, nil
|
|
}
|
|
}
|
|
// Generate the set of transactions per address to pull back into the pool,
|
|
// also updating the rest along the way
|
|
reinject := make(map[common.Address][]*types.Transaction, len(transactors))
|
|
for addr := range transactors {
|
|
// Generate the set that was lost to reinject into the pool
|
|
lost := make([]*types.Transaction, 0, len(discarded[addr]))
|
|
for _, tx := range types.TxDifference(discarded[addr], included[addr]) {
|
|
if p.Filter(tx) {
|
|
lost = append(lost, tx)
|
|
}
|
|
}
|
|
if len(lost) > 0 {
|
|
reinject[addr] = lost
|
|
}
|
|
|
|
// Update the set that was already reincluded to track the blocks in limbo
|
|
for _, tx := range types.TxDifference(included[addr], discarded[addr]) {
|
|
if p.Filter(tx) {
|
|
p.limbo.update(tx.Hash(), inclusions[tx.Hash()])
|
|
}
|
|
}
|
|
}
|
|
return reinject, inclusions
|
|
}
|
|
|
|
// reinject blindly pushes a transaction previously included in the chain - and
|
|
// just reorged out - into the pool. The transaction is assumed valid (having
|
|
// been in the chain), thus the only validation needed is nonce sorting and over-
|
|
// draft checks after injection.
|
|
//
|
|
// Note, the method will not initialize the eviction cache values as those will
|
|
// be done once for all transactions belonging to an account after all individual
|
|
// transactions are injected back into the pool.
|
|
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
|
|
// Retrieve the associated blob from the limbo. Without the blobs, we cannot
|
|
// add the transaction back into the pool as it is not mineable.
|
|
tx, err := p.limbo.pull(txhash)
|
|
if err != nil {
|
|
log.Error("Blobs unavailable, dropping reorged tx", "err", err)
|
|
return err
|
|
}
|
|
// TODO: seems like an easy optimization here would be getting the serialized tx
|
|
// from limbo instead of re-serializing it here.
|
|
|
|
// Serialize the transaction back into the primary datastore.
|
|
blob, err := rlp.EncodeToBytes(tx)
|
|
if err != nil {
|
|
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
|
|
return err
|
|
}
|
|
id, err := p.store.Put(blob)
|
|
if err != nil {
|
|
log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err)
|
|
return err
|
|
}
|
|
|
|
// Update the indices and metrics
|
|
meta := newBlobTxMeta(id, p.store.Size(id), tx)
|
|
if _, ok := p.index[addr]; !ok {
|
|
if err := p.reserve(addr, true); err != nil {
|
|
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
|
|
return err
|
|
}
|
|
p.index[addr] = []*blobTxMeta{meta}
|
|
p.spent[addr] = meta.costCap
|
|
p.evict.Push(addr)
|
|
} else {
|
|
p.index[addr] = append(p.index[addr], meta)
|
|
p.spent[addr] = new(uint256.Int).Add(p.spent[addr], meta.costCap)
|
|
}
|
|
p.lookup[meta.hash] = meta.id
|
|
p.stored += uint64(meta.size)
|
|
return nil
|
|
}
|
|
|
|
func (p *BlobPool) flushTransactionsBelowTip(tip *uint256.Int) {
|
|
for addr, txs := range p.index {
|
|
for i, tx := range txs {
|
|
if tx.execTipCap.Cmp(tip) < 0 {
|
|
// Drop the offending transaction
|
|
var (
|
|
ids = []uint64{tx.id}
|
|
nonces = []uint64{tx.nonce}
|
|
)
|
|
p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[i].costCap)
|
|
p.stored -= uint64(tx.size)
|
|
delete(p.lookup, tx.hash)
|
|
txs[i] = nil
|
|
|
|
// Drop everything afterwards, no gaps allowed
|
|
for j, tx := range txs[i+1:] {
|
|
ids = append(ids, tx.id)
|
|
nonces = append(nonces, tx.nonce)
|
|
|
|
p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], tx.costCap)
|
|
p.stored -= uint64(tx.size)
|
|
delete(p.lookup, tx.hash)
|
|
txs[i+1+j] = nil
|
|
}
|
|
// Clear out the dropped transactions from the index
|
|
if i > 0 {
|
|
p.index[addr] = txs[:i]
|
|
heap.Fix(p.evict, p.evict.index[addr])
|
|
} else {
|
|
delete(p.index, addr)
|
|
delete(p.spent, addr)
|
|
|
|
heap.Remove(p.evict, p.evict.index[addr])
|
|
p.reserve(addr, false)
|
|
}
|
|
// Clear out the transactions from the data store
|
|
log.Warn("Dropping underpriced blob transaction", "from", addr, "rejected", tx.nonce, "tip", tx.execTipCap, "want", tip, "drop", nonces, "ids", ids)
|
|
dropUnderpricedMeter.Mark(int64(len(ids)))
|
|
|
|
for _, id := range ids {
|
|
if err := p.store.Delete(id); err != nil {
|
|
log.Error("Failed to delete dropped transaction", "id", id, "err", err)
|
|
}
|
|
}
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// SetGasTip implements txpool.SubPool, allowing the blob pool's gas requirements
|
|
// to be kept in sync with the main transaction pool's gas requirements.
|
|
func (p *BlobPool) SetGasTip(tip *big.Int) {
|
|
p.lock.Lock()
|
|
defer p.lock.Unlock()
|
|
|
|
// Store the new minimum gas tip
|
|
old := p.gasTip
|
|
p.gasTip = uint256.MustFromBig(tip)
|
|
|
|
// If the min miner fee increased, remove transactions below the new threshold
|
|
if old == nil || p.gasTip.Cmp(old) > 0 {
|
|
p.flushTransactionsBelowTip(p.gasTip)
|
|
}
|
|
log.Debug("Blobpool tip threshold updated", "tip", tip)
|
|
pooltipGauge.Update(tip.Int64())
|
|
p.updateStorageMetrics()
|
|
}
|
|
|
|
func (p *BlobPool) FlushAllTransactions() {
|
|
maxUint256 := uint256.MustFromBig(new(big.Int).Sub(new(big.Int).Lsh(common.Big1, 256), common.Big1))
|
|
p.lock.Lock()
|
|
defer p.lock.Unlock()
|
|
p.flushTransactionsBelowTip(maxUint256)
|
|
}
|
|
|
|
// validateTx checks whether a transaction is valid according to the consensus
|
|
// rules and adheres to some heuristic limits of the local node (price and size).
|
|
func (p *BlobPool) validateTx(tx *types.Transaction) error {
|
|
// Ensure the transaction adheres to basic pool filters (type, size, tip) and
|
|
// consensus rules
|
|
baseOpts := &txpool.ValidationOptions{
|
|
Config: p.chain.Config(),
|
|
Accept: 1 << types.BlobTxType,
|
|
MaxSize: txMaxSize,
|
|
MinTip: p.gasTip.ToBig(),
|
|
}
|
|
if err := txpool.ValidateTransaction(tx, p.head, p.signer, baseOpts); err != nil {
|
|
return err
|
|
}
|
|
// Ensure the transaction adheres to the stateful pool filters (nonce, balance)
|
|
stateOpts := &txpool.ValidationOptionsWithState{
|
|
State: p.state,
|
|
|
|
FirstNonceGap: func(addr common.Address) uint64 {
|
|
// Nonce gaps are not permitted in the blob pool, the first gap will
|
|
// be the next nonce shifted by however many transactions we already
|
|
// have pooled.
|
|
return p.state.GetNonce(addr) + uint64(len(p.index[addr]))
|
|
},
|
|
UsedAndLeftSlots: func(addr common.Address) (int, int) {
|
|
have := len(p.index[addr])
|
|
if have >= maxTxsPerAccount {
|
|
return have, 0
|
|
}
|
|
return have, maxTxsPerAccount - have
|
|
},
|
|
ExistingExpenditure: func(addr common.Address) *big.Int {
|
|
if spent := p.spent[addr]; spent != nil {
|
|
return spent.ToBig()
|
|
}
|
|
return new(big.Int)
|
|
},
|
|
ExistingCost: func(addr common.Address, nonce uint64) *big.Int {
|
|
next := p.state.GetNonce(addr)
|
|
if uint64(len(p.index[addr])) > nonce-next {
|
|
return p.index[addr][int(nonce-next)].costCap.ToBig()
|
|
}
|
|
return nil
|
|
},
|
|
}
|
|
if err := txpool.ValidateTransactionWithState(tx, p.signer, stateOpts); err != nil {
|
|
return err
|
|
}
|
|
// If the transaction replaces an existing one, ensure that price bumps are
|
|
// adhered to.
|
|
var (
|
|
from, _ = p.signer.Sender(tx) // already validated above
|
|
next = p.state.GetNonce(from)
|
|
)
|
|
if uint64(len(p.index[from])) > tx.Nonce()-next {
|
|
prev := p.index[from][int(tx.Nonce()-next)]
|
|
// Ensure the transaction is different than the one tracked locally
|
|
if prev.hash == tx.Hash() {
|
|
return txpool.ErrAlreadyKnown
|
|
}
|
|
// Account can support the replacement, but the price bump must also be met
|
|
switch {
|
|
case tx.GasFeeCapIntCmp(prev.execFeeCap.ToBig()) <= 0:
|
|
return fmt.Errorf("%w: new tx gas fee cap %v <= %v queued", txpool.ErrReplaceUnderpriced, tx.GasFeeCap(), prev.execFeeCap)
|
|
case tx.GasTipCapIntCmp(prev.execTipCap.ToBig()) <= 0:
|
|
return fmt.Errorf("%w: new tx gas tip cap %v <= %v queued", txpool.ErrReplaceUnderpriced, tx.GasTipCap(), prev.execTipCap)
|
|
case tx.BlobGasFeeCapIntCmp(prev.blobFeeCap.ToBig()) <= 0:
|
|
return fmt.Errorf("%w: new tx blob gas fee cap %v <= %v queued", txpool.ErrReplaceUnderpriced, tx.BlobGasFeeCap(), prev.blobFeeCap)
|
|
}
|
|
var (
|
|
multiplier = uint256.NewInt(100 + p.config.PriceBump)
|
|
onehundred = uint256.NewInt(100)
|
|
|
|
minGasFeeCap = new(uint256.Int).Div(new(uint256.Int).Mul(multiplier, prev.execFeeCap), onehundred)
|
|
minGasTipCap = new(uint256.Int).Div(new(uint256.Int).Mul(multiplier, prev.execTipCap), onehundred)
|
|
minBlobGasFeeCap = new(uint256.Int).Div(new(uint256.Int).Mul(multiplier, prev.blobFeeCap), onehundred)
|
|
)
|
|
switch {
|
|
case tx.GasFeeCapIntCmp(minGasFeeCap.ToBig()) < 0:
|
|
return fmt.Errorf("%w: new tx gas fee cap %v < %v queued + %d%% replacement penalty", txpool.ErrReplaceUnderpriced, tx.GasFeeCap(), prev.execFeeCap, p.config.PriceBump)
|
|
case tx.GasTipCapIntCmp(minGasTipCap.ToBig()) < 0:
|
|
return fmt.Errorf("%w: new tx gas tip cap %v < %v queued + %d%% replacement penalty", txpool.ErrReplaceUnderpriced, tx.GasTipCap(), prev.execTipCap, p.config.PriceBump)
|
|
case tx.BlobGasFeeCapIntCmp(minBlobGasFeeCap.ToBig()) < 0:
|
|
return fmt.Errorf("%w: new tx blob gas fee cap %v < %v queued + %d%% replacement penalty", txpool.ErrReplaceUnderpriced, tx.BlobGasFeeCap(), prev.blobFeeCap, p.config.PriceBump)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Has returns an indicator whether subpool has a transaction cached with the
|
|
// given hash.
|
|
func (p *BlobPool) Has(hash common.Hash) bool {
|
|
p.lock.RLock()
|
|
defer p.lock.RUnlock()
|
|
|
|
_, ok := p.lookup[hash]
|
|
return ok
|
|
}
|
|
|
|
// Get returns a transaction if it is contained in the pool, or nil otherwise.
|
|
func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
|
|
// Track the amount of time waiting to retrieve a fully resolved blob tx from
|
|
// the pool and the amount of time actually spent on pulling the data from disk.
|
|
getStart := time.Now()
|
|
p.lock.RLock()
|
|
getwaitHist.Update(time.Since(getStart).Nanoseconds())
|
|
defer p.lock.RUnlock()
|
|
|
|
defer func(start time.Time) {
|
|
gettimeHist.Update(time.Since(start).Nanoseconds())
|
|
}(time.Now())
|
|
|
|
// Pull the blob from disk and return an assembled response
|
|
id, ok := p.lookup[hash]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
data, err := p.store.Get(id)
|
|
if err != nil {
|
|
log.Error("Tracked blob transaction missing from store", "hash", hash, "id", id, "err", err)
|
|
return nil
|
|
}
|
|
item := new(types.Transaction)
|
|
if err = rlp.DecodeBytes(data, item); err != nil {
|
|
log.Error("Blobs corrupted for traced transaction", "hash", hash, "id", id, "err", err)
|
|
return nil
|
|
}
|
|
return item
|
|
}
|
|
|
|
// Add inserts a set of blob transactions into the pool if they pass validation (both
|
|
// consensus validity and pool restrictions).
|
|
func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
|
|
var (
|
|
adds = make([]*types.Transaction, 0, len(txs))
|
|
errs = make([]error, len(txs))
|
|
)
|
|
for i, tx := range txs {
|
|
errs[i] = p.add(tx)
|
|
if errs[i] == nil {
|
|
adds = append(adds, tx.WithoutBlobTxSidecar())
|
|
}
|
|
}
|
|
if len(adds) > 0 {
|
|
p.discoverFeed.Send(core.NewTxsEvent{Txs: adds})
|
|
p.insertFeed.Send(core.NewTxsEvent{Txs: adds})
|
|
}
|
|
return errs
|
|
}
|
|
|
|
// add inserts a new blob transaction into the pool if it passes validation (both
|
|
// consensus validity and pool restrictions).
|
|
func (p *BlobPool) add(tx *types.Transaction) (err error) {
|
|
// The blob pool blocks on adding a transaction. This is because blob txs are
|
|
// only even pulled from the network, so this method will act as the overload
|
|
// protection for fetches.
|
|
waitStart := time.Now()
|
|
p.lock.Lock()
|
|
addwaitHist.Update(time.Since(waitStart).Nanoseconds())
|
|
defer p.lock.Unlock()
|
|
|
|
defer func(start time.Time) {
|
|
addtimeHist.Update(time.Since(start).Nanoseconds())
|
|
}(time.Now())
|
|
|
|
// Ensure the transaction is valid from all perspectives
|
|
if err := p.validateTx(tx); err != nil {
|
|
log.Trace("Transaction validation failed", "hash", tx.Hash(), "err", err)
|
|
switch {
|
|
case errors.Is(err, txpool.ErrUnderpriced):
|
|
addUnderpricedMeter.Mark(1)
|
|
case errors.Is(err, core.ErrNonceTooLow):
|
|
addStaleMeter.Mark(1)
|
|
case errors.Is(err, core.ErrNonceTooHigh):
|
|
addGappedMeter.Mark(1)
|
|
case errors.Is(err, core.ErrInsufficientFunds):
|
|
addOverdraftedMeter.Mark(1)
|
|
case errors.Is(err, txpool.ErrAccountLimitExceeded):
|
|
addOvercappedMeter.Mark(1)
|
|
case errors.Is(err, txpool.ErrReplaceUnderpriced):
|
|
addNoreplaceMeter.Mark(1)
|
|
default:
|
|
addInvalidMeter.Mark(1)
|
|
}
|
|
return err
|
|
}
|
|
// If the address is not yet known, request exclusivity to track the account
|
|
// only by this subpool until all transactions are evicted
|
|
from, _ := types.Sender(p.signer, tx) // already validated above
|
|
if _, ok := p.index[from]; !ok {
|
|
if err := p.reserve(from, true); err != nil {
|
|
addNonExclusiveMeter.Mark(1)
|
|
return err
|
|
}
|
|
defer func() {
|
|
// If the transaction is rejected by some post-validation check, remove
|
|
// the lock on the reservation set.
|
|
//
|
|
// Note, `err` here is the named error return, which will be initialized
|
|
// by a return statement before running deferred methods. Take care with
|
|
// removing or subscoping err as it will break this clause.
|
|
if err != nil {
|
|
p.reserve(from, false)
|
|
}
|
|
}()
|
|
}
|
|
// Transaction permitted into the pool from a nonce and cost perspective,
|
|
// insert it into the database and update the indices
|
|
blob, err := rlp.EncodeToBytes(tx)
|
|
if err != nil {
|
|
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
|
|
return err
|
|
}
|
|
id, err := p.store.Put(blob)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
meta := newBlobTxMeta(id, p.store.Size(id), tx)
|
|
|
|
var (
|
|
next = p.state.GetNonce(from)
|
|
offset = int(tx.Nonce() - next)
|
|
newacc = false
|
|
)
|
|
var oldEvictionExecFeeJumps, oldEvictionBlobFeeJumps float64
|
|
if txs, ok := p.index[from]; ok {
|
|
oldEvictionExecFeeJumps = txs[len(txs)-1].evictionExecFeeJumps
|
|
oldEvictionBlobFeeJumps = txs[len(txs)-1].evictionBlobFeeJumps
|
|
}
|
|
if len(p.index[from]) > offset {
|
|
// Transaction replaces a previously queued one
|
|
dropReplacedMeter.Mark(1)
|
|
|
|
prev := p.index[from][offset]
|
|
if err := p.store.Delete(prev.id); err != nil {
|
|
// Shitty situation, but try to recover gracefully instead of going boom
|
|
log.Error("Failed to delete replaced transaction", "id", prev.id, "err", err)
|
|
}
|
|
// Update the transaction index
|
|
p.index[from][offset] = meta
|
|
p.spent[from] = new(uint256.Int).Sub(p.spent[from], prev.costCap)
|
|
p.spent[from] = new(uint256.Int).Add(p.spent[from], meta.costCap)
|
|
|
|
delete(p.lookup, prev.hash)
|
|
p.lookup[meta.hash] = meta.id
|
|
p.stored += uint64(meta.size) - uint64(prev.size)
|
|
} else {
|
|
// Transaction extends previously scheduled ones
|
|
p.index[from] = append(p.index[from], meta)
|
|
if _, ok := p.spent[from]; !ok {
|
|
p.spent[from] = new(uint256.Int)
|
|
newacc = true
|
|
}
|
|
p.spent[from] = new(uint256.Int).Add(p.spent[from], meta.costCap)
|
|
p.lookup[meta.hash] = meta.id
|
|
p.stored += uint64(meta.size)
|
|
}
|
|
// Recompute the rolling eviction fields. In case of a replacement, this will
|
|
// recompute all subsequent fields. In case of an append, this will only do
|
|
// the fresh calculation.
|
|
txs := p.index[from]
|
|
|
|
for i := offset; i < len(txs); i++ {
|
|
// The first transaction will always use itself
|
|
if i == 0 {
|
|
txs[0].evictionExecTip = txs[0].execTipCap
|
|
txs[0].evictionExecFeeJumps = txs[0].basefeeJumps
|
|
txs[0].evictionBlobFeeJumps = txs[0].blobfeeJumps
|
|
|
|
continue
|
|
}
|
|
// Subsequent transactions will use a rolling calculation
|
|
txs[i].evictionExecTip = txs[i-1].evictionExecTip
|
|
if txs[i].evictionExecTip.Cmp(txs[i].execTipCap) > 0 {
|
|
txs[i].evictionExecTip = txs[i].execTipCap
|
|
}
|
|
txs[i].evictionExecFeeJumps = txs[i-1].evictionExecFeeJumps
|
|
if txs[i].evictionExecFeeJumps > txs[i].basefeeJumps {
|
|
txs[i].evictionExecFeeJumps = txs[i].basefeeJumps
|
|
}
|
|
txs[i].evictionBlobFeeJumps = txs[i-1].evictionBlobFeeJumps
|
|
if txs[i].evictionBlobFeeJumps > txs[i].blobfeeJumps {
|
|
txs[i].evictionBlobFeeJumps = txs[i].blobfeeJumps
|
|
}
|
|
}
|
|
// Update the eviction heap with the new information:
|
|
// - If the transaction is from a new account, add it to the heap
|
|
// - If the account had a singleton tx replaced, update the heap (new price caps)
|
|
// - If the account has a transaction replaced or appended, update the heap if significantly changed
|
|
switch {
|
|
case newacc:
|
|
heap.Push(p.evict, from)
|
|
|
|
case len(txs) == 1: // 1 tx and not a new acc, must be replacement
|
|
heap.Fix(p.evict, p.evict.index[from])
|
|
|
|
default: // replacement or new append
|
|
evictionExecFeeDiff := oldEvictionExecFeeJumps - txs[len(txs)-1].evictionExecFeeJumps
|
|
evictionBlobFeeDiff := oldEvictionBlobFeeJumps - txs[len(txs)-1].evictionBlobFeeJumps
|
|
|
|
if math.Abs(evictionExecFeeDiff) > 0.001 || math.Abs(evictionBlobFeeDiff) > 0.001 { // need math.Abs, can go up and down
|
|
heap.Fix(p.evict, p.evict.index[from])
|
|
}
|
|
}
|
|
// If the pool went over the allowed data limit, evict transactions until
|
|
// we're again below the threshold
|
|
for p.stored > p.config.Datacap {
|
|
p.drop()
|
|
}
|
|
p.updateStorageMetrics()
|
|
|
|
addValidMeter.Mark(1)
|
|
return nil
|
|
}
|
|
|
|
// drop removes the worst transaction from the pool. It is primarily used when a
|
|
// freshly added transaction overflows the pool and needs to evict something. The
|
|
// method is also called on startup if the user resizes their storage, might be an
|
|
// expensive run but it should be fine-ish.
|
|
func (p *BlobPool) drop() {
|
|
// Peek at the account with the worse transaction set to evict from (Go's heap
|
|
// stores the minimum at index zero of the heap slice) and retrieve it's last
|
|
// transaction.
|
|
var (
|
|
from = p.evict.addrs[0] // cannot call drop on empty pool
|
|
|
|
txs = p.index[from]
|
|
drop = txs[len(txs)-1]
|
|
last = len(txs) == 1
|
|
)
|
|
// Remove the transaction from the pool's index
|
|
if last {
|
|
delete(p.index, from)
|
|
delete(p.spent, from)
|
|
p.reserve(from, false)
|
|
} else {
|
|
txs[len(txs)-1] = nil
|
|
txs = txs[:len(txs)-1]
|
|
|
|
p.index[from] = txs
|
|
p.spent[from] = new(uint256.Int).Sub(p.spent[from], drop.costCap)
|
|
}
|
|
p.stored -= uint64(drop.size)
|
|
delete(p.lookup, drop.hash)
|
|
|
|
// Remove the transaction from the pool's eviction heap:
|
|
// - If the entire account was dropped, pop off the address
|
|
// - Otherwise, if the new tail has better eviction caps, fix the heap
|
|
if last {
|
|
heap.Pop(p.evict)
|
|
} else {
|
|
tail := txs[len(txs)-1] // new tail, surely exists
|
|
|
|
evictionExecFeeDiff := tail.evictionExecFeeJumps - drop.evictionExecFeeJumps
|
|
evictionBlobFeeDiff := tail.evictionBlobFeeJumps - drop.evictionBlobFeeJumps
|
|
|
|
if evictionExecFeeDiff > 0.001 || evictionBlobFeeDiff > 0.001 { // no need for math.Abs, monotonic decreasing
|
|
heap.Fix(p.evict, 0)
|
|
}
|
|
}
|
|
// Remove the transaction from the data store
|
|
log.Debug("Evicting overflown blob transaction", "from", from, "evicted", drop.nonce, "id", drop.id)
|
|
dropOverflownMeter.Mark(1)
|
|
|
|
if err := p.store.Delete(drop.id); err != nil {
|
|
log.Error("Failed to drop evicted transaction", "id", drop.id, "err", err)
|
|
}
|
|
}
|
|
|
|
// Pending retrieves all currently processable transactions, grouped by origin
|
|
// account and sorted by nonce.
|
|
//
|
|
// The transactions can also be pre-filtered by the dynamic fee components to
|
|
// reduce allocations and load on downstream subsystems.
|
|
func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
|
|
// If only plain transactions are requested, this pool is unsuitable as it
|
|
// contains none, don't even bother.
|
|
if filter.OnlyPlainTxs {
|
|
return nil
|
|
}
|
|
// Track the amount of time waiting to retrieve the list of pending blob txs
|
|
// from the pool and the amount of time actually spent on assembling the data.
|
|
// The latter will be pretty much moot, but we've kept it to have symmetric
|
|
// across all user operations.
|
|
pendStart := time.Now()
|
|
p.lock.RLock()
|
|
pendwaitHist.Update(time.Since(pendStart).Nanoseconds())
|
|
defer p.lock.RUnlock()
|
|
|
|
execStart := time.Now()
|
|
defer func() {
|
|
pendtimeHist.Update(time.Since(execStart).Nanoseconds())
|
|
}()
|
|
|
|
pending := make(map[common.Address][]*txpool.LazyTransaction, len(p.index))
|
|
for addr, txs := range p.index {
|
|
lazies := make([]*txpool.LazyTransaction, 0, len(txs))
|
|
for _, tx := range txs {
|
|
// If transaction filtering was requested, discard badly priced ones
|
|
if filter.MinTip != nil && filter.BaseFee != nil {
|
|
if tx.execFeeCap.Lt(filter.BaseFee) {
|
|
break // basefee too low, cannot be included, discard rest of txs from the account
|
|
}
|
|
tip := new(uint256.Int).Sub(tx.execFeeCap, filter.BaseFee)
|
|
if tip.Gt(tx.execTipCap) {
|
|
tip = tx.execTipCap
|
|
}
|
|
if tip.Lt(filter.MinTip) {
|
|
break // allowed or remaining tip too low, cannot be included, discard rest of txs from the account
|
|
}
|
|
}
|
|
if filter.BlobFee != nil {
|
|
if tx.blobFeeCap.Lt(filter.BlobFee) {
|
|
break // blobfee too low, cannot be included, discard rest of txs from the account
|
|
}
|
|
}
|
|
// Transaction was accepted according to the filter, append to the pending list
|
|
lazies = append(lazies, &txpool.LazyTransaction{
|
|
Pool: p,
|
|
Hash: tx.hash,
|
|
Time: execStart, // TODO(karalabe): Maybe save these and use that?
|
|
GasFeeCap: tx.execFeeCap,
|
|
GasTipCap: tx.execTipCap,
|
|
Gas: tx.execGas,
|
|
BlobGas: tx.blobGas,
|
|
})
|
|
}
|
|
if len(lazies) > 0 {
|
|
pending[addr] = lazies
|
|
}
|
|
}
|
|
return pending
|
|
}
|
|
|
|
// updateStorageMetrics retrieves a bunch of stats from the data store and pushes
|
|
// them out as metrics.
|
|
func (p *BlobPool) updateStorageMetrics() {
|
|
stats := p.store.Infos()
|
|
|
|
var (
|
|
dataused uint64
|
|
datareal uint64
|
|
slotused uint64
|
|
|
|
oversizedDataused uint64
|
|
oversizedDatagaps uint64
|
|
oversizedSlotused uint64
|
|
oversizedSlotgaps uint64
|
|
)
|
|
for _, shelf := range stats.Shelves {
|
|
slotDataused := shelf.FilledSlots * uint64(shelf.SlotSize)
|
|
slotDatagaps := shelf.GappedSlots * uint64(shelf.SlotSize)
|
|
|
|
dataused += slotDataused
|
|
datareal += slotDataused + slotDatagaps
|
|
slotused += shelf.FilledSlots
|
|
|
|
metrics.GetOrRegisterGauge(fmt.Sprintf(shelfDatausedGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(slotDataused))
|
|
metrics.GetOrRegisterGauge(fmt.Sprintf(shelfDatagapsGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(slotDatagaps))
|
|
metrics.GetOrRegisterGauge(fmt.Sprintf(shelfSlotusedGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(shelf.FilledSlots))
|
|
metrics.GetOrRegisterGauge(fmt.Sprintf(shelfSlotgapsGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(shelf.GappedSlots))
|
|
|
|
if shelf.SlotSize/blobSize > maxBlobsPerTransaction {
|
|
oversizedDataused += slotDataused
|
|
oversizedDatagaps += slotDatagaps
|
|
oversizedSlotused += shelf.FilledSlots
|
|
oversizedSlotgaps += shelf.GappedSlots
|
|
}
|
|
}
|
|
datausedGauge.Update(int64(dataused))
|
|
datarealGauge.Update(int64(datareal))
|
|
slotusedGauge.Update(int64(slotused))
|
|
|
|
oversizedDatausedGauge.Update(int64(oversizedDataused))
|
|
oversizedDatagapsGauge.Update(int64(oversizedDatagaps))
|
|
oversizedSlotusedGauge.Update(int64(oversizedSlotused))
|
|
oversizedSlotgapsGauge.Update(int64(oversizedSlotgaps))
|
|
|
|
p.updateLimboMetrics()
|
|
}
|
|
|
|
// updateLimboMetrics retrieves a bunch of stats from the limbo store and pushes
|
|
// them out as metrics.
|
|
func (p *BlobPool) updateLimboMetrics() {
|
|
stats := p.limbo.store.Infos()
|
|
|
|
var (
|
|
dataused uint64
|
|
datareal uint64
|
|
slotused uint64
|
|
)
|
|
for _, shelf := range stats.Shelves {
|
|
slotDataused := shelf.FilledSlots * uint64(shelf.SlotSize)
|
|
slotDatagaps := shelf.GappedSlots * uint64(shelf.SlotSize)
|
|
|
|
dataused += slotDataused
|
|
datareal += slotDataused + slotDatagaps
|
|
slotused += shelf.FilledSlots
|
|
|
|
metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfDatausedGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(slotDataused))
|
|
metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfDatagapsGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(slotDatagaps))
|
|
metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfSlotusedGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(shelf.FilledSlots))
|
|
metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfSlotgapsGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(shelf.GappedSlots))
|
|
}
|
|
limboDatausedGauge.Update(int64(dataused))
|
|
limboDatarealGauge.Update(int64(datareal))
|
|
limboSlotusedGauge.Update(int64(slotused))
|
|
}
|
|
|
|
// SubscribeTransactions registers a subscription for new transaction events,
|
|
// supporting feeding only newly seen or also resurrected transactions.
|
|
func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
|
|
if reorgs {
|
|
return p.insertFeed.Subscribe(ch)
|
|
} else {
|
|
return p.discoverFeed.Subscribe(ch)
|
|
}
|
|
}
|
|
|
|
// Nonce returns the next nonce of an account, with all transactions executable
|
|
// by the pool already applied on top.
|
|
func (p *BlobPool) Nonce(addr common.Address) uint64 {
|
|
// We need a write lock here, since state.GetNonce might write the cache.
|
|
p.lock.Lock()
|
|
defer p.lock.Unlock()
|
|
|
|
if txs, ok := p.index[addr]; ok {
|
|
return txs[len(txs)-1].nonce + 1
|
|
}
|
|
return p.state.GetNonce(addr)
|
|
}
|
|
|
|
// Stats retrieves the current pool stats, namely the number of pending and the
|
|
// number of queued (non-executable) transactions.
|
|
func (p *BlobPool) Stats() (int, int) {
|
|
p.lock.RLock()
|
|
defer p.lock.RUnlock()
|
|
|
|
var pending int
|
|
for _, txs := range p.index {
|
|
pending += len(txs)
|
|
}
|
|
return pending, 0 // No non-executable txs in the blob pool
|
|
}
|
|
|
|
// Content retrieves the data content of the transaction pool, returning all the
|
|
// pending as well as queued transactions, grouped by account and sorted by nonce.
|
|
//
|
|
// For the blob pool, this method will return nothing for now.
|
|
// TODO(karalabe): Abstract out the returned metadata.
|
|
func (p *BlobPool) Content() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) {
|
|
return make(map[common.Address][]*types.Transaction), make(map[common.Address][]*types.Transaction)
|
|
}
|
|
|
|
// ContentFrom retrieves the data content of the transaction pool, returning the
|
|
// pending as well as queued transactions of this address, grouped by nonce.
|
|
//
|
|
// For the blob pool, this method will return nothing for now.
|
|
// TODO(karalabe): Abstract out the returned metadata.
|
|
func (p *BlobPool) ContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) {
|
|
return []*types.Transaction{}, []*types.Transaction{}
|
|
}
|
|
|
|
// Locals retrieves the accounts currently considered local by the pool.
|
|
//
|
|
// There is no notion of local accounts in the blob pool.
|
|
func (p *BlobPool) Locals() []common.Address {
|
|
return []common.Address{}
|
|
}
|
|
|
|
// Status returns the known status (unknown/pending/queued) of a transaction
|
|
// identified by their hashes.
|
|
func (p *BlobPool) Status(hash common.Hash) txpool.TxStatus {
|
|
if p.Has(hash) {
|
|
return txpool.TxStatusPending
|
|
}
|
|
return txpool.TxStatusUnknown
|
|
}
|