Merge pull request #20133 from karalabe/measure-subprotocol-traffic

p2p: measure subprotocol bandwidth usage
This commit is contained in:
Péter Szilágyi 2019-09-30 12:02:29 +03:00 committed by GitHub
commit 62b43ee0d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 19 additions and 1 deletions

@ -39,9 +39,13 @@ import (
// separate Msg with a bytes.Reader as Payload for each send. // separate Msg with a bytes.Reader as Payload for each send.
type Msg struct { type Msg struct {
Code uint64 Code uint64
Size uint32 // size of the paylod Size uint32 // Size of the raw payload
Payload io.Reader Payload io.Reader
ReceivedAt time.Time ReceivedAt time.Time
meterCap Cap // Protocol name and version for egress metering
meterCode uint64 // Message within protocol for egress metering
meterSize uint32 // Compressed message size for ingress metering
} }
// Decode parses the RLP content of a message into // Decode parses the RLP content of a message into

@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
@ -300,6 +301,9 @@ func (p *Peer) handle(msg Msg) error {
if err != nil { if err != nil {
return fmt.Errorf("msg code out of range: %v", msg.Code) return fmt.Errorf("msg code out of range: %v", msg.Code)
} }
if metrics.Enabled {
metrics.GetOrRegisterMeter(fmt.Sprintf("%s/%s/%d/%#02x", MetricsInboundTraffic, proto.Name, proto.Version, msg.Code-proto.offset), nil).Mark(int64(msg.meterSize))
}
select { select {
case proto.in <- msg: case proto.in <- msg:
return nil return nil
@ -398,7 +402,11 @@ func (rw *protoRW) WriteMsg(msg Msg) (err error) {
if msg.Code >= rw.Length { if msg.Code >= rw.Length {
return newPeerError(errInvalidMsgCode, "not handled") return newPeerError(errInvalidMsgCode, "not handled")
} }
msg.meterCap = rw.cap()
msg.meterCode = msg.Code
msg.Code += rw.offset msg.Code += rw.offset
select { select {
case <-rw.wstart: case <-rw.wstart:
err = rw.w.WriteMsg(msg) err = rw.w.WriteMsg(msg)

@ -38,6 +38,7 @@ import (
"github.com/ethereum/go-ethereum/common/bitutil" "github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/ecies" "github.com/ethereum/go-ethereum/crypto/ecies"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/golang/snappy" "github.com/golang/snappy"
"golang.org/x/crypto/sha3" "golang.org/x/crypto/sha3"
@ -602,6 +603,10 @@ func (rw *rlpxFrameRW) WriteMsg(msg Msg) error {
msg.Payload = bytes.NewReader(payload) msg.Payload = bytes.NewReader(payload)
msg.Size = uint32(len(payload)) msg.Size = uint32(len(payload))
} }
msg.meterSize = msg.Size
if metrics.Enabled && msg.meterCap.Name != "" { // don't meter non-subprotocol messages
metrics.GetOrRegisterMeter(fmt.Sprintf("%s/%s/%d/%#02x", MetricsOutboundTraffic, msg.meterCap.Name, msg.meterCap.Version, msg.meterCode), nil).Mark(int64(msg.meterSize))
}
// write header // write header
headbuf := make([]byte, 32) headbuf := make([]byte, 32)
fsize := uint32(len(ptype)) + msg.Size fsize := uint32(len(ptype)) + msg.Size
@ -686,6 +691,7 @@ func (rw *rlpxFrameRW) ReadMsg() (msg Msg, err error) {
return msg, err return msg, err
} }
msg.Size = uint32(content.Len()) msg.Size = uint32(content.Len())
msg.meterSize = msg.Size
msg.Payload = content msg.Payload = content
// if snappy is enabled, verify and decompress message // if snappy is enabled, verify and decompress message