Compare commits
4 Commits
develop
...
no-discard
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
405c236884 | ||
|
|
6778d297b5 | ||
|
|
eec121633c | ||
|
|
4b262b78f0 |
@@ -55,6 +55,8 @@ const (
|
|||||||
|
|
||||||
// txReannoMaxNum is the maximum number of transactions a reannounce action can include.
|
// txReannoMaxNum is the maximum number of transactions a reannounce action can include.
|
||||||
txReannoMaxNum = 1024
|
txReannoMaxNum = 1024
|
||||||
|
|
||||||
|
maxBufferSize = 1000 // maximum size of tx buffer
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -244,6 +246,10 @@ type LegacyPool struct {
|
|||||||
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)
|
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)
|
||||||
|
|
||||||
changesSinceReorg int // A counter for how many drops we've performed in-between reorg.
|
changesSinceReorg int // A counter for how many drops we've performed in-between reorg.
|
||||||
|
|
||||||
|
// A buffer to store transactions that would otherwise be discarded
|
||||||
|
buffer []*types.Transaction
|
||||||
|
bufferLock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
type txpoolResetRequest struct {
|
type txpoolResetRequest struct {
|
||||||
@@ -355,11 +361,13 @@ func (pool *LegacyPool) loop() {
|
|||||||
evict = time.NewTicker(evictionInterval)
|
evict = time.NewTicker(evictionInterval)
|
||||||
reannounce = time.NewTicker(reannounceInterval)
|
reannounce = time.NewTicker(reannounceInterval)
|
||||||
journal = time.NewTicker(pool.config.Rejournal)
|
journal = time.NewTicker(pool.config.Rejournal)
|
||||||
|
readd = time.NewTicker(time.Minute) // ticker to re-add buffered transactions periodically
|
||||||
)
|
)
|
||||||
defer report.Stop()
|
defer report.Stop()
|
||||||
defer evict.Stop()
|
defer evict.Stop()
|
||||||
defer reannounce.Stop()
|
defer reannounce.Stop()
|
||||||
defer journal.Stop()
|
defer journal.Stop()
|
||||||
|
defer readd.Stop() // Stop the ticker when the loop exits
|
||||||
|
|
||||||
// Notify tests that the init phase is done
|
// Notify tests that the init phase is done
|
||||||
close(pool.initDoneCh)
|
close(pool.initDoneCh)
|
||||||
@@ -436,6 +444,9 @@ func (pool *LegacyPool) loop() {
|
|||||||
}
|
}
|
||||||
pool.mu.Unlock()
|
pool.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
// Handle re-adding buffered transactions
|
||||||
|
case <-readd.C:
|
||||||
|
pool.readdBufferedTransactions()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -781,12 +792,21 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
// If the transaction pool is full, discard underpriced transactions
|
// If the transaction pool is full, buffer underpriced transactions
|
||||||
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
|
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
|
||||||
// If the new transaction is underpriced, don't accept it
|
// If the new transaction is underpriced, buffer it
|
||||||
if !isLocal && pool.priced.Underpriced(tx) {
|
if !isLocal && pool.priced.Underpriced(tx) {
|
||||||
log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
|
log.Trace("Buffering underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
|
||||||
underpricedTxMeter.Mark(1)
|
underpricedTxMeter.Mark(1)
|
||||||
|
|
||||||
|
pool.bufferLock.Lock()
|
||||||
|
if len(pool.buffer) < maxBufferSize {
|
||||||
|
pool.buffer = append(pool.buffer, tx)
|
||||||
|
} else {
|
||||||
|
log.Warn("Buffer is full, discarding transaction", "hash", hash)
|
||||||
|
}
|
||||||
|
pool.bufferLock.Unlock()
|
||||||
|
|
||||||
return false, txpool.ErrUnderpriced
|
return false, txpool.ErrUnderpriced
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -804,6 +824,16 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
|
|||||||
// Otherwise if we can't make enough room for new one, abort the operation.
|
// Otherwise if we can't make enough room for new one, abort the operation.
|
||||||
drop, success := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), isLocal)
|
drop, success := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), isLocal)
|
||||||
|
|
||||||
|
// Add dropped transactions to the buffer
|
||||||
|
pool.bufferLock.Lock()
|
||||||
|
availableSpace := maxBufferSize - len(pool.buffer)
|
||||||
|
// Determine how many elements to take from drop
|
||||||
|
if availableSpace > len(drop) {
|
||||||
|
availableSpace = len(drop)
|
||||||
|
}
|
||||||
|
pool.buffer = append(pool.buffer, drop[:availableSpace]...)
|
||||||
|
pool.bufferLock.Unlock()
|
||||||
|
|
||||||
// Special case, we still can't make the room for the new remote one.
|
// Special case, we still can't make the room for the new remote one.
|
||||||
if !isLocal && !success {
|
if !isLocal && !success {
|
||||||
log.Trace("Discarding overflown transaction", "hash", hash)
|
log.Trace("Discarding overflown transaction", "hash", hash)
|
||||||
@@ -1779,6 +1809,51 @@ func (pool *LegacyPool) SetMaxGas(maxGas uint64) {
|
|||||||
pool.maxGas.Store(maxGas)
|
pool.maxGas.Store(maxGas)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (pool *LegacyPool) readdBufferedTransactions() {
|
||||||
|
pool.mu.Lock()
|
||||||
|
defer pool.mu.Unlock()
|
||||||
|
|
||||||
|
// Check if there is space in the pool
|
||||||
|
if uint64(pool.all.Slots()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
|
||||||
|
return // No space available, skip re-adding
|
||||||
|
}
|
||||||
|
|
||||||
|
var readded []*types.Transaction
|
||||||
|
|
||||||
|
pool.bufferLock.Lock()
|
||||||
|
for _, tx := range pool.buffer {
|
||||||
|
// Check if adding this transaction will exceed the pool capacity
|
||||||
|
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
|
||||||
|
break // Stop if adding the transaction will exceed the pool capacity
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := pool.add(tx, false); err == nil {
|
||||||
|
readded = append(readded, tx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pool.bufferLock.Unlock()
|
||||||
|
|
||||||
|
// Remove successfully re-added transactions from the buffer
|
||||||
|
if len(readded) > 0 {
|
||||||
|
remaining := pool.buffer[:0]
|
||||||
|
for _, tx := range pool.buffer {
|
||||||
|
if !containsTransaction(readded, tx) {
|
||||||
|
remaining = append(remaining, tx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pool.buffer = remaining
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func containsTransaction(txs []*types.Transaction, tx *types.Transaction) bool {
|
||||||
|
for _, t := range txs {
|
||||||
|
if t.Hash() == tx.Hash() {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// addressByHeartbeat is an account address tagged with its last activity timestamp.
|
// addressByHeartbeat is an account address tagged with its last activity timestamp.
|
||||||
type addressByHeartbeat struct {
|
type addressByHeartbeat struct {
|
||||||
address common.Address
|
address common.Address
|
||||||
|
|||||||
Reference in New Issue
Block a user