Compare commits
4 Commits
master
...
no-discard
Author | SHA1 | Date | |
---|---|---|---|
|
405c236884 | ||
|
6778d297b5 | ||
|
eec121633c | ||
|
4b262b78f0 |
@ -55,6 +55,8 @@ const (
|
||||
|
||||
// txReannoMaxNum is the maximum number of transactions a reannounce action can include.
|
||||
txReannoMaxNum = 1024
|
||||
|
||||
maxBufferSize = 1000 // maximum size of tx buffer
|
||||
)
|
||||
|
||||
var (
|
||||
@ -244,6 +246,10 @@ type LegacyPool struct {
|
||||
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)
|
||||
|
||||
changesSinceReorg int // A counter for how many drops we've performed in-between reorg.
|
||||
|
||||
// A buffer to store transactions that would otherwise be discarded
|
||||
buffer []*types.Transaction
|
||||
bufferLock sync.Mutex
|
||||
}
|
||||
|
||||
type txpoolResetRequest struct {
|
||||
@ -355,11 +361,13 @@ func (pool *LegacyPool) loop() {
|
||||
evict = time.NewTicker(evictionInterval)
|
||||
reannounce = time.NewTicker(reannounceInterval)
|
||||
journal = time.NewTicker(pool.config.Rejournal)
|
||||
readd = time.NewTicker(time.Minute) // ticker to re-add buffered transactions periodically
|
||||
)
|
||||
defer report.Stop()
|
||||
defer evict.Stop()
|
||||
defer reannounce.Stop()
|
||||
defer journal.Stop()
|
||||
defer readd.Stop() // Stop the ticker when the loop exits
|
||||
|
||||
// Notify tests that the init phase is done
|
||||
close(pool.initDoneCh)
|
||||
@ -436,6 +444,9 @@ func (pool *LegacyPool) loop() {
|
||||
}
|
||||
pool.mu.Unlock()
|
||||
}
|
||||
// Handle re-adding buffered transactions
|
||||
case <-readd.C:
|
||||
pool.readdBufferedTransactions()
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -781,12 +792,21 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
|
||||
}
|
||||
}()
|
||||
}
|
||||
// If the transaction pool is full, discard underpriced transactions
|
||||
// If the transaction pool is full, buffer underpriced transactions
|
||||
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
|
||||
// If the new transaction is underpriced, don't accept it
|
||||
// If the new transaction is underpriced, buffer it
|
||||
if !isLocal && pool.priced.Underpriced(tx) {
|
||||
log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
|
||||
log.Trace("Buffering underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
|
||||
underpricedTxMeter.Mark(1)
|
||||
|
||||
pool.bufferLock.Lock()
|
||||
if len(pool.buffer) < maxBufferSize {
|
||||
pool.buffer = append(pool.buffer, tx)
|
||||
} else {
|
||||
log.Warn("Buffer is full, discarding transaction", "hash", hash)
|
||||
}
|
||||
pool.bufferLock.Unlock()
|
||||
|
||||
return false, txpool.ErrUnderpriced
|
||||
}
|
||||
|
||||
@ -804,6 +824,16 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
|
||||
// Otherwise if we can't make enough room for new one, abort the operation.
|
||||
drop, success := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), isLocal)
|
||||
|
||||
// Add dropped transactions to the buffer
|
||||
pool.bufferLock.Lock()
|
||||
availableSpace := maxBufferSize - len(pool.buffer)
|
||||
// Determine how many elements to take from drop
|
||||
if availableSpace > len(drop) {
|
||||
availableSpace = len(drop)
|
||||
}
|
||||
pool.buffer = append(pool.buffer, drop[:availableSpace]...)
|
||||
pool.bufferLock.Unlock()
|
||||
|
||||
// Special case, we still can't make the room for the new remote one.
|
||||
if !isLocal && !success {
|
||||
log.Trace("Discarding overflown transaction", "hash", hash)
|
||||
@ -1779,6 +1809,51 @@ func (pool *LegacyPool) SetMaxGas(maxGas uint64) {
|
||||
pool.maxGas.Store(maxGas)
|
||||
}
|
||||
|
||||
func (pool *LegacyPool) readdBufferedTransactions() {
|
||||
pool.mu.Lock()
|
||||
defer pool.mu.Unlock()
|
||||
|
||||
// Check if there is space in the pool
|
||||
if uint64(pool.all.Slots()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
|
||||
return // No space available, skip re-adding
|
||||
}
|
||||
|
||||
var readded []*types.Transaction
|
||||
|
||||
pool.bufferLock.Lock()
|
||||
for _, tx := range pool.buffer {
|
||||
// Check if adding this transaction will exceed the pool capacity
|
||||
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
|
||||
break // Stop if adding the transaction will exceed the pool capacity
|
||||
}
|
||||
|
||||
if _, err := pool.add(tx, false); err == nil {
|
||||
readded = append(readded, tx)
|
||||
}
|
||||
}
|
||||
pool.bufferLock.Unlock()
|
||||
|
||||
// Remove successfully re-added transactions from the buffer
|
||||
if len(readded) > 0 {
|
||||
remaining := pool.buffer[:0]
|
||||
for _, tx := range pool.buffer {
|
||||
if !containsTransaction(readded, tx) {
|
||||
remaining = append(remaining, tx)
|
||||
}
|
||||
}
|
||||
pool.buffer = remaining
|
||||
}
|
||||
}
|
||||
|
||||
func containsTransaction(txs []*types.Transaction, tx *types.Transaction) bool {
|
||||
for _, t := range txs {
|
||||
if t.Hash() == tx.Hash() {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// addressByHeartbeat is an account address tagged with its last activity timestamp.
|
||||
type addressByHeartbeat struct {
|
||||
address common.Address
|
||||
|
Loading…
Reference in New Issue
Block a user