diff --git a/core/tx_list.go b/core/tx_list.go index 75bfdaedac..6b22cbbebe 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -494,11 +494,11 @@ func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) boo // Discard finds a number of most underpriced transactions, removes them from the // priced list and returns them for further removal from the entire pool. -func (l *txPricedList) Discard(count int, local *accountSet) types.Transactions { - drop := make(types.Transactions, 0, count) // Remote underpriced transactions to drop +func (l *txPricedList) Discard(slots int, local *accountSet) types.Transactions { + drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep - for len(*l.items) > 0 && count > 0 { + for len(*l.items) > 0 && slots > 0 { // Discard stale transactions if found during cleanup tx := heap.Pop(l.items).(*types.Transaction) if l.all.Get(tx.Hash()) == nil { @@ -510,7 +510,7 @@ func (l *txPricedList) Discard(count int, local *accountSet) types.Transactions save = append(save, tx) } else { drop = append(drop, tx) - count-- + slots -= numSlots(tx) } } for _, tx := range save { diff --git a/core/tx_pool.go b/core/tx_pool.go index f7032dbd1e..e95496b13b 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -38,6 +38,18 @@ import ( const ( // chainHeadChanSize is the size of channel listening to ChainHeadEvent. chainHeadChanSize = 10 + + // txSlotSize is used to calculate how many data slots a single transaction + // takes up based on its size. The slots are used as DoS protection, ensuring + // that validating a new transaction remains a constant operation (in reality + // O(maxslots), where max slots are 4 currently). + txSlotSize = 32 * 1024 + + // txMaxSize is the maximum size a single transaction can have. This field has + // non-trivial consequences: larger transactions are significantly harder and + // more expensive to propagate; larger transactions also take more resources + // to validate whether they fit into the pool or not. + txMaxSize = 4 * txSlotSize // 128KB, don't bump without chunking support ) var ( @@ -105,6 +117,7 @@ var ( pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil) queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil) localGauge = metrics.NewRegisteredGauge("txpool/local", nil) + slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil) ) // TxStatus is the current status of a transaction as seen by the pool. @@ -510,8 +523,8 @@ func (pool *TxPool) local() map[common.Address]types.Transactions { // validateTx checks whether a transaction is valid according to the consensus // rules and adheres to some heuristic limits of the local node (price and size). func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { - // Heuristic limit, reject transactions over 32KB to prevent DOS attacks - if tx.Size() > 32*1024 { + // Reject transactions over defined size to prevent DOS attacks + if uint64(tx.Size()) > txMaxSize { return ErrOversizedData } // Transactions can't be negative. This may never happen using RLP decoded @@ -583,7 +596,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e return false, ErrUnderpriced } // New transaction is better than our worse ones, make room for it - drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals) + drop := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), pool.locals) for _, tx := range drop { log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice()) underpricedTxMeter.Mark(1) @@ -1493,8 +1506,9 @@ func (as *accountSet) merge(other *accountSet) { // peeking into the pool in TxPool.Get without having to acquire the widely scoped // TxPool.mu mutex. type txLookup struct { - all map[common.Hash]*types.Transaction - lock sync.RWMutex + all map[common.Hash]*types.Transaction + slots int + lock sync.RWMutex } // newTxLookup returns a new txLookup structure. @@ -1532,11 +1546,22 @@ func (t *txLookup) Count() int { return len(t.all) } +// Slots returns the current number of slots used in the lookup. +func (t *txLookup) Slots() int { + t.lock.RLock() + defer t.lock.RUnlock() + + return t.slots +} + // Add adds a transaction to the lookup. func (t *txLookup) Add(tx *types.Transaction) { t.lock.Lock() defer t.lock.Unlock() + t.slots += numSlots(tx) + slotsGauge.Update(int64(t.slots)) + t.all[tx.Hash()] = tx } @@ -1545,5 +1570,13 @@ func (t *txLookup) Remove(hash common.Hash) { t.lock.Lock() defer t.lock.Unlock() + t.slots -= numSlots(t.all[hash]) + slotsGauge.Update(int64(t.slots)) + delete(t.all, hash) } + +// numSlots calculates the number of slots needed for a single transaction. +func numSlots(tx *types.Transaction) int { + return int((tx.Size() + txSlotSize - 1) / txSlotSize) +} diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 0f1e7ac8f8..4db3e6deef 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -77,9 +77,17 @@ func pricedTransaction(nonce uint64, gaslimit uint64, gasprice *big.Int, key *ec return tx } +func pricedDataTransaction(nonce uint64, gaslimit uint64, gasprice *big.Int, key *ecdsa.PrivateKey, bytes uint64) *types.Transaction { + data := make([]byte, bytes) + rand.Read(data) + + tx, _ := types.SignTx(types.NewTransaction(nonce, common.Address{}, big.NewInt(0), gaslimit, gasprice, data), types.HomesteadSigner{}, key) + return tx +} + func setupTxPool() (*TxPool, *ecdsa.PrivateKey) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase())) - blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} + blockchain := &testBlockChain{statedb, 10000000, new(event.Feed)} key, _ := crypto.GenerateKey() pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) @@ -465,7 +473,7 @@ func TestTransactionDropping(t *testing.T) { pool, key := setupTxPool() defer pool.Stop() - account, _ := deriveSender(transaction(0, 0, key)) + account := crypto.PubkeyToAddress(key.PublicKey) pool.currentState.AddBalance(account, big.NewInt(1000)) // Add some pending and some queued transactions @@ -674,7 +682,7 @@ func TestTransactionGapFilling(t *testing.T) { pool, key := setupTxPool() defer pool.Stop() - account, _ := deriveSender(transaction(0, 0, key)) + account := crypto.PubkeyToAddress(key.PublicKey) pool.currentState.AddBalance(account, big.NewInt(1000000)) // Keep track of transaction events to ensure all executables get announced @@ -728,7 +736,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) { pool, key := setupTxPool() defer pool.Stop() - account, _ := deriveSender(transaction(0, 0, key)) + account := crypto.PubkeyToAddress(key.PublicKey) pool.currentState.AddBalance(account, big.NewInt(1000000)) // Keep queuing up transactions and make sure all above a limit are dropped @@ -923,7 +931,7 @@ func TestTransactionPendingLimiting(t *testing.T) { pool, key := setupTxPool() defer pool.Stop() - account, _ := deriveSender(transaction(0, 0, key)) + account := crypto.PubkeyToAddress(key.PublicKey) pool.currentState.AddBalance(account, big.NewInt(1000000)) // Keep track of transaction events to ensure all executables get announced @@ -1002,6 +1010,62 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) { } } +// Test the limit on transaction size is enforced correctly. +// This test verifies every transaction having allowed size +// is added to the pool, and longer transactions are rejected. +func TestTransactionAllowedTxSize(t *testing.T) { + t.Parallel() + + // Create a test account and fund it + pool, key := setupTxPool() + defer pool.Stop() + + account := crypto.PubkeyToAddress(key.PublicKey) + pool.currentState.AddBalance(account, big.NewInt(1000000000)) + + // Compute maximal data size for transactions (lower bound). + // + // It is assumed the fields in the transaction (except of the data) are: + // - nonce <= 32 bytes + // - gasPrice <= 32 bytes + // - gasLimit <= 32 bytes + // - recipient == 20 bytes + // - value <= 32 bytes + // - signature == 65 bytes + // All those fields are summed up to at most 213 bytes. + baseSize := uint64(213) + dataSize := txMaxSize - baseSize + + // Try adding a transaction with maximal allowed size + tx := pricedDataTransaction(0, pool.currentMaxGas, big.NewInt(1), key, dataSize) + if err := pool.addRemoteSync(tx); err != nil { + t.Fatalf("failed to add transaction of size %d, close to maximal: %v", int(tx.Size()), err) + } + // Try adding a transaction with random allowed size + if err := pool.addRemoteSync(pricedDataTransaction(1, pool.currentMaxGas, big.NewInt(1), key, uint64(rand.Intn(int(dataSize))))); err != nil { + t.Fatalf("failed to add transaction of random allowed size: %v", err) + } + // Try adding a transaction of minimal not allowed size + if err := pool.addRemoteSync(pricedDataTransaction(2, pool.currentMaxGas, big.NewInt(1), key, txMaxSize)); err == nil { + t.Fatalf("expected rejection on slightly oversize transaction") + } + // Try adding a transaction of random not allowed size + if err := pool.addRemoteSync(pricedDataTransaction(2, pool.currentMaxGas, big.NewInt(1), key, dataSize+1+uint64(rand.Intn(int(10*txMaxSize))))); err == nil { + t.Fatalf("expected rejection on oversize transaction") + } + // Run some sanity checks on the pool internals + pending, queued := pool.Stats() + if pending != 2 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) + } + if queued != 0 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) + } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } +} + // Tests that if transactions start being capped, transactions are also removed from 'all' func TestTransactionCapClearsFromAll(t *testing.T) { t.Parallel() @@ -1752,6 +1816,24 @@ func TestTransactionStatusCheck(t *testing.T) { } } +// Test the transaction slots consumption is computed correctly +func TestTransactionSlotCount(t *testing.T) { + t.Parallel() + + key, _ := crypto.GenerateKey() + + // Check that an empty transaction consumes a single slot + smallTx := pricedDataTransaction(0, 0, big.NewInt(0), key, 0) + if slots := numSlots(smallTx); slots != 1 { + t.Fatalf("small transactions slot count mismatch: have %d want %d", slots, 1) + } + // Check that a large transaction consumes the correct number of slots + bigTx := pricedDataTransaction(0, 0, big.NewInt(0), key, uint64(10*txSlotSize)) + if slots := numSlots(bigTx); slots != 11 { + t.Fatalf("big transactions slot count mismatch: have %d want %d", slots, 11) + } +} + // Benchmarks the speed of validating the contents of the pending queue of the // transaction pool. func BenchmarkPendingDemotion100(b *testing.B) { benchmarkPendingDemotion(b, 100) } @@ -1763,7 +1845,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) { pool, key := setupTxPool() defer pool.Stop() - account, _ := deriveSender(transaction(0, 0, key)) + account := crypto.PubkeyToAddress(key.PublicKey) pool.currentState.AddBalance(account, big.NewInt(1000000)) for i := 0; i < size; i++ { @@ -1788,7 +1870,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) { pool, key := setupTxPool() defer pool.Stop() - account, _ := deriveSender(transaction(0, 0, key)) + account := crypto.PubkeyToAddress(key.PublicKey) pool.currentState.AddBalance(account, big.NewInt(1000000)) for i := 0; i < size; i++ { @@ -1812,7 +1894,7 @@ func benchmarkPoolBatchInsert(b *testing.B, size int) { pool, key := setupTxPool() defer pool.Stop() - account, _ := deriveSender(transaction(0, 0, key)) + account := crypto.PubkeyToAddress(key.PublicKey) pool.currentState.AddBalance(account, big.NewInt(1000000)) batches := make([]types.Transactions, b.N)