Merge pull request #19692 from karalabe/metrics-extensions

core, ethdb, metrics, p2p: expose various counter metrics for grafana
Commit 3d3e83ecff by Péter Szilágyi, 2019-06-11 10:07:41 +03:00 (committed by GitHub)
15 changed files with 341 additions and 117 deletions

@@ -46,6 +46,10 @@ import (
)
var (
+	headBlockGauge     = metrics.NewRegisteredGauge("chain/head/block", nil)
+	headHeaderGauge    = metrics.NewRegisteredGauge("chain/head/header", nil)
+	headFastBlockGauge = metrics.NewRegisteredGauge("chain/head/receipt", nil)
	accountReadTimer   = metrics.NewRegisteredTimer("chain/account/reads", nil)
	accountHashTimer   = metrics.NewRegisteredTimer("chain/account/hashes", nil)
	accountUpdateTimer = metrics.NewRegisteredTimer("chain/account/updates", nil)
@@ -332,6 +336,7 @@ func (bc *BlockChain) loadLastState() error {
	}
	// Everything seems to be fine, set as the head block
	bc.currentBlock.Store(currentBlock)
+	headBlockGauge.Update(int64(currentBlock.NumberU64()))
	// Restore the last known head header
	currentHeader := currentBlock.Header()
@@ -344,12 +349,14 @@
	// Restore the last known head fast block
	bc.currentFastBlock.Store(currentBlock)
+	headFastBlockGauge.Update(int64(currentBlock.NumberU64()))
	if head := rawdb.ReadHeadFastBlockHash(bc.db); head != (common.Hash{}) {
		if block := bc.GetBlockByHash(head); block != nil {
			bc.currentFastBlock.Store(block)
+			headFastBlockGauge.Update(int64(block.NumberU64()))
		}
	}
	// Issue a status log for the user
	currentFastBlock := bc.CurrentFastBlock()
@@ -388,6 +395,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
		}
		rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash())
		bc.currentBlock.Store(newHeadBlock)
+		headBlockGauge.Update(int64(newHeadBlock.NumberU64()))
	}
	// Rewind the fast block in a simpleton way to the target head
@@ -399,6 +407,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
		}
		rawdb.WriteHeadFastBlockHash(db, newHeadFastBlock.Hash())
		bc.currentFastBlock.Store(newHeadFastBlock)
+		headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64()))
	}
}
@@ -450,6 +459,7 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
	// If all checks out, manually set the head block
	bc.chainmu.Lock()
	bc.currentBlock.Store(block)
+	headBlockGauge.Update(int64(block.NumberU64()))
	bc.chainmu.Unlock()
	log.Info("Committed new head block", "number", block.Number(), "hash", hash)
@@ -522,9 +532,12 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
	bc.genesisBlock = genesis
	bc.insert(bc.genesisBlock)
	bc.currentBlock.Store(bc.genesisBlock)
+	headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
	bc.hc.SetGenesis(bc.genesisBlock.Header())
	bc.hc.SetCurrentHeader(bc.genesisBlock.Header())
	bc.currentFastBlock.Store(bc.genesisBlock)
+	headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
	return nil
}
@@ -598,6 +611,7 @@ func (bc *BlockChain) insert(block *types.Block) {
	rawdb.WriteHeadBlockHash(bc.db, block.Hash())
	bc.currentBlock.Store(block)
+	headBlockGauge.Update(int64(block.NumberU64()))
	// If the block is better than our head or is on a different chain, force update heads
	if updateHeads {
@@ -605,6 +619,7 @@ func (bc *BlockChain) insert(block *types.Block) {
		rawdb.WriteHeadFastBlockHash(bc.db, block.Hash())
		bc.currentFastBlock.Store(block)
+		headFastBlockGauge.Update(int64(block.NumberU64()))
	}
}
@@ -862,11 +877,13 @@ func (bc *BlockChain) Rollback(chain []common.Hash) {
			newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1)
			rawdb.WriteHeadFastBlockHash(bc.db, newFastBlock.Hash())
			bc.currentFastBlock.Store(newFastBlock)
+			headFastBlockGauge.Update(int64(newFastBlock.NumberU64()))
		}
		if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash {
			newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1)
			rawdb.WriteHeadBlockHash(bc.db, newBlock.Hash())
			bc.currentBlock.Store(newBlock)
+			headBlockGauge.Update(int64(newBlock.NumberU64()))
		}
	}
	// Truncate ancient data which exceeds the current header.
@@ -952,6 +969,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
		if bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64()).Cmp(td) < 0 {
			rawdb.WriteHeadFastBlockHash(bc.db, head.Hash())
			bc.currentFastBlock.Store(head)
+			headFastBlockGauge.Update(int64(head.NumberU64()))
			isCanonical = true
		}
	}
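The chain/head/block, chain/head/header and chain/head/receipt values introduced above are gauges: every Update overwrites the previous sample, so the dashboard always sees the latest head number rather than an accumulated total. A minimal standalone sketch of that behaviour (the block numbers and the explicit Enabled toggle are illustrative only; geth normally enables metrics via the --metrics flag):

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/metrics"
)

func main() {
	metrics.Enabled = true // otherwise a no-op NilGauge is returned

	// Gauges hold a single instantaneous value, which is why the head
	// trackers above call Update on every head switch.
	head := metrics.NewRegisteredGauge("chain/head/block", nil)
	head.Update(1500000) // e.g. after importing block 1,500,000
	head.Update(1500001) // only the latest head is reported

	fmt.Println(head.Value()) // 1500001
}
```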

@@ -104,6 +104,7 @@ func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine c
		}
	}
	hc.currentHeaderHash = hc.CurrentHeader().Hash()
+	headHeaderGauge.Update(hc.CurrentHeader().Number.Int64())
	return hc, nil
}
@@ -185,12 +186,12 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er
		hc.currentHeaderHash = hash
		hc.currentHeader.Store(types.CopyHeader(header))
+		headHeaderGauge.Update(header.Number.Int64())
		status = CanonStatTy
	} else {
		status = SideStatTy
	}
	hc.headerCache.Add(hash, header)
	hc.numberCache.Add(hash, number)
@@ -456,6 +457,7 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.Header) {
	hc.currentHeader.Store(head)
	hc.currentHeaderHash = head.Hash()
+	headHeaderGauge.Update(head.Number.Int64())
}

type (
@@ -508,6 +510,7 @@ func (hc *HeaderChain) SetHead(head uint64, updateFn UpdateHeadBlocksCallback, d
		hc.currentHeader.Store(parent)
		hc.currentHeaderHash = parentHash
+		headHeaderGauge.Update(parent.Number.Int64())
	}
	batch.Write()

@@ -80,8 +80,9 @@ type freezer struct {
func newFreezer(datadir string, namespace string) (*freezer, error) {
	// Create the initial freezer object
	var (
		readMeter   = metrics.NewRegisteredMeter(namespace+"ancient/read", nil)
		writeMeter  = metrics.NewRegisteredMeter(namespace+"ancient/write", nil)
+		sizeCounter = metrics.NewRegisteredCounter(namespace+"ancient/size", nil)
	)
	// Ensure the datadir is not a symbolic link if it exists.
	if info, err := os.Lstat(datadir); !os.IsNotExist(err) {
@@ -102,7 +103,7 @@ func newFreezer(datadir string, namespace string) (*freezer, error) {
		instanceLock: lock,
	}
	for name, disableSnappy := range freezerNoSnappy {
-		table, err := newTable(datadir, name, readMeter, writeMeter, disableSnappy)
+		table, err := newTable(datadir, name, readMeter, writeMeter, sizeCounter, disableSnappy)
		if err != nil {
			for _, table := range freezer.tables {
				table.Close()

@@ -94,17 +94,18 @@ type freezerTable struct {
	// to count how many historic items have gone missing.
	itemOffset uint32 // Offset (number of discarded items)
	headBytes   uint32          // Number of bytes written to the head file
	readMeter   metrics.Meter   // Meter for measuring the effective amount of data read
	writeMeter  metrics.Meter   // Meter for measuring the effective amount of data written
+	sizeCounter metrics.Counter // Counter for tracking the combined size of all freezer tables
	logger      log.Logger      // Logger with database path and table name ambedded
	lock        sync.RWMutex    // Mutex protecting the data file descriptors
}

// newTable opens a freezer table with default settings - 2G files
-func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, disableSnappy bool) (*freezerTable, error) {
+func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeCounter metrics.Counter, disableSnappy bool) (*freezerTable, error) {
-	return newCustomTable(path, name, readMeter, writeMeter, 2*1000*1000*1000, disableSnappy)
+	return newCustomTable(path, name, readMeter, writeMeter, sizeCounter, 2*1000*1000*1000, disableSnappy)
}

// openFreezerFileForAppend opens a freezer table file and seeks to the end
@@ -148,7 +149,7 @@ func truncateFreezerFile(file *os.File, size int64) error {
// newCustomTable opens a freezer table, creating the data and index files if they are
// non existent. Both files are truncated to the shortest common length to ensure
// they don't go out of sync.
-func newCustomTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, maxFilesize uint32, noCompression bool) (*freezerTable, error) {
+func newCustomTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeCounter metrics.Counter, maxFilesize uint32, noCompression bool) (*freezerTable, error) {
	// Ensure the containing directory exists and open the indexEntry file
	if err := os.MkdirAll(path, 0755); err != nil {
		return nil, err
@@ -171,6 +172,7 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete
		files:       make(map[uint32]*os.File),
		readMeter:   readMeter,
		writeMeter:  writeMeter,
+		sizeCounter: sizeCounter,
		name:        name,
		path:        path,
		logger:      log.New("database", path, "table", name),
@@ -181,6 +183,14 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete
		tab.Close()
		return nil, err
	}
+	// Initialize the starting size counter
+	size, err := tab.sizeNolock()
+	if err != nil {
+		tab.Close()
+		return nil, err
+	}
+	tab.sizeCounter.Inc(int64(size))
	return tab, nil
}
@@ -321,6 +331,11 @@ func (t *freezerTable) truncate(items uint64) error {
	if atomic.LoadUint64(&t.items) <= items {
		return nil
	}
+	// We need to truncate, save the old size for metrics tracking
+	oldSize, err := t.sizeNolock()
+	if err != nil {
+		return err
+	}
	// Something's out of sync, truncate the table's offset index
	t.logger.Warn("Truncating freezer table", "items", t.items, "limit", items)
	if err := truncateFreezerFile(t.index, int64(items+1)*indexEntrySize); err != nil {
@@ -355,6 +370,14 @@ func (t *freezerTable) truncate(items uint64) error {
	// All data files truncated, set internal counters and return
	atomic.StoreUint64(&t.items, items)
	atomic.StoreUint32(&t.headBytes, expected.offset)
+	// Retrieve the new size and update the total size counter
+	newSize, err := t.sizeNolock()
+	if err != nil {
+		return err
+	}
+	t.sizeCounter.Dec(int64(oldSize - newSize))
	return nil
}
@@ -483,7 +506,10 @@ func (t *freezerTable) Append(item uint64, blob []byte) error {
	}
	// Write indexEntry
	t.index.Write(idx.marshallBinary())
	t.writeMeter.Mark(int64(bLen + indexEntrySize))
+	t.sizeCounter.Inc(int64(bLen + indexEntrySize))
	atomic.AddUint64(&t.items, 1)
	return nil
}
@@ -562,6 +588,12 @@ func (t *freezerTable) size() (uint64, error) {
	t.lock.RLock()
	defer t.lock.RUnlock()
+	return t.sizeNolock()
+}
+
+// sizeNolock returns the total data size in the freezer table without obtaining
+// the mutex first.
+func (t *freezerTable) sizeNolock() (uint64, error) {
	stat, err := t.index.Stat()
	if err != nil {
		return 0, err
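The freezer's new ancient/size metric is a counter rather than a gauge: newCustomTable seeds it with the size measured at open time, Append bumps it by the blob plus index-entry bytes, and truncate decrements it by the difference between the old and new size. A rough illustration of those Inc/Dec semantics (the metric name and byte values are made up for the example):

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/metrics"
)

func main() {
	metrics.Enabled = true // otherwise a no-op NilCounter is returned

	// Counters accumulate deltas, so the exported value can follow the
	// combined on-disk size of the freezer tables.
	size := metrics.NewRegisteredCounter("ancient/size", nil)

	size.Inc(2048) // starting size measured when the table is opened
	size.Inc(57)   // one appended item: blob bytes + index entry
	size.Dec(512)  // a truncation shrank the data files by 512 bytes

	fmt.Println(size.Count()) // 1593
}
```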

@@ -56,7 +56,7 @@ func TestFreezerBasics(t *testing.T) {
	// set cutoff at 50 bytes
	f, err := newCustomTable(os.TempDir(),
		fmt.Sprintf("unittest-%d", rand.Uint64()),
-		metrics.NewMeter(), metrics.NewMeter(), 50, true)
+		metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter(), 50, true)
	if err != nil {
		t.Fatal(err)
	}
@@ -98,12 +98,12 @@ func TestFreezerBasicsClosing(t *testing.T) {
	t.Parallel()
	// set cutoff at 50 bytes
	var (
		fname      = fmt.Sprintf("basics-close-%d", rand.Uint64())
-		m1, m2     = metrics.NewMeter(), metrics.NewMeter()
+		rm, wm, sc = metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
		f          *freezerTable
		err        error
	)
-	f, err = newCustomTable(os.TempDir(), fname, m1, m2, 50, true)
+	f, err = newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
	if err != nil {
		t.Fatal(err)
	}
@@ -112,7 +112,7 @@ func TestFreezerBasicsClosing(t *testing.T) {
		data := getChunk(15, x)
		f.Append(uint64(x), data)
		f.Close()
-		f, err = newCustomTable(os.TempDir(), fname, m1, m2, 50, true)
+		f, err = newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
	}
	defer f.Close()
@@ -126,7 +126,7 @@ func TestFreezerBasicsClosing(t *testing.T) {
			t.Fatalf("test %d, got \n%x != \n%x", y, got, exp)
		}
		f.Close()
-		f, err = newCustomTable(os.TempDir(), fname, m1, m2, 50, true)
+		f, err = newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
		if err != nil {
			t.Fatal(err)
		}
@@ -136,11 +136,11 @@ func TestFreezerBasicsClosing(t *testing.T) {
// TestFreezerRepairDanglingHead tests that we can recover if index entries are removed
func TestFreezerRepairDanglingHead(t *testing.T) {
	t.Parallel()
-	wm, rm := metrics.NewMeter(), metrics.NewMeter()
+	rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
	fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64())
	{ // Fill table
-		f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+		f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
		if err != nil {
			t.Fatal(err)
		}
@@ -169,7 +169,7 @@ func TestFreezerRepairDanglingHead(t *testing.T) {
	idxFile.Close()
	// Now open it again
	{
-		f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+		f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
		// The last item should be missing
		if _, err = f.Retrieve(0xff); err == nil {
			t.Errorf("Expected error for missing index entry")
@@ -184,11 +184,11 @@ func TestFreezerRepairDanglingHead(t *testing.T) {
// TestFreezerRepairDanglingHeadLarge tests that we can recover if very many index entries are removed
func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
	t.Parallel()
-	wm, rm := metrics.NewMeter(), metrics.NewMeter()
+	rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
	fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64())
	{ // Fill a table and close it
-		f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
+		f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
		if err != nil {
			t.Fatal(err)
		}
@@ -216,7 +216,7 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
	idxFile.Close()
	// Now open it again
	{
-		f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+		f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
		// The first item should be there
		if _, err = f.Retrieve(0); err != nil {
			t.Fatal(err)
@@ -234,7 +234,7 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
	}
	// And if we open it, we should now be able to read all of them (new values)
	{
-		f, _ := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+		f, _ := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
		for y := 1; y < 255; y++ {
			exp := getChunk(15, ^y)
			got, err := f.Retrieve(uint64(y))
@@ -251,11 +251,11 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
// TestSnappyDetection tests that we fail to open a snappy database and vice versa
func TestSnappyDetection(t *testing.T) {
	t.Parallel()
-	wm, rm := metrics.NewMeter(), metrics.NewMeter()
+	rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
	fname := fmt.Sprintf("snappytest-%d", rand.Uint64())
	// Open with snappy
	{
-		f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
+		f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
		if err != nil {
			t.Fatal(err)
		}
@@ -268,7 +268,7 @@ func TestSnappyDetection(t *testing.T) {
	}
	// Open without snappy
	{
-		f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, false)
+		f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, false)
		if _, err = f.Retrieve(0); err == nil {
			f.Close()
			t.Fatalf("expected empty table")
@@ -277,7 +277,7 @@
	// Open with snappy
	{
-		f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
+		f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
		// There should be 255 items
		if _, err = f.Retrieve(0xfe); err != nil {
			f.Close()
@@ -302,11 +302,11 @@ func assertFileSize(f string, size int64) error {
// the index is repaired
func TestFreezerRepairDanglingIndex(t *testing.T) {
	t.Parallel()
-	wm, rm := metrics.NewMeter(), metrics.NewMeter()
+	rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
	fname := fmt.Sprintf("dangling_indextest-%d", rand.Uint64())
	{ // Fill a table and close it
-		f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
+		f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
		if err != nil {
			t.Fatal(err)
		}
@@ -342,7 +342,7 @@ func TestFreezerRepairDanglingIndex(t *testing.T) {
	// 45, 45, 15
	// with 3+3+1 items
	{
-		f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
+		f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
		if err != nil {
			t.Fatal(err)
		}
@@ -359,11 +359,11 @@ func TestFreezerRepairDanglingIndex(t *testing.T) {
func TestFreezerTruncate(t *testing.T) {
	t.Parallel()
-	wm, rm := metrics.NewMeter(), metrics.NewMeter()
+	rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
	fname := fmt.Sprintf("truncation-%d", rand.Uint64())
	{ // Fill table
-		f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+		f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
		if err != nil {
			t.Fatal(err)
		}
@@ -380,7 +380,7 @@ func TestFreezerTruncate(t *testing.T) {
	}
	// Reopen, truncate
	{
-		f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+		f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
		if err != nil {
			t.Fatal(err)
		}
@@ -402,10 +402,10 @@ func TestFreezerTruncate(t *testing.T) {
// That will rewind the index, and _should_ truncate the head file
func TestFreezerRepairFirstFile(t *testing.T) {
	t.Parallel()
-	wm, rm := metrics.NewMeter(), metrics.NewMeter()
+	rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
	fname := fmt.Sprintf("truncationfirst-%d", rand.Uint64())
	{ // Fill table
-		f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+		f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
		if err != nil {
			t.Fatal(err)
		}
@@ -433,7 +433,7 @@ func TestFreezerRepairFirstFile(t *testing.T) {
	}
	// Reopen
	{
-		f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
+		f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
		if err != nil {
			t.Fatal(err)
		}
@@ -458,10 +458,10 @@ func TestFreezerRepairFirstFile(t *testing.T) {
// - check that we did not keep the rdonly file descriptors
func TestFreezerReadAndTruncate(t *testing.T) {
	t.Parallel()
-	wm, rm := metrics.NewMeter(), metrics.NewMeter()
+	rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
	fname := fmt.Sprintf("read_truncate-%d", rand.Uint64())
	{ // Fill table
-		f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+		f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
		if err != nil {
			t.Fatal(err)
		}
@@ -478,7 +478,7 @@ func TestFreezerReadAndTruncate(t *testing.T) {
	}
	// Reopen and read all files
	{
-		f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
+		f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
		if err != nil {
			t.Fatal(err)
		}
@@ -504,10 +504,10 @@ func TestFreezerReadAndTruncate(t *testing.T) {
func TestOffset(t *testing.T) {
	t.Parallel()
-	wm, rm := metrics.NewMeter(), metrics.NewMeter()
+	rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
	fname := fmt.Sprintf("offset-%d", rand.Uint64())
	{ // Fill table
-		f, err := newCustomTable(os.TempDir(), fname, rm, wm, 40, true)
+		f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 40, true)
		if err != nil {
			t.Fatal(err)
		}
@@ -563,7 +563,7 @@ func TestOffset(t *testing.T) {
	}
	// Now open again
	{
-		f, err := newCustomTable(os.TempDir(), fname, rm, wm, 40, true)
+		f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 40, true)
		if err != nil {
			t.Fatal(err)
		}

@@ -418,9 +418,9 @@ func (l *txPricedList) Put(tx *types.Transaction) {
// Removed notifies the prices transaction list that an old transaction dropped
// from the pool. The list will just keep a counter of stale objects and update
// the heap if a large enough ratio of transactions go stale.
-func (l *txPricedList) Removed() {
+func (l *txPricedList) Removed(count int) {
	// Bump the stale counter, but exit if still too low (< 25%)
-	l.stales++
+	l.stales += count
	if l.stales <= len(*l.items)/4 {
		return
	}
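The behavioural change to txPricedList is only that callers now report whole batches of removals at once: Removed(n) bumps the stale counter by n, and the heap is still only rebuilt once roughly a quarter of its entries are stale. A toy sketch of that threshold logic, using stand-in types rather than the pool's real ones:

```go
package main

import "fmt"

// pricedList mimics the stale-ratio bookkeeping of txPricedList.Removed(count).
type pricedList struct {
	stales int
	items  []string // stand-in for the price-ordered transaction heap
}

func (l *pricedList) Removed(count int) {
	// Bump the stale counter, but exit if still too low (< 25%)
	l.stales += count
	if l.stales <= len(l.items)/4 {
		return
	}
	// Too many stale entries: compact the heap and reset the counter (sketch only).
	l.items = l.items[:len(l.items)-l.stales]
	l.stales = 0
}

func main() {
	l := &pricedList{items: make([]string, 100)}
	l.Removed(10) // 10 <= 25, nothing happens yet
	l.Removed(20) // 30 > 25, the heap is compacted
	fmt.Println(len(l.items), l.stales) // 70 0
}
```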

@@ -85,20 +85,25 @@ var (
var (
	// Metrics for the pending pool
-	pendingDiscardCounter   = metrics.NewRegisteredCounter("txpool/pending/discard", nil)
-	pendingReplaceCounter   = metrics.NewRegisteredCounter("txpool/pending/replace", nil)
-	pendingRateLimitCounter = metrics.NewRegisteredCounter("txpool/pending/ratelimit", nil) // Dropped due to rate limiting
-	pendingNofundsCounter   = metrics.NewRegisteredCounter("txpool/pending/nofunds", nil)   // Dropped due to out-of-funds
+	pendingDiscardMeter   = metrics.NewRegisteredMeter("txpool/pending/discard", nil)
+	pendingReplaceMeter   = metrics.NewRegisteredMeter("txpool/pending/replace", nil)
+	pendingRateLimitMeter = metrics.NewRegisteredMeter("txpool/pending/ratelimit", nil) // Dropped due to rate limiting
+	pendingNofundsMeter   = metrics.NewRegisteredMeter("txpool/pending/nofunds", nil)   // Dropped due to out-of-funds
	// Metrics for the queued pool
-	queuedDiscardCounter   = metrics.NewRegisteredCounter("txpool/queued/discard", nil)
-	queuedReplaceCounter   = metrics.NewRegisteredCounter("txpool/queued/replace", nil)
-	queuedRateLimitCounter = metrics.NewRegisteredCounter("txpool/queued/ratelimit", nil) // Dropped due to rate limiting
-	queuedNofundsCounter   = metrics.NewRegisteredCounter("txpool/queued/nofunds", nil)   // Dropped due to out-of-funds
+	queuedDiscardMeter   = metrics.NewRegisteredMeter("txpool/queued/discard", nil)
+	queuedReplaceMeter   = metrics.NewRegisteredMeter("txpool/queued/replace", nil)
+	queuedRateLimitMeter = metrics.NewRegisteredMeter("txpool/queued/ratelimit", nil) // Dropped due to rate limiting
+	queuedNofundsMeter   = metrics.NewRegisteredMeter("txpool/queued/nofunds", nil)   // Dropped due to out-of-funds
	// General tx metrics
-	invalidTxCounter     = metrics.NewRegisteredCounter("txpool/invalid", nil)
-	underpricedTxCounter = metrics.NewRegisteredCounter("txpool/underpriced", nil)
+	validMeter         = metrics.NewRegisteredMeter("txpool/valid", nil)
+	invalidTxMeter     = metrics.NewRegisteredMeter("txpool/invalid", nil)
+	underpricedTxMeter = metrics.NewRegisteredMeter("txpool/underpriced", nil)
+	pendingCounter = metrics.NewRegisteredCounter("txpool/pending", nil)
+	queuedCounter  = metrics.NewRegisteredCounter("txpool/queued", nil)
+	localCounter   = metrics.NewRegisteredCounter("txpool/local", nil)
)

// TxStatus is the current status of a transaction as seen by the pool.
@@ -661,7 +666,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
	// If the transaction fails basic validation, discard it
	if err := pool.validateTx(tx, local); err != nil {
		log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
-		invalidTxCounter.Inc(1)
+		invalidTxMeter.Mark(1)
		return false, err
	}
	// If the transaction pool is full, discard underpriced transactions
@@ -669,14 +674,14 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
		// If the new transaction is underpriced, don't accept it
		if !local && pool.priced.Underpriced(tx, pool.locals) {
			log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
-			underpricedTxCounter.Inc(1)
+			underpricedTxMeter.Mark(1)
			return false, ErrUnderpriced
		}
		// New transaction is better than our worse ones, make room for it
		drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
		for _, tx := range drop {
			log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
-			underpricedTxCounter.Inc(1)
+			underpricedTxMeter.Mark(1)
			pool.removeTx(tx.Hash(), false)
		}
	}
@@ -686,14 +691,14 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
		// Nonce already pending, check if required price bump is met
		inserted, old := list.Add(tx, pool.config.PriceBump)
		if !inserted {
-			pendingDiscardCounter.Inc(1)
+			pendingDiscardMeter.Mark(1)
			return false, ErrReplaceUnderpriced
		}
		// New transaction is better, replace old one
		if old != nil {
			pool.all.Remove(old.Hash())
-			pool.priced.Removed()
-			pendingReplaceCounter.Inc(1)
+			pool.priced.Removed(1)
+			pendingReplaceMeter.Mark(1)
		}
		pool.all.Add(tx)
		pool.priced.Put(tx)
@@ -718,6 +723,9 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
			pool.locals.add(from)
		}
	}
+	if local || pool.locals.contains(from) {
+		localCounter.Inc(1)
+	}
	pool.journalTx(from, tx)
	log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
@@ -736,14 +744,17 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er
	inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
	if !inserted {
		// An older transaction was better, discard this
-		queuedDiscardCounter.Inc(1)
+		queuedDiscardMeter.Mark(1)
		return false, ErrReplaceUnderpriced
	}
	// Discard any previous transaction and mark this
	if old != nil {
		pool.all.Remove(old.Hash())
-		pool.priced.Removed()
-		queuedReplaceCounter.Inc(1)
+		pool.priced.Removed(1)
+		queuedReplaceMeter.Mark(1)
+	} else {
+		// Nothing was replaced, bump the queued counter
+		queuedCounter.Inc(1)
	}
	if pool.all.Get(hash) == nil {
		pool.all.Add(tx)
@@ -779,17 +790,20 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
	if !inserted {
		// An older transaction was better, discard this
		pool.all.Remove(hash)
-		pool.priced.Removed()
-		pendingDiscardCounter.Inc(1)
+		pool.priced.Removed(1)
+		pendingDiscardMeter.Mark(1)
		return false
	}
	// Otherwise discard any previous transaction and mark this
	if old != nil {
		pool.all.Remove(old.Hash())
-		pool.priced.Removed()
-		pendingReplaceCounter.Inc(1)
+		pool.priced.Removed(1)
+		pendingReplaceMeter.Mark(1)
+	} else {
+		// Nothing was replaced, bump the pending counter
+		pendingCounter.Inc(1)
	}
	// Failsafe to work around direct pending inserts (tests)
	if pool.all.Get(hash) == nil {
@@ -844,6 +858,8 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error {
	if err != nil {
		return err
	}
+	validMeter.Mark(1)
	// If we added a new transaction, run promotion checks and return
	if !replace {
		from, _ := types.Sender(pool.signer, tx) // already validated
@@ -878,6 +894,8 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error {
			dirty[from] = struct{}{}
		}
	}
+	validMeter.Mark(int64(len(dirty)))
	// Only reprocess the internal state if something was actually added
	if len(dirty) > 0 {
		addrs := make([]common.Address, 0, len(dirty))
@@ -928,7 +946,10 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
	// Remove it from the list of known transactions
	pool.all.Remove(hash)
	if outofbound {
-		pool.priced.Removed()
+		pool.priced.Removed(1)
+	}
+	if pool.locals.contains(addr) {
+		localCounter.Dec(1)
	}
	// Remove the transaction from the pending lists and reset the account nonce
	if pending := pool.pending[addr]; pending != nil {
@@ -946,12 +967,17 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
			if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
				pool.pendingState.SetNonce(addr, nonce)
			}
+			// Reduce the pending counter
+			pendingCounter.Dec(int64(1 + len(invalids)))
			return
		}
	}
	// Transaction is in the future queue
	if future := pool.queue[addr]; future != nil {
-		future.Remove(tx)
+		if removed, _ := future.Remove(tx); removed {
+			// Reduce the queued counter
+			queuedCounter.Dec(1)
+		}
		if future.Empty() {
			delete(pool.queue, addr)
		}
@@ -979,38 +1005,48 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
			continue // Just in case someone calls with a non existing account
		}
		// Drop all transactions that are deemed too old (low nonce)
-		for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) {
+		forwards := list.Forward(pool.currentState.GetNonce(addr))
+		for _, tx := range forwards {
			hash := tx.Hash()
-			log.Trace("Removed old queued transaction", "hash", hash)
			pool.all.Remove(hash)
-			pool.priced.Removed()
+			log.Trace("Removed old queued transaction", "hash", hash)
		}
		// Drop all transactions that are too costly (low balance or out of gas)
		drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
		for _, tx := range drops {
			hash := tx.Hash()
-			log.Trace("Removed unpayable queued transaction", "hash", hash)
			pool.all.Remove(hash)
-			pool.priced.Removed()
-			queuedNofundsCounter.Inc(1)
+			log.Trace("Removed unpayable queued transaction", "hash", hash)
		}
+		queuedNofundsMeter.Mark(int64(len(drops)))
		// Gather all executable transactions and promote them
-		for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
+		readies := list.Ready(pool.pendingState.GetNonce(addr))
+		for _, tx := range readies {
			hash := tx.Hash()
			if pool.promoteTx(addr, hash, tx) {
				log.Trace("Promoting queued transaction", "hash", hash)
				promoted = append(promoted, tx)
			}
		}
+		queuedCounter.Dec(int64(len(readies)))
		// Drop all transactions over the allowed limit
+		var caps types.Transactions
		if !pool.locals.contains(addr) {
-			for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
+			caps = list.Cap(int(pool.config.AccountQueue))
+			for _, tx := range caps {
				hash := tx.Hash()
				pool.all.Remove(hash)
-				pool.priced.Removed()
-				queuedRateLimitCounter.Inc(1)
				log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
			}
+			queuedRateLimitMeter.Mark(int64(len(caps)))
+		}
+		// Mark all the items dropped as removed
+		pool.priced.Removed(len(forwards) + len(drops) + len(caps))
+		queuedCounter.Dec(int64(len(forwards) + len(drops) + len(caps)))
+		if pool.locals.contains(addr) {
+			localCounter.Dec(int64(len(forwards) + len(drops) + len(caps)))
		}
		// Delete the entire queue entry if it became empty.
		if list.Empty() {
@@ -1052,11 +1088,12 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
		for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
			for i := 0; i < len(offenders)-1; i++ {
				list := pool.pending[offenders[i]]
-				for _, tx := range list.Cap(list.Len() - 1) {
+				caps := list.Cap(list.Len() - 1)
+				for _, tx := range caps {
					// Drop the transaction from the global pools too
					hash := tx.Hash()
					pool.all.Remove(hash)
-					pool.priced.Removed()
					// Update the account nonce to the dropped transaction
					if nonce := tx.Nonce(); pool.pendingState.GetNonce(offenders[i]) > nonce {
@@ -1064,6 +1101,11 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
					}
					log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
				}
+				pool.priced.Removed(len(caps))
+				pendingCounter.Dec(int64(len(caps)))
+				if pool.locals.contains(offenders[i]) {
+					localCounter.Dec(int64(len(caps)))
+				}
				pending--
			}
		}
@@ -1074,11 +1116,12 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
		for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots {
			for _, addr := range offenders {
				list := pool.pending[addr]
-				for _, tx := range list.Cap(list.Len() - 1) {
+				caps := list.Cap(list.Len() - 1)
+				for _, tx := range caps {
					// Drop the transaction from the global pools too
					hash := tx.Hash()
					pool.all.Remove(hash)
-					pool.priced.Removed()
					// Update the account nonce to the dropped transaction
					if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
@@ -1086,11 +1129,16 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
					}
					log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
				}
+				pool.priced.Removed(len(caps))
+				pendingCounter.Dec(int64(len(caps)))
+				if pool.locals.contains(addr) {
+					localCounter.Dec(int64(len(caps)))
+				}
				pending--
			}
		}
	}
-	pendingRateLimitCounter.Inc(int64(pendingBeforeCap - pending))
+	pendingRateLimitMeter.Mark(int64(pendingBeforeCap - pending))
	// If we've queued more transactions than the hard limit, drop oldest ones
	queued := uint64(0)
@@ -1120,7 +1168,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
				pool.removeTx(tx.Hash(), true)
			}
			drop -= size
-			queuedRateLimitCounter.Inc(int64(size))
+			queuedRateLimitMeter.Mark(int64(size))
			continue
		}
		// Otherwise drop only last few transactions
@@ -1128,7 +1176,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
		for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
			pool.removeTx(txs[i].Hash(), true)
			drop--
-			queuedRateLimitCounter.Inc(1)
+			queuedRateLimitMeter.Mark(1)
		}
	}
}
@@ -1143,11 +1191,11 @@ func (pool *TxPool) demoteUnexecutables() {
		nonce := pool.currentState.GetNonce(addr)
		// Drop all transactions that are deemed too old (low nonce)
-		for _, tx := range list.Forward(nonce) {
+		olds := list.Forward(nonce)
+		for _, tx := range olds {
			hash := tx.Hash()
-			log.Trace("Removed old pending transaction", "hash", hash)
			pool.all.Remove(hash)
-			pool.priced.Removed()
+			log.Trace("Removed old pending transaction", "hash", hash)
		}
		// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
		drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
@@ -1155,21 +1203,28 @@
			hash := tx.Hash()
			log.Trace("Removed unpayable pending transaction", "hash", hash)
			pool.all.Remove(hash)
-			pool.priced.Removed()
-			pendingNofundsCounter.Inc(1)
		}
+		pool.priced.Removed(len(olds) + len(drops))
+		pendingNofundsMeter.Mark(int64(len(drops)))
		for _, tx := range invalids {
			hash := tx.Hash()
			log.Trace("Demoting pending transaction", "hash", hash)
			pool.enqueueTx(hash, tx)
		}
+		pendingCounter.Dec(int64(len(olds) + len(drops) + len(invalids)))
+		if pool.locals.contains(addr) {
+			localCounter.Dec(int64(len(olds) + len(drops) + len(invalids)))
+		}
		// If there's a gap in front, alert (should never happen) and postpone all transactions
		if list.Len() > 0 && list.txs.Get(nonce) == nil {
-			for _, tx := range list.Cap(0) {
+			gapped := list.Cap(0)
+			for _, tx := range gapped {
				hash := tx.Hash()
				log.Error("Demoting invalidated transaction", "hash", hash)
				pool.enqueueTx(hash, tx)
			}
+			pendingCounter.Inc(int64(len(gapped)))
		}
		// Delete the entire queue entry if it became empty.
		if list.Empty() {
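The txpool changes follow one pattern throughout: per-event drop/replace/ratelimit statistics turn into meters, which expose event rates, while the three new counters (txpool/pending, txpool/queued, txpool/local) track how many transactions currently sit in each set and are therefore incremented and decremented as transactions move around. A small sketch of the distinction, reusing two metric names from the hunks above (the Enabled toggle is normally set by geth's --metrics flag):

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/metrics"
)

func main() {
	metrics.Enabled = true

	// A meter records how often something happens (with 1/5/15 minute rates),
	// a counter records how many of something there currently are.
	discarded := metrics.NewRegisteredMeter("txpool/pending/discard", nil)
	pending := metrics.NewRegisteredCounter("txpool/pending", nil)

	discarded.Mark(3) // three discard events were observed
	pending.Inc(5)    // five transactions entered the pending set
	pending.Dec(2)    // two of them left it again

	fmt.Println(discarded.Count(), pending.Count()) // 3 3
}
```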

@@ -442,9 +442,7 @@ func (s *stateSync) process(req *stateReq) (int, error) {
		default:
			return successful, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
		}
-		if _, ok := req.tasks[hash]; ok {
-			delete(req.tasks, hash)
-		}
+		delete(req.tasks, hash)
	}
	// Put unfulfilled tasks back into the retry queue
	npeers := s.d.peers.Len()
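The downloader tweak is a pure simplification: Go's built-in delete is already a no-op when the key is absent, so the existence check around it was redundant. For illustration:

```go
package main

import "fmt"

func main() {
	tasks := map[string]int{"a": 1}

	delete(tasks, "missing") // legal and harmless: deleting an absent key does nothing
	delete(tasks, "a")

	fmt.Println(len(tasks)) // 0
}
```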

@@ -67,6 +67,7 @@ type Database struct {
	compWriteMeter   metrics.Meter // Meter for measuring the data written during compaction
	writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction
	writeDelayMeter  metrics.Meter // Meter for measuring the write delay duration due to database compaction
+	diskSizeGauge    metrics.Gauge // Gauge for tracking the size of all the levels in the database
	diskReadMeter    metrics.Meter // Meter for measuring the effective amount of data read
	diskWriteMeter   metrics.Meter // Meter for measuring the effective amount of data written
@@ -112,6 +113,7 @@ func New(file string, cache int, handles int, namespace string) (*Database, erro
	ldb.compTimeMeter = metrics.NewRegisteredMeter(namespace+"compact/time", nil)
	ldb.compReadMeter = metrics.NewRegisteredMeter(namespace+"compact/input", nil)
	ldb.compWriteMeter = metrics.NewRegisteredMeter(namespace+"compact/output", nil)
+	ldb.diskSizeGauge = metrics.NewRegisteredGauge(namespace+"disk/size", nil)
	ldb.diskReadMeter = metrics.NewRegisteredMeter(namespace+"disk/read", nil)
	ldb.diskWriteMeter = metrics.NewRegisteredMeter(namespace+"disk/write", nil)
	ldb.writeDelayMeter = metrics.NewRegisteredMeter(namespace+"compact/writedelay/duration", nil)
@@ -233,7 +235,7 @@ func (db *Database) meter(refresh time.Duration) {
	// Create the counters to store current and previous compaction values
	compactions := make([][]float64, 2)
	for i := 0; i < 2; i++ {
-		compactions[i] = make([]float64, 3)
+		compactions[i] = make([]float64, 4)
	}
	// Create storage for iostats.
	var iostats [2]float64
@@ -279,7 +281,7 @@ func (db *Database) meter(refresh time.Duration) {
			if len(parts) != 6 {
				break
			}
-			for idx, counter := range parts[3:] {
+			for idx, counter := range parts[2:] {
				value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64)
				if err != nil {
					db.log.Error("Compaction entry parsing failed", "err", err)
@@ -290,16 +292,18 @@ func (db *Database) meter(refresh time.Duration) {
			}
		}
		// Update all the requested meters
+		if db.diskSizeGauge != nil {
+			db.diskSizeGauge.Update(int64(compactions[i%2][0] * 1024 * 1024))
+		}
		if db.compTimeMeter != nil {
-			db.compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000))
+			db.compTimeMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1000 * 1000 * 1000))
		}
		if db.compReadMeter != nil {
-			db.compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024))
+			db.compReadMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
		}
		if db.compWriteMeter != nil {
-			db.compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
+			db.compWriteMeter.Mark(int64((compactions[i%2][3] - compactions[(i-1)%2][3]) * 1024 * 1024))
		}
		// Retrieve the write delay statistic
		writedelay, err := db.db.GetProperty("leveldb.writedelay")
		if err != nil {
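The compaction-stats parsing now keeps four columns per level instead of three: slicing from parts[2:] pulls the size column in as index 0, which feeds the new disk/size gauge, while time/read/write shift to indices 1-3 (hence the index bumps in the Mark calls above). A standalone sketch of that parsing, with a made-up stats row in roughly goleveldb's "leveldb.stats" table format:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

func main() {
	// A data row looks roughly like:
	//   Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)
	row := "   0   |      4 |    7.00000 |   0.51000 |    0.00000 |   12.00000"

	parts := strings.Split(row, "|")
	totals := make([]float64, 4) // size, time, read, write
	for idx, counter := range parts[2:] {
		value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64)
		if err != nil {
			panic(err)
		}
		totals[idx] += value
	}
	fmt.Println(totals) // [7 0.51 0 12]
}
```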

metrics/cpu.go (new file, 36 lines)

@@ -0,0 +1,36 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package metrics
import "github.com/elastic/gosigar"
// CPUStats is the system and process CPU stats.
type CPUStats struct {
GlobalTime int64 // Time spent by the CPU working on all processes
GlobalWait int64 // Time spent by waiting on disk for all processes
LocalTime int64 // Time spent by the CPU working on this process
}
// ReadCPUStats retrieves the current CPU stats.
func ReadCPUStats(stats *CPUStats) {
global := gosigar.Cpu{}
global.Get()
stats.GlobalTime = int64(global.User + global.Nice + global.Sys)
stats.GlobalWait = int64(global.Wait)
stats.LocalTime = getProcessCPUTime()
}

metrics/cpu_syscall.go (new file, 35 lines)

@@ -0,0 +1,35 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// +build !windows
package metrics
import (
"syscall"
"github.com/ethereum/go-ethereum/log"
)
// getProcessCPUTime retrieves the process' CPU time since program startup.
func getProcessCPUTime() int64 {
var usage syscall.Rusage
if err := syscall.Getrusage(syscall.RUSAGE_SELF, &usage); err != nil {
log.Warn("Failed to retrieve CPU time", "err", err)
return 0
}
return int64(usage.Utime.Sec+usage.Stime.Sec)*100 + int64(usage.Utime.Usec+usage.Stime.Usec)/10000 //nolint:unconvert
}

metrics/cpu_windows.go (new file, 23 lines)

@@ -0,0 +1,23 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package metrics
// getProcessCPUTime returns 0 on Windows as there is no system call to resolve
// the actual process' CPU time.
func getProcessCPUTime() int64 {
return 0
}

@@ -61,18 +61,27 @@ func CollectProcessMetrics(refresh time.Duration) {
	if !Enabled {
		return
	}
+	refreshFreq := int64(refresh / time.Second)
	// Create the various data collectors
+	cpuStats := make([]*CPUStats, 2)
	memstats := make([]*runtime.MemStats, 2)
	diskstats := make([]*DiskStats, 2)
	for i := 0; i < len(memstats); i++ {
+		cpuStats[i] = new(CPUStats)
		memstats[i] = new(runtime.MemStats)
		diskstats[i] = new(DiskStats)
	}
	// Define the various metrics to collect
+	cpuSysLoad := GetOrRegisterGauge("system/cpu/sysload", DefaultRegistry)
+	cpuSysWait := GetOrRegisterGauge("system/cpu/syswait", DefaultRegistry)
+	cpuProcLoad := GetOrRegisterGauge("system/cpu/procload", DefaultRegistry)
+	memPauses := GetOrRegisterMeter("system/memory/pauses", DefaultRegistry)
	memAllocs := GetOrRegisterMeter("system/memory/allocs", DefaultRegistry)
	memFrees := GetOrRegisterMeter("system/memory/frees", DefaultRegistry)
-	memInuse := GetOrRegisterMeter("system/memory/inuse", DefaultRegistry)
-	memPauses := GetOrRegisterMeter("system/memory/pauses", DefaultRegistry)
+	memHeld := GetOrRegisterGauge("system/memory/held", DefaultRegistry)
+	memUsed := GetOrRegisterGauge("system/memory/used", DefaultRegistry)
	var diskReads, diskReadBytes, diskWrites, diskWriteBytes Meter
	var diskReadBytesCounter, diskWriteBytesCounter Counter
@ -91,11 +100,17 @@ func CollectProcessMetrics(refresh time.Duration) {
location1 := i % 2 location1 := i % 2
location2 := (i - 1) % 2 location2 := (i - 1) % 2
ReadCPUStats(cpuStats[location1])
cpuSysLoad.Update((cpuStats[location1].GlobalTime - cpuStats[location2].GlobalTime) / refreshFreq)
cpuSysWait.Update((cpuStats[location1].GlobalWait - cpuStats[location2].GlobalWait) / refreshFreq)
cpuProcLoad.Update((cpuStats[location1].LocalTime - cpuStats[location2].LocalTime) / refreshFreq)
runtime.ReadMemStats(memstats[location1]) runtime.ReadMemStats(memstats[location1])
memPauses.Mark(int64(memstats[location1].PauseTotalNs - memstats[location2].PauseTotalNs))
memAllocs.Mark(int64(memstats[location1].Mallocs - memstats[location2].Mallocs)) memAllocs.Mark(int64(memstats[location1].Mallocs - memstats[location2].Mallocs))
memFrees.Mark(int64(memstats[location1].Frees - memstats[location2].Frees)) memFrees.Mark(int64(memstats[location1].Frees - memstats[location2].Frees))
memInuse.Mark(int64(memstats[location1].Alloc - memstats[location2].Alloc)) memHeld.Update(int64(memstats[location1].HeapSys - memstats[location1].HeapReleased))
memPauses.Mark(int64(memstats[location1].PauseTotalNs - memstats[location2].PauseTotalNs)) memUsed.Update(int64(memstats[location1].Alloc))
if ReadDiskStats(diskstats[location1]) == nil { if ReadDiskStats(diskstats[location1]) == nil {
diskReads.Mark(diskstats[location1].ReadCount - diskstats[location2].ReadCount) diskReads.Mark(diskstats[location1].ReadCount - diskstats[location2].ReadCount)

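A minimal sketch (not part of the diff) of starting the collector and reading a couple of the gauges registered above back out of DefaultRegistry; forcing metrics.Enabled on, the 3-second refresh and the sleep are assumptions made purely for the demo:

package main

import (
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/metrics"
)

func main() {
	metrics.Enabled = true // normally set via the --metrics flag; forced on so the collector does not bail out

	go metrics.CollectProcessMetrics(3 * time.Second)
	time.Sleep(7 * time.Second) // let at least two samples accumulate

	if g, ok := metrics.DefaultRegistry.Get("system/cpu/sysload").(metrics.Gauge); ok {
		fmt.Println("system CPU time per interval (1/100s):", g.Value())
	}
	if g, ok := metrics.DefaultRegistry.Get("system/memory/used").(metrics.Gauge); ok {
		fmt.Println("heap bytes currently allocated:", g.Value())
	}
}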
@ -25,18 +25,17 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p/enode"
) )
const ( const (
MetricsInboundConnects = "p2p/InboundConnects" // Name for the registered inbound connects meter MetricsInboundTraffic = "p2p/ingress" // Name for the registered inbound traffic meter
MetricsInboundTraffic = "p2p/InboundTraffic" // Name for the registered inbound traffic meter MetricsOutboundTraffic = "p2p/egress" // Name for the registered outbound traffic meter
MetricsOutboundConnects = "p2p/OutboundConnects" // Name for the registered outbound connects meter MetricsOutboundConnects = "p2p/dials" // Name for the registered outbound connects meter
MetricsOutboundTraffic = "p2p/OutboundTraffic" // Name for the registered outbound traffic meter MetricsInboundConnects = "p2p/serves" // Name for the registered inbound connects meter
MeteredPeerLimit = 1024 // Maximum number of peers that are individually metered MeteredPeerLimit = 1024 // Maximum number of peers that are individually metered
) )
@ -46,6 +45,7 @@ var (
ingressTrafficMeter = metrics.NewRegisteredMeter(MetricsInboundTraffic, nil) // Meter metering the cumulative ingress traffic ingressTrafficMeter = metrics.NewRegisteredMeter(MetricsInboundTraffic, nil) // Meter metering the cumulative ingress traffic
egressConnectMeter = metrics.NewRegisteredMeter(MetricsOutboundConnects, nil) // Meter counting the egress connections egressConnectMeter = metrics.NewRegisteredMeter(MetricsOutboundConnects, nil) // Meter counting the egress connections
egressTrafficMeter = metrics.NewRegisteredMeter(MetricsOutboundTraffic, nil) // Meter metering the cumulative egress traffic egressTrafficMeter = metrics.NewRegisteredMeter(MetricsOutboundTraffic, nil) // Meter metering the cumulative egress traffic
activePeerCounter = metrics.NewRegisteredCounter("p2p/peers", nil) // Counter tracking the current peer count
PeerIngressRegistry = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsInboundTraffic+"/") // Registry containing the peer ingress PeerIngressRegistry = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsInboundTraffic+"/") // Registry containing the peer ingress
PeerEgressRegistry = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsOutboundTraffic+"/") // Registry containing the peer egress PeerEgressRegistry = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsOutboundTraffic+"/") // Registry containing the peer egress
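For reference, a hedged sketch of how the renamed identifiers compose into per-peer metric names through the prefixed child registries above; the peer key here is a made-up placeholder, the real code derives it from the peer's IP address and node ID:

package main

import (
	"github.com/ethereum/go-ethereum/metrics"
	"github.com/ethereum/go-ethereum/p2p"
)

func main() {
	key := "203.0.113.7/abcdef01" // hypothetical peer key, for illustration only
	// Registering through PeerIngressRegistry yields the full name "p2p/ingress/<key>".
	ingress := metrics.GetOrRegisterMeter(key, p2p.PeerIngressRegistry)
	ingress.Mark(512) // e.g. 512 bytes read from that peer
}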
@ -124,6 +124,8 @@ func newMeteredConn(conn net.Conn, ingress bool, ip net.IP) net.Conn {
} else { } else {
egressConnectMeter.Mark(1) egressConnectMeter.Mark(1)
} }
activePeerCounter.Inc(1)
return &meteredConn{ return &meteredConn{
Conn: conn, Conn: conn,
ip: ip, ip: ip,
@ -198,6 +200,7 @@ func (c *meteredConn) Close() error {
IP: c.ip, IP: c.ip,
Elapsed: time.Since(c.connected), Elapsed: time.Since(c.connected),
}) })
activePeerCounter.Dec(1)
return err return err
} }
id := c.id id := c.id
@ -209,6 +212,7 @@ func (c *meteredConn) Close() error {
IP: c.ip, IP: c.ip,
ID: id, ID: id,
}) })
activePeerCounter.Dec(1)
return err return err
} }
ingress, egress := uint64(c.ingressMeter.Count()), uint64(c.egressMeter.Count()) ingress, egress := uint64(c.ingressMeter.Count()), uint64(c.egressMeter.Count())
@ -229,5 +233,6 @@ func (c *meteredConn) Close() error {
Ingress: ingress, Ingress: ingress,
Egress: egress, Egress: egress,
}) })
activePeerCounter.Dec(1)
return err return err
} }
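And a small sketch of reading the new p2p/peers counter back out of the default registry; the value is only meaningful inside a process that has the p2p package loaded and metrics enabled (assumptions for this demo):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/metrics"
	_ "github.com/ethereum/go-ethereum/p2p" // side effect: registers the p2p meters and the p2p/peers counter
)

func main() {
	if c, ok := metrics.DefaultRegistry.Get("p2p/peers").(metrics.Counter); ok {
		fmt.Println("connections currently tracked:", c.Count())
	}
}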

@ -685,9 +685,8 @@ running:
// This channel is used by RemoveTrustedPeer to remove an enode // This channel is used by RemoveTrustedPeer to remove an enode
// from the trusted node set. // from the trusted node set.
srv.log.Trace("Removing trusted node", "node", n) srv.log.Trace("Removing trusted node", "node", n)
if _, ok := trusted[n.ID()]; ok { delete(trusted, n.ID())
delete(trusted, n.ID())
}
// Unmark any already-connected peer as trusted // Unmark any already-connected peer as trusted
if p, ok := peers[n.ID()]; ok { if p, ok := peers[n.ID()]; ok {
p.rw.set(trustedConn, false) p.rw.set(trustedConn, false)