pool: fastcache, interface, metrics modify

This commit is contained in:
emailtovamos 2024-09-11 15:57:06 +01:00
parent 0a5dbef9f4
commit 248bb6b0d6
6 changed files with 362 additions and 16 deletions

@ -49,6 +49,8 @@ func (lru *LRUBuffer) Add(tx *types.Transaction) {
elem := lru.buffer.PushFront(tx)
lru.index[tx.Hash()] = elem
lru.size += txSlots // Increase size by the slots of the new transaction
// Update pool3Gauge
pool3Gauge.Inc(1)
}
func (lru *LRUBuffer) Get(hash common.Hash) (*types.Transaction, bool) {
@ -76,6 +78,8 @@ func (lru *LRUBuffer) Flush(maxTransactions int) []*types.Transaction {
delete(lru.index, removedTx.Hash())
lru.size -= numSlots(removedTx) // Decrease size by the slots of the removed transaction
count++
// Update pool3Gauge
pool3Gauge.Dec(1)
}
return txs
}

@ -0,0 +1,187 @@
package legacypool
import (
"fmt"
"sync"
"github.com/VictoriaMetrics/fastcache"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
type LRUBufferFastCache struct {
cache *fastcache.Cache
capacity int
mu sync.Mutex
size int // Total number of slots used
order []common.Hash // Tracks the order of added transactions for LRU eviction
}
// SerializeTransaction converts a transaction to a byte slice using RLP or any other encoding
func SerializeTransaction(tx *types.Transaction) []byte {
data, err := tx.MarshalBinary() // Ethereum uses RLP for transactions, so we can use this function
if err != nil {
return nil
}
return data
}
// DeserializeTransaction converts a byte slice back to a transaction
func DeserializeTransaction(data []byte) (*types.Transaction, error) {
var tx types.Transaction
err := tx.UnmarshalBinary(data)
if err != nil {
return nil, err
}
return &tx, nil
}
// LRUBufferFastCache initializes an LRU buffer with a given capacity
func NewLRUBufferFastCache(capacity int) *LRUBufferFastCache {
return &LRUBufferFastCache{
cache: fastcache.New(capacity * 1024 * 1024), // fastcache size is in bytes
capacity: capacity,
order: make([]common.Hash, 0),
}
}
func (lru *LRUBufferFastCache) Add(tx *types.Transaction) {
lru.mu.Lock()
defer lru.mu.Unlock()
txHash := tx.Hash()
// Check if the transaction already exists
if lru.cache.Has(txHash.Bytes()) {
// Move the transaction to the front in LRU order
lru.moveToFront(txHash)
return
}
txSlots := numSlots(tx)
// Evict the oldest transactions if the new one doesn't fit
for lru.size+txSlots > lru.capacity && len(lru.order) > 0 {
lru.evictOldest()
}
// Add the transaction to the cache
txData := SerializeTransaction(tx)
lru.cache.Set(txHash.Bytes(), txData)
lru.size += txSlots
// Update pool3Gauge
pool3Gauge.Inc(1)
// Add to the order tracking
lru.order = append(lru.order, txHash)
}
// Evict the oldest transaction in the LRU order
func (lru *LRUBufferFastCache) evictOldest() {
oldestHash := lru.order[0]
lru.order = lru.order[1:]
// Remove from the cache
txData := lru.cache.Get(nil, oldestHash.Bytes())
if len(txData) > 0 {
tx, err := DeserializeTransaction(txData)
if err == nil {
lru.size -= numSlots(tx)
}
}
// Remove the oldest entry
lru.cache.Del(oldestHash.Bytes())
// Update pool3Gauge
pool3Gauge.Dec(1)
}
// Move a transaction to the front of the LRU order
func (lru *LRUBufferFastCache) moveToFront(hash common.Hash) {
for i, h := range lru.order {
if h == hash {
// Remove the hash from its current position
lru.order = append(lru.order[:i], lru.order[i+1:]...)
break
}
}
// Add it to the front
lru.order = append(lru.order, hash)
}
// Get retrieves a transaction from the cache and moves it to the front of the LRU order
func (lru *LRUBufferFastCache) Get(hash common.Hash) (*types.Transaction, bool) {
lru.mu.Lock()
defer lru.mu.Unlock()
txData := lru.cache.Get(nil, hash.Bytes())
if len(txData) == 0 {
return nil, false
}
tx, err := DeserializeTransaction(txData)
if err != nil {
return nil, false
}
// Move the accessed transaction to the front in LRU order
lru.moveToFront(hash)
return tx, true
}
// Flush removes and returns up to `maxTransactions` transactions from the cache
func (lru *LRUBufferFastCache) Flush(maxTransactions int) []*types.Transaction {
lru.mu.Lock()
defer lru.mu.Unlock()
var txs []*types.Transaction
count := 0
// Remove up to maxTransactions transactions from the oldest
for count < maxTransactions && len(lru.order) > 0 {
oldestHash := lru.order[0]
lru.order = lru.order[1:]
txData := lru.cache.Get(nil, oldestHash.Bytes())
if len(txData) > 0 {
tx, err := DeserializeTransaction(txData)
if err == nil {
txs = append(txs, tx)
lru.size -= numSlots(tx)
count++
}
}
lru.cache.Del(oldestHash.Bytes())
// Update pool3Gauge
pool3Gauge.Dec(1)
}
return txs
}
// Size returns the current size of the buffer in terms of slots
func (lru *LRUBufferFastCache) Size() int {
lru.mu.Lock()
defer lru.mu.Unlock()
return lru.size
}
// PrintTxStats prints the hash, gas fee cap, and gas tip cap of all transactions
func (lru *LRUBufferFastCache) PrintTxStats() {
lru.mu.Lock()
defer lru.mu.Unlock()
for _, hash := range lru.order {
txData := lru.cache.Get(nil, hash.Bytes())
if len(txData) > 0 {
tx, err := DeserializeTransaction(txData)
if err == nil {
fmt.Println(tx.Hash().String(), tx.GasFeeCap().String(), tx.GasTipCap().String())
}
}
}
}

@ -0,0 +1,94 @@
package legacypool
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestNewLRUBufferFastCache(t *testing.T) {
capacity := 10
lru := NewLRUBufferFastCache(capacity)
assert.Equal(t, capacity, lru.capacity, "capacity should match the given value")
assert.Equal(t, 0, lru.Size(), "size should be 0 for a new buffer")
assert.Equal(t, 0, len(lru.order), "order should be empty for a new buffer")
}
func TestAddAndGetFastCache(t *testing.T) {
lru := NewLRUBufferFastCache(10)
tx1 := createDummyTransaction(500)
tx2 := createDummyTransaction(1500)
lru.Add(tx1)
lru.Add(tx2)
assert.Equal(t, 2, lru.Size(), "size should be 2 after adding two transactions")
retrievedTx, ok := lru.Get(tx1.Hash())
assert.True(t, ok, "tx1 should be found in the buffer")
assert.Equal(t, tx1.Hash(), retrievedTx.Hash(), "retrieved tx1 hash should match the original hash")
retrievedTx, ok = lru.Get(tx2.Hash())
assert.True(t, ok, "tx2 should be found in the buffer")
assert.Equal(t, tx2.Hash(), retrievedTx.Hash(), "retrieved tx2 hash should match the original hash")
}
func TestBufferCapacityFastCache(t *testing.T) {
lru := NewLRUBufferFastCache(2) // Capacity in slots
tx1 := createDummyTransaction(500) // 1 slot
tx2 := createDummyTransaction(1500) // 1 slot
tx3 := createDummyTransaction(1000) // 1 slot
lru.Add(tx1)
lru.Add(tx2)
assert.Equal(t, 2, lru.Size(), "size should be 2 after adding two transactions")
lru.Add(tx3)
assert.Equal(t, 2, lru.Size(), "size should still be 2 after adding the third transaction")
_, ok := lru.Get(tx1.Hash())
assert.False(t, ok, "tx1 should have been evicted")
}
func TestFlushFastCache(t *testing.T) {
lru := NewLRUBufferFastCache(10)
tx1 := createDummyTransaction(500)
tx2 := createDummyTransaction(1500)
tx3 := createDummyTransaction(1000)
lru.Add(tx1)
lru.Add(tx2)
lru.Add(tx3)
lru.PrintTxStats()
flushedTxs := lru.Flush(2)
assert.Equal(t, 2, len(flushedTxs), "should flush 2 transactions")
assert.Equal(t, tx1.Hash().String(), flushedTxs[0].Hash().String(), "correct flushed transaction")
assert.Equal(t, tx2.Hash().String(), flushedTxs[1].Hash().String(), "correct flushed transaction")
assert.Equal(t, 1, lru.Size(), "size should be 1 after flushing 2 transactions")
lru.PrintTxStats()
}
func TestSizeFastCache(t *testing.T) {
lru := NewLRUBufferFastCache(10)
tx1 := createDummyTransaction(500) // 1 slot
tx2 := createDummyTransaction(1500) // 1 slot
lru.Add(tx1)
assert.Equal(t, 1, lru.Size(), "size should be 1 after adding tx1")
lru.Add(tx2)
assert.Equal(t, 2, lru.Size(), "size should be 2 after adding tx2")
lru.Flush(1)
assert.Equal(t, 1, lru.Size(), "size should be 1 after flushing one transaction")
}

@ -248,7 +248,7 @@ type LegacyPool struct {
all *lookup // All transactions to allow lookups
priced *pricedList // All transactions sorted by price
localBufferPool *LRUBuffer // Local buffer transactions (Pool 3)
localBufferPool Pool3 // Local buffer transactions (Pool 3)
reqResetCh chan *txpoolResetRequest
reqPromoteCh chan *accountSet
@ -293,7 +293,7 @@ func New(config Config, chain BlockChain) *LegacyPool {
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
localBufferPool: NewLRUBuffer(int(maxPool3Size)),
localBufferPool: NewLRUBufferFastCache(int(maxPool3Size)),
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
@ -1001,7 +1001,6 @@ func (pool *LegacyPool) addToPool12OrPool3(tx *types.Transaction, from common.Ad
}
if pool3 {
pool.localBufferPool.Add(tx)
pool3Gauge.Inc(1)
return true, nil
}
return false, errors.New("could not add to any pool")
@ -1686,6 +1685,9 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
}
log.Trace("Promoted queued transactions", "count", len(promoted))
queuedGauge.Dec(int64(len(readies)))
if list.txs.staticOnly {
pool2Gauge.Dec(int64(len(readies)))
}
// Drop all transactions over the allowed limit
var caps types.Transactions
@ -1701,6 +1703,9 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
// Mark all the items dropped as removed
pool.priced.Removed(len(forwards) + len(drops) + len(caps))
queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
if list.txs.staticOnly {
pool2Gauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
}
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
}
@ -2216,7 +2221,7 @@ func (pool *LegacyPool) transferTransactions() {
return
}
extraSlots := maxPool1Pool2CombinedSize - currentPool1Pool2Size
extraTransactions := extraSlots / 4 // Since maximum slots per transaction is 4
extraTransactions := (extraSlots + 3) / 4 // Since maximum slots per transaction is 4
// So now we can take out extraTransactions number of transactions from pool3 and put in pool2
if extraTransactions < 1 {
return
@ -2240,8 +2245,11 @@ func (pool *LegacyPool) transferTransactions() {
if err != nil {
// if it never gets added to anything then add it back
pool.addToPool12OrPool3(transaction, from, true, false, false, true)
//slots := int64(numSlots(transaction))
pool2Gauge.Dec(1)
continue
}
pool2Gauge.Inc(1)
log.Debug("Transferred from pool3 to pool2", "transactions", transaction.Hash().String())
}
}

@ -1926,6 +1926,7 @@ func TestStableUnderpricing(t *testing.T) {
// Note, local transactions are never allowed to be dropped.
func TestUnderpricingDynamicFee(t *testing.T) {
t.Parallel()
testTxPoolConfig.InterPoolTransferTime = 5 * time.Second
pool, _ := setupPoolWithConfig(eip1559Config)
defer pool.Close()
@ -2090,7 +2091,7 @@ func TestDualHeapEviction(t *testing.T) {
}
pending, queued := pool.Stats()
if pending+queued != 5 {
t.Fatalf("transaction count mismatch: have %d, want %d, pending %d, queued %d, pool3 %d", pending+queued, 5, pending, queued, pool.localBufferPool.size)
t.Fatalf("transaction count mismatch: have %d, want %d, pending %d, queued %d, pool3 %d", pending+queued, 5, pending, queued, pool.localBufferPool.Size())
}
}
@ -2261,7 +2262,7 @@ func TestTransferTransactions(t *testing.T) {
pool, _ := setupPoolWithConfig(eip1559Config)
defer pool.Close()
pool.config.GlobalSlots = 2
pool.config.GlobalSlots = 1
pool.config.GlobalQueue = 1
pool.config.Pool2Slots = 1
@ -2269,7 +2270,7 @@ func TestTransferTransactions(t *testing.T) {
pool.config.InterPoolTransferTime = 5 * time.Second
// Create a number of test accounts and fund them
keys := make([]*ecdsa.PrivateKey, 4)
keys := make([]*ecdsa.PrivateKey, 5)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
@ -2278,34 +2279,71 @@ func TestTransferTransactions(t *testing.T) {
tx := dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), keys[0])
from, _ := types.Sender(pool.signer, tx)
pool.addToPool12OrPool3(tx, from, true, false, false, true)
time.Sleep(10 * time.Second)
time.Sleep(6 * time.Second)
pending, queue := pool.Stats()
if pending != 0 {
if pending != 1 {
t.Errorf("pending transactions mismatched: have %d, want %d", pending, 0)
}
if queue != 1 {
if queue != 0 {
t.Errorf("queued transactions mismatched: have %d, want %d", queue, 1)
}
tx2 := dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), keys[1])
from2, _ := types.Sender(pool.signer, tx2)
pool.addToPool12OrPool3(tx2, from2, true, false, false, true)
time.Sleep(2 * time.Second)
time.Sleep(6 * time.Second)
pending, queue = pool.Stats()
if pending != 0 {
if pending != 2 {
t.Errorf("pending transactions mismatched: have %d, want %d", pending, 0)
}
if queue != 1 {
if queue != 0 {
t.Errorf("queued transactions mismatched: have %d, want %d", queue, 1)
}
time.Sleep(3 * time.Second)
tx3 := dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), keys[2])
from3, _ := types.Sender(pool.signer, tx3)
pool.addToPool12OrPool3(tx3, from3, true, false, false, true)
time.Sleep(6 * time.Second)
pending, queue = pool.Stats()
if pending != 0 {
if pending != 3 {
t.Errorf("pending transactions mismatched: have %d, want %d", pending, 0)
}
if queue != 1 {
if queue != 0 {
t.Errorf("queued transactions mismatched: have %d, want %d", queue, 1)
}
tx4 := dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), keys[3])
from4, _ := types.Sender(pool.signer, tx4)
pool.addToPool12OrPool3(tx4, from4, true, false, false, true)
time.Sleep(6 * time.Second)
pending, queue = pool.Stats()
if pending != 3 {
t.Errorf("pending transactions mismatched: have %d, want %d", pending, 0)
}
if queue != 0 {
t.Errorf("queued transactions mismatched: have %d, want %d", queue, 1)
}
bufferSize := pool.localBufferPool.Size()
if bufferSize != 1 {
t.Errorf("buffer transactions mismatched: have %d, want %d", bufferSize, 1)
}
tx5 := dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), keys[4])
from5, _ := types.Sender(pool.signer, tx5)
pool.addToPool12OrPool3(tx5, from5, true, false, false, true)
time.Sleep(6 * time.Second)
pending, queue = pool.Stats()
if pending != 3 {
t.Errorf("pending transactions mismatched: have %d, want %d", pending, 0)
}
if queue != 0 {
t.Errorf("queued transactions mismatched: have %d, want %d", queue, 1)
}
bufferSize = pool.localBufferPool.Size()
if bufferSize != 2 {
t.Errorf("buffer transactions mismatched: have %d, want %d", bufferSize, 1)
}
}
// Tests that the pool rejects replacement dynamic fee transactions that don't

@ -0,0 +1,15 @@
package legacypool
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
// Pool3 is an interface representing a transaction buffer
type Pool3 interface {
Add(tx *types.Transaction) // Adds a transaction to the buffer
Get(hash common.Hash) (*types.Transaction, bool) // Retrieves a transaction by hash
Flush(maxTransactions int) []*types.Transaction // Flushes up to maxTransactions transactions
Size() int // Returns the current size of the buffer
PrintTxStats() // Prints the statistics of all transactions in the buffer
}