eth/protocols, prp/tracker: add support for req/rep rtt tracking (#22608)

* eth/protocols, prp/tracker: add support for req/rep rtt tracking

* p2p/tracker: sanity cap the number of pending requests

* pap/tracker: linter <3

* p2p/tracker: disable entire tracker if no metrics are enabled
This commit is contained in:
Péter Szilágyi 2021-04-22 11:42:46 +03:00 committed by GitHub
parent 9357280fce
commit 1fb9a6dd32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 309 additions and 11 deletions

@ -223,7 +223,7 @@ func handleMessage(backend Backend, peer *Peer) error {
if peer.Version() >= ETH66 { if peer.Version() >= ETH66 {
handlers = eth66 handlers = eth66
} }
// Track the emount of time it takes to serve the request and run the handler // Track the amount of time it takes to serve the request and run the handler
if metrics.Enabled { if metrics.Enabled {
h := fmt.Sprintf("%s/%s/%d/%#02x", p2p.HandleHistName, ProtocolName, peer.Version(), msg.Code) h := fmt.Sprintf("%s/%s/%d/%#02x", p2p.HandleHistName, ProtocolName, peer.Version(), msg.Code)
defer func(start time.Time) { defer func(start time.Time) {

@ -327,6 +327,8 @@ func handleBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error {
if err := msg.Decode(res); err != nil { if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
} }
requestTracker.Fulfil(peer.id, peer.version, BlockHeadersMsg, res.RequestId)
return backend.Handle(peer, &res.BlockHeadersPacket) return backend.Handle(peer, &res.BlockHeadersPacket)
} }
@ -345,6 +347,8 @@ func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error {
if err := msg.Decode(res); err != nil { if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
} }
requestTracker.Fulfil(peer.id, peer.version, BlockBodiesMsg, res.RequestId)
return backend.Handle(peer, &res.BlockBodiesPacket) return backend.Handle(peer, &res.BlockBodiesPacket)
} }
@ -363,6 +367,8 @@ func handleNodeData66(backend Backend, msg Decoder, peer *Peer) error {
if err := msg.Decode(res); err != nil { if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
} }
requestTracker.Fulfil(peer.id, peer.version, NodeDataMsg, res.RequestId)
return backend.Handle(peer, &res.NodeDataPacket) return backend.Handle(peer, &res.NodeDataPacket)
} }
@ -381,6 +387,8 @@ func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error {
if err := msg.Decode(res); err != nil { if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
} }
requestTracker.Fulfil(peer.id, peer.version, ReceiptsMsg, res.RequestId)
return backend.Handle(peer, &res.ReceiptsPacket) return backend.Handle(peer, &res.ReceiptsPacket)
} }
@ -506,5 +514,7 @@ func handlePooledTransactions66(backend Backend, msg Decoder, peer *Peer) error
} }
peer.markTransaction(tx.Hash()) peer.markTransaction(tx.Hash())
} }
requestTracker.Fulfil(peer.id, peer.version, PooledTransactionsMsg, txs.RequestId)
return backend.Handle(peer, &txs.PooledTransactionsPacket) return backend.Handle(peer, &txs.PooledTransactionsPacket)
} }

