pool: add static info and simplify transfer

This commit is contained in:
emailtovamos 2024-08-01 14:36:47 +01:00
parent e30248be3a
commit f80ac01c95
2 changed files with 94 additions and 48 deletions

@ -56,7 +56,7 @@ const (
// txReannoMaxNum is the maximum number of transactions a reannounce action can include.
txReannoMaxNum = 1024
pool2Size = 10000 // todo might have to set it in config
pool2Size = 10000 // todo might have to set it in config // This is in slots and not in no of transactions
pool3Size = 50000
)
@ -894,7 +894,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
// Try to replace an existing transaction in the pending pool
if list := pool.pending[from]; list != nil && list.Contains(tx.Nonce()) {
// Nonce already pending, check if required price bump is met
inserted, old := list.Add(tx, pool.config.PriceBump)
inserted, old := list.Add(tx, pool.config.PriceBump, includePool2)
if !inserted {
addedToAnyPool, err := pool.addToPool2OrPool3(tx, from, false, includePool2, includePool3)
//pendingDiscardMeter.Mark(1) // todo do we need to mark the meter here?
@ -941,6 +941,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
return replaced, nil
}
// addToPool2OrPool3 adds a transaction to pool1 or pool2 or pool3 depending on which one is asked for
func (pool *LegacyPool) addToPool2OrPool3(tx *types.Transaction, from common.Address, pool1, pool2, pool3 bool) (bool, error) {
if pool1 {
// todo (check) logic for pool1 related
@ -1008,7 +1009,7 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local
if pool.queue[from] == nil {
pool.queue[from] = newList(false)
}
inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump, false)
if !inserted {
// An older transaction was better, discard this
queuedDiscardMeter.Mark(1)
@ -1062,7 +1063,7 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ
}
list := pool.pending[addr]
inserted, old := list.Add(tx, pool.config.PriceBump)
inserted, old := list.Add(tx, pool.config.PriceBump, false) // todo check and confirm
if !inserted {
// An older transaction was better, discard this
pool.all.Remove(hash)
@ -1511,18 +1512,6 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
if len(nonStaticTxs) > 0 {
pool.txFeed.Send(core.NewTxsEvent{Txs: nonStaticTxs, Static: false})
}
//for _, set := range events {
// // todo problem is that here all transactions are sent at once. Maybe send them in series? But not sure how
// // performance will be affected
// // todo Maybe do the Send() only twice (instead of all events):
// // 1. For Static
// // 2. For non-Static
// pool.txFeed.Send(core.NewTxsEvent{Txs: set.Flatten(), Static: set.staticOnly})
// // todo what if pool1 and pool2 are full at this point. Should we include it to pool3?? Probably NO
// //txs = append(txs, set.Flatten()...)
//}
//pool.txFeed.Send(core.NewTxsEvent{Txs: txs})
}
}
@ -1707,12 +1696,14 @@ func (pool *LegacyPool) truncatePending() {
return
}
// todo maybe check here if the extra numbers are within pool1 and (pool1+pool2) and in that case
// we can only broadcast to static peers.
pool1Size := pool.config.GlobalSlots
if (pending > pool1Size) && pending < (pool1Size+pool2Size) {
}
//var addToPool2OrPool3 bool
//
//// todo maybe check here if the extra numbers are within pool1 and (pool1+pool2) and in that case
//// we can only broadcast to static peers.
//pool1Size := pool.config.GlobalSlots
//if (pending > pool1Size) && pending < (pool1Size+pool2Size) {
// addToPool2OrPool3 = true
//}
pendingBeforeCap := pending
// Assemble a spam order to penalize large transactors first
@ -2169,7 +2160,7 @@ func (pool *LegacyPool) enqueueTxToCriticalPath(hash common.Hash, tx *types.Tran
if pool.criticalPathPool[from] == nil {
pool.criticalPathPool[from] = newList(false)
}
inserted, old := pool.criticalPathPool[from].Add(tx, pool.config.PriceBump)
inserted, old := pool.criticalPathPool[from].Add(tx, pool.config.PriceBump, false)
if !inserted {
queuedDiscardMeter.Mark(1)
return false, txpool.ErrReplaceUnderpriced
@ -2246,34 +2237,89 @@ func (pool *LegacyPool) startPeriodicTransfer() {
}()
}
// transferTransactions mainly moves from pool 3 to pool 2
func (pool *LegacyPool) transferTransactions() {
// todo (incomplete) here we check all the transfer logic. So
// if len(pool12) > maxPool1Size && len(pool12) < (maxPool1Size + maxPool2Size) then only broadcast to static peers.
// if len(pool12) > (maxPool1Size + maxPool2Size) then add a new transaction to pool3 only
maxPool1Size := pool.config.GlobalSlots + pool.config.GlobalQueue
maxPool1Pool2CombinedSize := maxPool1Size + pool2Size
extraSizePool2Pool1 := uint64(len(pool.pending)) + uint64(len(pool.queue)) - pool2Size - maxPool1Size
if extraSizePool2Pool1 <= 0 {
return
}
// Transfer transactions from LocalBufferPool (Pool 3) to CriticalPathPool (Pool 2)
if len(pool.criticalPathPool) < pool2Size {
txs := pool.localBufferPool.Flush(pool2Size - len(pool.criticalPathPool))
for _, tx := range txs {
_, err := pool.enqueueTxToCriticalPath(tx.Hash(), tx, false, true)
if err != nil {
log.Error("Failed to transfer transaction from Pool 3 to Pool 2", "err", err)
}
currentPool1Pool2Size := pool.all.Slots()
canTransferPool3ToPool2 := maxPool1Pool2CombinedSize > uint64(currentPool1Pool2Size)
if !canTransferPool3ToPool2 {
return
}
extraSlots := maxPool1Pool2CombinedSize - uint64(currentPool1Pool2Size)
extraTransactions := extraSlots / 4 // Since maximum slots per transaction is 4
// So now we can take out extraTransactions number of transactions from pool3 and put in pool2
if extraTransactions < 1 {
return
}
tx := pool.localBufferPool.Flush(int(extraTransactions))
if len(tx) == 0 {
return
}
for _, transaction := range tx {
if !(uint64(pool.all.Slots()) < maxPool1Pool2CombinedSize) {
break
}
from, _ := types.Sender(pool.signer, transaction)
// use addToPool2OrPool3() function to transfer from pool3 to pool2
_, err := pool.addToPool2OrPool3(transaction, from, false, true, false)
if err != nil {
return
}
}
// Transfer transactions from CriticalPathPool (Pool 2) to LegacyPool (Pool 1)
if uint64(len(pool.pending)) < maxPool1Size {
for addr, list := range pool.criticalPathPool {
for _, tx := range list.Flatten() {
_, err := pool.enqueueTx(tx.Hash(), tx, false, true)
if err != nil {
log.Error("Failed to transfer transaction from Pool 2 to Pool 1", "err", err)
}
}
delete(pool.criticalPathPool, addr)
}
}
// // todo logic: If len(pending + queue) or len(all) < (pool1Max+pool2Max) then we have room to put pool3 into pool1/pool2
// // In the above case, transfer from pool3 to pool2. Number of transferred transactions = (pool1Max+pool2Max) - len(all)
// // Pool3 to only pool2 can be transferred. So obviously those transactions are static true.
// // Pool2 to pool1 is unclear
//
// // todo flush from pool3 and if it never gets added to anything then add it back
// // otherwise determine exactly how many transctions we can accommodate and then flush accordingly
// //for _, transaction := range tx {
// txPoolSizeBeforeCurrentTx := uint64(pool.all.Slots())
// // todo keep extracting from pool3 until txPoolSizeBeforeCurrentTx + numSlots(extractedTransactions) >= (maxPool1Size + maxPool2Size)
// numSlotsExtractedTransactions := 0
// for txPoolSizeBeforeCurrentTx+uint64(numSlotsExtractedTransactions) < (maxPool1Size + uint64(pool2Size)) {
// tx := pool.localBufferPool.Flush(1)
// if len(tx) == 0 {
// break
// }
// numSlotsExtractedTransactions += numSlots(tx[0])
// }
// txPoolSizeAfterCurrentTx := uint64(pool.all.Slots() + numSlots(tx[0]))
// // pool2Size, pool3Size
// var includePool1, includePool2, includePool3 bool
// if txPoolSizeAfterCurrentTx < maxPool1Size {
// includePool1 = true
// } else if (txPoolSizeAfterCurrentTx > maxPool1Size) && (txPoolSizeAfterCurrentTx <= (maxPool1Size + pool2Size)) {
// includePool2 = true
// } else if txPoolSizeAfterCurrentTx > (maxPool1Size+pool2Size) && txPoolSizeAfterCurrentTx < (maxPool1Size+pool2Size+pool3Size) {
// includePool3 = true
// } else {
// return
// }
//
// // already validated by this point
// from, _ := types.Sender(pool.signer, tx[0])
//
// // use addToPool2OrPool3() function to transfer from pool3 to pool2
// _, err := pool.addToPool2OrPool3(tx[0], from, includePool1, includePool2, includePool3)
// if err != nil {
// return
// }
//}
//}
//}
// todo do things in terms of slots rather than number of transactions?
}

@ -328,7 +328,7 @@ func (l *list) Contains(nonce uint64) bool {
//
// If the new transaction is accepted into the list, the lists' cost and gas
// thresholds are also potentially updated.
func (l *list) Add(tx *types.Transaction, priceBump uint64) (bool, *types.Transaction) {
func (l *list) Add(tx *types.Transaction, priceBump uint64, static bool) (bool, *types.Transaction) {
// If there's an older better transaction, abort
old := l.txs.Get(tx.Nonce())
if old != nil {
@ -362,7 +362,7 @@ func (l *list) Add(tx *types.Transaction, priceBump uint64) (bool, *types.Transa
l.totalcost.Add(l.totalcost, cost)
// Otherwise overwrite the old transaction with the current one
l.txs.Put(tx, false) // todo putting false as a placeholder
l.txs.Put(tx, static) // todo putting false as a placeholder
if l.costcap.Cmp(cost) < 0 {
l.costcap = cost
}