diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 2e87bb8200..4b86382bdb 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -72,6 +72,7 @@ var ( utils.EthashDatasetDirFlag, utils.EthashDatasetsInMemoryFlag, utils.EthashDatasetsOnDiskFlag, + utils.TxPoolLocalsFlag, utils.TxPoolNoLocalsFlag, utils.TxPoolJournalFlag, utils.TxPoolRejournalFlag, diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 674c5d9015..1e27d0ae8d 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -114,6 +114,7 @@ var AppHelpFlagGroups = []flagGroup{ { Name: "TRANSACTION POOL", Flags: []cli.Flag{ + utils.TxPoolLocalsFlag, utils.TxPoolNoLocalsFlag, utils.TxPoolJournalFlag, utils.TxPoolRejournalFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 7317655836..cfca7b4ab4 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -233,6 +233,10 @@ var ( Value: eth.DefaultConfig.Ethash.DatasetsOnDisk, } // Transaction pool settings + TxPoolLocalsFlag = cli.StringFlag{ + Name: "txpool.locals", + Usage: "Comma separated accounts to treat as locals (no flush, priority inclusion)", + } TxPoolNoLocalsFlag = cli.BoolFlag{ Name: "txpool.nolocals", Usage: "Disables price exemptions for locally submitted transactions", @@ -977,6 +981,16 @@ func setGPO(ctx *cli.Context, cfg *gasprice.Config) { } func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) { + if ctx.GlobalIsSet(TxPoolLocalsFlag.Name) { + locals := strings.Split(ctx.GlobalString(TxPoolLocalsFlag.Name), ",") + for _, account := range locals { + if trimmed := strings.TrimSpace(account); !common.IsHexAddress(trimmed) { + Fatalf("Invalid account in --txpool.locals: %s", trimmed) + } else { + cfg.Locals = append(cfg.Locals, common.HexToAddress(account)) + } + } + } if ctx.GlobalIsSet(TxPoolNoLocalsFlag.Name) { cfg.NoLocals = ctx.GlobalBool(TxPoolNoLocalsFlag.Name) } diff --git a/core/tx_pool.go b/core/tx_pool.go index 7007f85ddf..46ae2759bc 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -123,9 +123,10 @@ type blockChain interface { // TxPoolConfig are the configuration parameters of the transaction pool. type TxPoolConfig struct { - NoLocals bool // Whether local transaction handling should be disabled - Journal string // Journal of local transactions to survive node restarts - Rejournal time.Duration // Time interval to regenerate the local transaction journal + Locals []common.Address // Addresses that should be treated by default as local + NoLocals bool // Whether local transaction handling should be disabled + Journal string // Journal of local transactions to survive node restarts + Rejournal time.Duration // Time interval to regenerate the local transaction journal PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce) @@ -231,6 +232,10 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block gasPrice: new(big.Int).SetUint64(config.PriceLimit), } pool.locals = newAccountSet(pool.signer) + for _, addr := range config.Locals { + log.Info("Setting new local account", "address", addr) + pool.locals.add(addr) + } pool.priced = newTxPricedList(pool.all) pool.reset(nil, chain.CurrentBlock().Header()) @@ -534,6 +539,14 @@ func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) { return pending, nil } +// Locals retrieves the accounts currently considered local by the pool. +func (pool *TxPool) Locals() []common.Address { + pool.mu.Lock() + defer pool.mu.Unlock() + + return pool.locals.flatten() +} + // local retrieves all currently known local transactions, groupped by origin // account and sorted by nonce. The returned transaction set is a copy and can be // freely modified by calling code. @@ -665,7 +678,10 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { } // Mark local addresses and journal local transactions if local { - pool.locals.add(from) + if !pool.locals.contains(from) { + log.Info("Setting new local account", "address", from) + pool.locals.add(from) + } } pool.journalTx(from, tx) @@ -1138,6 +1154,7 @@ func (a addressesByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] } type accountSet struct { accounts map[common.Address]struct{} signer types.Signer + cache *[]common.Address } // newAccountSet creates a new address set with an associated signer for sender @@ -1167,6 +1184,20 @@ func (as *accountSet) containsTx(tx *types.Transaction) bool { // add inserts a new address into the set to track. func (as *accountSet) add(addr common.Address) { as.accounts[addr] = struct{}{} + as.cache = nil +} + +// flatten returns the list of addresses within this set, also caching it for later +// reuse. The returned slice should not be changed! +func (as *accountSet) flatten() []common.Address { + if as.cache == nil { + accounts := make([]common.Address, 0, len(as.accounts)) + for account := range as.accounts { + accounts = append(accounts, account) + } + as.cache = &accounts + } + return *as.cache } // txLookup is used internally by TxPool to track transactions while allowing lookup without diff --git a/miner/worker.go b/miner/worker.go index c299ff9dc1..8c3337ba45 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -877,11 +877,26 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool) { w.updateSnapshot() return } - txs := types.NewTransactionsByPriceAndNonce(w.current.signer, pending) - if w.commitTransactions(txs, w.coinbase, interrupt) { - return + // Split the pending transactions into locals and remotes + localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending + for _, account := range w.eth.TxPool().Locals() { + if txs := remoteTxs[account]; len(txs) > 0 { + delete(remoteTxs, account) + localTxs[account] = txs + } + } + if len(localTxs) > 0 { + txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs) + if w.commitTransactions(txs, w.coinbase, interrupt) { + return + } + } + if len(remoteTxs) > 0 { + txs := types.NewTransactionsByPriceAndNonce(w.current.signer, remoteTxs) + if w.commitTransactions(txs, w.coinbase, interrupt) { + return + } } - w.commit(uncles, w.fullTaskHook, true, tstart) }