@ -413,8 +413,11 @@ func (p *Peer) RequestOneHeader(hash common.Hash) error {
Reverse: false, Reverse: false,
} }
if p.Version() >= ETH66 { if p.Version() >= ETH66 {
id := rand.Uint64()
requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id)
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{ return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
RequestId: rand.Uint64(), RequestId: id,
GetBlockHeadersPacket: &query, GetBlockHeadersPacket: &query,
}) })
} }
@ -432,8 +435,11 @@ func (p *Peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, re
Reverse: reverse, Reverse: reverse,
} }
if p.Version() >= ETH66 { if p.Version() >= ETH66 {
id := rand.Uint64()
requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id)
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{ return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
RequestId: rand.Uint64(), RequestId: id,
GetBlockHeadersPacket: &query, GetBlockHeadersPacket: &query,
}) })
} }
@ -451,8 +457,11 @@ func (p *Peer) RequestHeadersByNumber(origin uint64, amount int, skip int, rever
Reverse: reverse, Reverse: reverse,
} }
if p.Version() >= ETH66 { if p.Version() >= ETH66 {
id := rand.Uint64()
requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id)
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{ return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
RequestId: rand.Uint64(), RequestId: id,
GetBlockHeadersPacket: &query, GetBlockHeadersPacket: &query,
}) })
} }
@ -476,8 +485,11 @@ func (p *Peer) ExpectRequestHeadersByNumber(origin uint64, amount int, skip int,
func (p *Peer) RequestBodies(hashes []common.Hash) error { func (p *Peer) RequestBodies(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of block bodies", "count", len(hashes)) p.Log().Debug("Fetching batch of block bodies", "count", len(hashes))
if p.Version() >= ETH66 { if p.Version() >= ETH66 {
id := rand.Uint64()
requestTracker.Track(p.id, p.version, GetBlockBodiesMsg, BlockBodiesMsg, id)
return p2p.Send(p.rw, GetBlockBodiesMsg, &GetBlockBodiesPacket66{ return p2p.Send(p.rw, GetBlockBodiesMsg, &GetBlockBodiesPacket66{
RequestId: rand.Uint64(), RequestId: id,
GetBlockBodiesPacket: hashes, GetBlockBodiesPacket: hashes,
}) })
} }
@ -489,8 +501,11 @@ func (p *Peer) RequestBodies(hashes []common.Hash) error {
func (p *Peer) RequestNodeData(hashes []common.Hash) error { func (p *Peer) RequestNodeData(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of state data", "count", len(hashes)) p.Log().Debug("Fetching batch of state data", "count", len(hashes))
if p.Version() >= ETH66 { if p.Version() >= ETH66 {
id := rand.Uint64()
requestTracker.Track(p.id, p.version, GetNodeDataMsg, NodeDataMsg, id)
return p2p.Send(p.rw, GetNodeDataMsg, &GetNodeDataPacket66{ return p2p.Send(p.rw, GetNodeDataMsg, &GetNodeDataPacket66{
RequestId: rand.Uint64(), RequestId: id,
GetNodeDataPacket: hashes, GetNodeDataPacket: hashes,
}) })
} }
@ -501,8 +516,11 @@ func (p *Peer) RequestNodeData(hashes []common.Hash) error {
func (p *Peer) RequestReceipts(hashes []common.Hash) error { func (p *Peer) RequestReceipts(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of receipts", "count", len(hashes)) p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
if p.Version() >= ETH66 { if p.Version() >= ETH66 {
id := rand.Uint64()
requestTracker.Track(p.id, p.version, GetReceiptsMsg, ReceiptsMsg, id)
return p2p.Send(p.rw, GetReceiptsMsg, &GetReceiptsPacket66{ return p2p.Send(p.rw, GetReceiptsMsg, &GetReceiptsPacket66{
RequestId: rand.Uint64(), RequestId: id,
GetReceiptsPacket: hashes, GetReceiptsPacket: hashes,
}) })
} }
@ -513,8 +531,11 @@ func (p *Peer) RequestReceipts(hashes []common.Hash) error {
func (p *Peer) RequestTxs(hashes []common.Hash) error { func (p *Peer) RequestTxs(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of transactions", "count", len(hashes)) p.Log().Debug("Fetching batch of transactions", "count", len(hashes))
if p.Version() >= ETH66 { if p.Version() >= ETH66 {
id := rand.Uint64()
requestTracker.Track(p.id, p.version, GetPooledTransactionsMsg, PooledTransactionsMsg, id)
return p2p.Send(p.rw, GetPooledTransactionsMsg, &GetPooledTransactionsPacket66{ return p2p.Send(p.rw, GetPooledTransactionsMsg, &GetPooledTransactionsPacket66{
RequestId: rand.Uint64(), RequestId: id,
GetPooledTransactionsPacket: hashes, GetPooledTransactionsPacket: hashes,
}) })
} }

@ -0,0 +1,26 @@
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package eth
import (
"time"
"github.com/ethereum/go-ethereum/p2p/tracker"
)
// requestTracker is a singleton tracker for eth/66 and newer request times.
var requestTracker = tracker.New(ProtocolName, 5*time.Minute)

@ -227,6 +227,8 @@ func handleMessage(backend Backend, peer *Peer) error {
return fmt.Errorf("accounts not monotonically increasing: #%d [%x] vs #%d [%x]", i-1, res.Accounts[i-1].Hash[:], i, res.Accounts[i].Hash[:]) return fmt.Errorf("accounts not monotonically increasing: #%d [%x] vs #%d [%x]", i-1, res.Accounts[i-1].Hash[:], i, res.Accounts[i].Hash[:])
} }
} }
requestTracker.Fulfil(peer.id, peer.version, AccountRangeMsg, res.ID)
return backend.Handle(peer, res) return backend.Handle(peer, res)
case msg.Code == GetStorageRangesMsg: case msg.Code == GetStorageRangesMsg:
@ -360,6 +362,8 @@ func handleMessage(backend Backend, peer *Peer) error {
} }
} }
} }
requestTracker.Fulfil(peer.id, peer.version, StorageRangesMsg, res.ID)
return backend.Handle(peer, res) return backend.Handle(peer, res)
case msg.Code == GetByteCodesMsg: case msg.Code == GetByteCodesMsg:
@ -404,6 +408,8 @@ func handleMessage(backend Backend, peer *Peer) error {
if err := msg.Decode(res); err != nil { if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
} }
requestTracker.Fulfil(peer.id, peer.version, ByteCodesMsg, res.ID)
return backend.Handle(peer, res) return backend.Handle(peer, res)
case msg.Code == GetTrieNodesMsg: case msg.Code == GetTrieNodesMsg:
@ -497,6 +503,8 @@ func handleMessage(backend Backend, peer *Peer) error {
if err := msg.Decode(res); err != nil { if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
} }
requestTracker.Fulfil(peer.id, peer.version, TrieNodesMsg, res.ID)
return backend.Handle(peer, res) return backend.Handle(peer, res)
default: default:

@ -65,6 +65,8 @@ func (p *Peer) Log() log.Logger {
// trie, starting with the origin. // trie, starting with the origin.
func (p *Peer) RequestAccountRange(id uint64, root common.Hash, origin, limit common.Hash, bytes uint64) error { func (p *Peer) RequestAccountRange(id uint64, root common.Hash, origin, limit common.Hash, bytes uint64) error {
p.logger.Trace("Fetching range of accounts", "reqid", id, "root", root, "origin", origin, "limit", limit, "bytes", common.StorageSize(bytes)) p.logger.Trace("Fetching range of accounts", "reqid", id, "root", root, "origin", origin, "limit", limit, "bytes", common.StorageSize(bytes))
requestTracker.Track(p.id, p.version, GetAccountRangeMsg, AccountRangeMsg, id)
return p2p.Send(p.rw, GetAccountRangeMsg, &GetAccountRangePacket{ return p2p.Send(p.rw, GetAccountRangeMsg, &GetAccountRangePacket{
ID: id, ID: id,
Root: root, Root: root,
@ -83,6 +85,7 @@ func (p *Peer) RequestStorageRanges(id uint64, root common.Hash, accounts []comm
} else { } else {
p.logger.Trace("Fetching ranges of small storage slots", "reqid", id, "root", root, "accounts", len(accounts), "first", accounts[0], "bytes", common.StorageSize(bytes)) p.logger.Trace("Fetching ranges of small storage slots", "reqid", id, "root", root, "accounts", len(accounts), "first", accounts[0], "bytes", common.StorageSize(bytes))
} }
requestTracker.Track(p.id, p.version, GetStorageRangesMsg, StorageRangesMsg, id)
return p2p.Send(p.rw, GetStorageRangesMsg, &GetStorageRangesPacket{ return p2p.Send(p.rw, GetStorageRangesMsg, &GetStorageRangesPacket{
ID: id, ID: id,
Root: root, Root: root,
@ -96,6 +99,8 @@ func (p *Peer) RequestStorageRanges(id uint64, root common.Hash, accounts []comm
// RequestByteCodes fetches a batch of bytecodes by hash. // RequestByteCodes fetches a batch of bytecodes by hash.
func (p *Peer) RequestByteCodes(id uint64, hashes []common.Hash, bytes uint64) error { func (p *Peer) RequestByteCodes(id uint64, hashes []common.Hash, bytes uint64) error {
p.logger.Trace("Fetching set of byte codes", "reqid", id, "hashes", len(hashes), "bytes", common.StorageSize(bytes)) p.logger.Trace("Fetching set of byte codes", "reqid", id, "hashes", len(hashes), "bytes", common.StorageSize(bytes))
requestTracker.Track(p.id, p.version, GetByteCodesMsg, ByteCodesMsg, id)
return p2p.Send(p.rw, GetByteCodesMsg, &GetByteCodesPacket{ return p2p.Send(p.rw, GetByteCodesMsg, &GetByteCodesPacket{
ID: id, ID: id,
Hashes: hashes, Hashes: hashes,
@ -107,6 +112,8 @@ func (p *Peer) RequestByteCodes(id uint64, hashes []common.Hash, bytes uint64) e
// a specificstate trie. // a specificstate trie.
func (p *Peer) RequestTrieNodes(id uint64, root common.Hash, paths []TrieNodePathSet, bytes uint64) error { func (p *Peer) RequestTrieNodes(id uint64, root common.Hash, paths []TrieNodePathSet, bytes uint64) error {
p.logger.Trace("Fetching set of trie nodes", "reqid", id, "root", root, "pathsets", len(paths), "bytes", common.StorageSize(bytes)) p.logger.Trace("Fetching set of trie nodes", "reqid", id, "root", root, "pathsets", len(paths), "bytes", common.StorageSize(bytes))
requestTracker.Track(p.id, p.version, GetTrieNodesMsg, TrieNodesMsg, id)
return p2p.Send(p.rw, GetTrieNodesMsg, &GetTrieNodesPacket{ return p2p.Send(p.rw, GetTrieNodesMsg, &GetTrieNodesPacket{
ID: id, ID: id,
Root: root, Root: root,

@ -0,0 +1,26 @@
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package snap
import (
"time"
"github.com/ethereum/go-ethereum/p2p/tracker"
)
// requestTracker is a singleton tracker for request times.
var requestTracker = tracker.New(ProtocolName, time.Minute)

@ -33,9 +33,6 @@ const (
// HandleHistName is the prefix of the per-packet serving time histograms. // HandleHistName is the prefix of the per-packet serving time histograms.
HandleHistName = "p2p/handle" HandleHistName = "p2p/handle"
// WaitHistName is the prefix of the per-packet (req only) waiting time histograms.
WaitHistName = "p2p/wait"
) )
var ( var (

203
p2p/tracker/tracker.go Normal file

@ -0,0 +1,203 @@
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package tracker
import (
"container/list"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)
const (
// trackedGaugeName is the prefix of the per-packet request tracking.
trackedGaugeName = "p2p/tracked"
// lostMeterName is the prefix of the per-packet request expirations.
lostMeterName = "p2p/lost"
// staleMeterName is the prefix of the per-packet stale responses.
staleMeterName = "p2p/stale"
// waitHistName is the prefix of the per-packet (req only) waiting time histograms.
waitHistName = "p2p/wait"
// maxTrackedPackets is a huge number to act as a failsafe on the number of
// pending requests the node will track. It should never be hit unless an
// attacker figures out a way to spin requests.
maxTrackedPackets = 100000
)
// request tracks sent network requests which have not yet received a response.
type request struct {
peer string
version uint // Protocol version
reqCode uint64 // Protocol message code of the request
resCode uint64 // Protocol message code of the expected response
time time.Time // Timestamp when the request was made
expire *list.Element // Expiration marker to untrack it
}
// Tracker is a pending network request tracker to measure how much time it takes
// a remote peer to respond.
type Tracker struct {
protocol string // Protocol capability identifier for the metrics
timeout time.Duration // Global timeout after which to drop a tracked packet
pending map[uint64]*request // Currently pending requests
expire *list.List // Linked list tracking the expiration order
wake *time.Timer // Timer tracking the expiration of the next item
lock sync.Mutex // Lock protecting from concurrent updates
}
// New creates a new network request tracker to monitor how much time it takes to
// fill certain requests and how individual peers perform.
func New(protocol string, timeout time.Duration) *Tracker {
return &Tracker{
protocol: protocol,
timeout: timeout,
pending: make(map[uint64]*request),
expire: list.New(),
}
}
// Track adds a network request to the tracker to wait for a response to arrive
// or until the request it cancelled or times out.
func (t *Tracker) Track(peer string, version uint, reqCode uint64, resCode uint64, id uint64) {
if !metrics.Enabled {
return
}
t.lock.Lock()
defer t.lock.Unlock()
// If there's a duplicate request, we've just random-collided (or more probably,
// we have a bug), report it. We could also add a metric, but we're not really
// expecting ourselves to be buggy, so a noisy warning should be enough.
if _, ok := t.pending[id]; ok {
log.Error("Network request id collision", "protocol", t.protocol, "version", version, "code", reqCode, "id", id)
return
}
// If we have too many pending requests, bail out instead of leaking memory
if pending := len(t.pending); pending >= maxTrackedPackets {
log.Error("Request tracker exceeded allowance", "pending", pending, "peer", peer, "protocol", t.protocol, "version", version, "code", reqCode)
return
}
// Id doesn't exist yet, start tracking it
t.pending[id] = &request{
peer: peer,
version: version,
reqCode: reqCode,
resCode: resCode,
time: time.Now(),
expire: t.expire.PushBack(id),
}
g := fmt.Sprintf("%s/%s/%d/%#02x", trackedGaugeName, t.protocol, version, reqCode)
metrics.GetOrRegisterGauge(g, nil).Inc(1)
// If we've just inserted the first item, start the expiration timer
if t.wake == nil {
t.wake = time.AfterFunc(t.timeout, t.clean)
}
}
// clean is called automatically when a preset time passes without a response
// being dleivered for the first network request.
func (t *Tracker) clean() {
t.lock.Lock()
defer t.lock.Unlock()
// Expire anything within a certain threshold (might be no items at all if
// we raced with the delivery)
for t.expire.Len() > 0 {
// Stop iterating if the next pending request is still alive
var (
head = t.expire.Front()
id = head.Value.(uint64)
req = t.pending[id]
)
if time.Since(req.time) < t.timeout+5*time.Millisecond {
break
}
// Nope, dead, drop it
t.expire.Remove(head)
delete(t.pending, id)
g := fmt.Sprintf("%s/%s/%d/%#02x", trackedGaugeName, t.protocol, req.version, req.reqCode)
metrics.GetOrRegisterGauge(g, nil).Dec(1)
m := fmt.Sprintf("%s/%s/%d/%#02x", lostMeterName, t.protocol, req.version, req.reqCode)
metrics.GetOrRegisterMeter(m, nil).Mark(1)
}
t.schedule()
}
// schedule starts a timer to trigger on the expiration of the first network
// packet.
func (t *Tracker) schedule() {
if t.expire.Len() == 0 {
t.wake = nil
return
}
t.wake = time.AfterFunc(time.Until(t.pending[t.expire.Front().Value.(uint64)].time.Add(t.timeout)), t.clean)
}
// Fulfil fills a pending request, if any is available, reporting on various metrics.
func (t *Tracker) Fulfil(peer string, version uint, code uint64, id uint64) {
if !metrics.Enabled {
return
}
t.lock.Lock()
defer t.lock.Unlock()
// If it's a non existing request, track as stale response
req, ok := t.pending[id]
if !ok {
m := fmt.Sprintf("%s/%s/%d/%#02x", staleMeterName, t.protocol, version, code)
metrics.GetOrRegisterMeter(m, nil).Mark(1)
return
}
// If the response is funky, it might be some active attack
if req.peer != peer || req.version != version || req.resCode != code {
log.Warn("Network response id collision",
"have", fmt.Sprintf("%s:%s/%d:%d", peer, t.protocol, version, code),
"want", fmt.Sprintf("%s:%s/%d:%d", peer, t.protocol, req.version, req.resCode),
)
return
}
// Everything matches, mark the request serviced and meter it
t.expire.Remove(req.expire)
if req.expire.Prev() == nil {
t.wake.Stop()
t.schedule()
}
g := fmt.Sprintf("%s/%s/%d/%#02x", trackedGaugeName, t.protocol, req.version, req.reqCode)
metrics.GetOrRegisterGauge(g, nil).Dec(1)
h := fmt.Sprintf("%s/%s/%d/%#02x", waitHistName, t.protocol, req.version, req.reqCode)
sampler := func() metrics.Sample {
return metrics.ResettingSample(
metrics.NewExpDecaySample(1028, 0.015),
)
}
metrics.GetOrRegisterHistogramLazy(h, nil, sampler).Update(time.Since(req.time).Microseconds())
}