whisper/whisperv6: PoW requirement (#15701)
New Whisper-level message introduced (PoW requirement), corresponding logic added, plus some tests.
This commit is contained in:
parent
b0d41e386e
commit
38b1e8ee20
@ -40,10 +40,13 @@ const (
|
||||
ProtocolVersionStr = "6.0"
|
||||
ProtocolName = "shh"
|
||||
|
||||
// whisper protocol message codes, according to EIP-627
|
||||
statusCode = 0 // used by whisper protocol
|
||||
messagesCode = 1 // normal whisper message
|
||||
p2pCode = 2 // peer-to-peer message (to be consumed by the peer, but not forwarded any further)
|
||||
p2pRequestCode = 3 // peer-to-peer message, used by Dapp protocol
|
||||
powRequirementCode = 2 // PoW requirement
|
||||
bloomFilterExCode = 3 // bloom filter exchange
|
||||
p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol
|
||||
p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further)
|
||||
NumberOfMessageCodes = 128
|
||||
|
||||
paddingMask = byte(3)
|
||||
|
@ -18,6 +18,7 @@ package whisperv6
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
@ -32,7 +33,9 @@ type Peer struct {
|
||||
host *Whisper
|
||||
peer *p2p.Peer
|
||||
ws p2p.MsgReadWriter
|
||||
|
||||
trusted bool
|
||||
powRequirement float64
|
||||
|
||||
known *set.Set // Messages already known by the peer to avoid wasting bandwidth
|
||||
|
||||
@ -46,6 +49,7 @@ func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *Peer {
|
||||
peer: remote,
|
||||
ws: rw,
|
||||
trusted: false,
|
||||
powRequirement: 0.0,
|
||||
known: set.New(),
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
@ -152,7 +156,7 @@ func (p *Peer) broadcast() error {
|
||||
envelopes := p.host.Envelopes()
|
||||
bundle := make([]*Envelope, 0, len(envelopes))
|
||||
for _, envelope := range envelopes {
|
||||
if !p.marked(envelope) {
|
||||
if !p.marked(envelope) && envelope.PoW() >= p.powRequirement {
|
||||
bundle = append(bundle, envelope)
|
||||
}
|
||||
}
|
||||
@ -177,3 +181,8 @@ func (p *Peer) ID() []byte {
|
||||
id := p.peer.ID()
|
||||
return id[:]
|
||||
}
|
||||
|
||||
func (p *Peer) notifyAboutPowRequirementChange(pow float64) error {
|
||||
i := math.Float64bits(pow)
|
||||
return p2p.Send(p.ws, powRequirementCode, i)
|
||||
}
|
||||
|
@ -88,21 +88,31 @@ var sharedKey []byte = []byte("some arbitrary data here")
|
||||
var sharedTopic TopicType = TopicType{0xF, 0x1, 0x2, 0}
|
||||
var expectedMessage []byte = []byte("per rectum ad astra")
|
||||
|
||||
// This test does the following:
|
||||
// 1. creates a chain of whisper nodes,
|
||||
// 2. installs the filters with shared (predefined) parameters,
|
||||
// 3. each node sends a number of random (undecryptable) messages,
|
||||
// 4. first node sends one expected (decryptable) message,
|
||||
// 5. checks if each node have received and decrypted exactly one message.
|
||||
func TestSimulation(t *testing.T) {
|
||||
// create a chain of whisper nodes,
|
||||
// installs the filters with shared (predefined) parameters
|
||||
initialize(t)
|
||||
|
||||
// each node sends a number of random (undecryptable) messages
|
||||
for i := 0; i < NumNodes; i++ {
|
||||
sendMsg(t, false, i)
|
||||
}
|
||||
|
||||
// node #0 sends one expected (decryptable) message
|
||||
sendMsg(t, true, 0)
|
||||
checkPropagation(t)
|
||||
|
||||
// check if each node have received and decrypted exactly one message
|
||||
checkPropagation(t, true)
|
||||
|
||||
// send protocol-level messages (powRequirementCode) and check the new PoW requirement values
|
||||
powReqExchange(t)
|
||||
|
||||
// node #1 sends one expected (decryptable) message
|
||||
sendMsg(t, true, 1)
|
||||
|
||||
// check if each node (except node #0) have received and decrypted exactly one message
|
||||
checkPropagation(t, false)
|
||||
|
||||
stopServers()
|
||||
}
|
||||
|
||||
@ -114,7 +124,7 @@ func initialize(t *testing.T) {
|
||||
for i := 0; i < NumNodes; i++ {
|
||||
var node TestNode
|
||||
node.shh = New(&DefaultConfig)
|
||||
node.shh.SetMinimumPoW(0.00000001)
|
||||
node.shh.SetMinimumPowTest(0.00000001)
|
||||
node.shh.Start(nil)
|
||||
topics := make([]TopicType, 0)
|
||||
topics = append(topics, sharedTopic)
|
||||
@ -154,12 +164,17 @@ func initialize(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
err = node.server.Start()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to start server %d.", i)
|
||||
nodes[i] = &node
|
||||
}
|
||||
|
||||
nodes[i] = &node
|
||||
for i := 1; i < NumNodes; i++ {
|
||||
go nodes[i].server.Start()
|
||||
}
|
||||
|
||||
// we need to wait until the first node actually starts
|
||||
err = nodes[0].server.Start()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to start the fisrt server.")
|
||||
}
|
||||
}
|
||||
|
||||
@ -174,18 +189,21 @@ func stopServers() {
|
||||
}
|
||||
}
|
||||
|
||||
func checkPropagation(t *testing.T) {
|
||||
func checkPropagation(t *testing.T, includingNodeZero bool) {
|
||||
if t.Failed() {
|
||||
return
|
||||
}
|
||||
|
||||
const cycle = 100
|
||||
const iterations = 100
|
||||
const cycle = 50
|
||||
const iterations = 200
|
||||
|
||||
first := 0
|
||||
if !includingNodeZero {
|
||||
first = 1
|
||||
}
|
||||
|
||||
for j := 0; j < iterations; j++ {
|
||||
time.Sleep(cycle * time.Millisecond)
|
||||
|
||||
for i := 0; i < NumNodes; i++ {
|
||||
for i := first; i < NumNodes; i++ {
|
||||
f := nodes[i].shh.GetFilter(nodes[i].filerId)
|
||||
if f == nil {
|
||||
t.Fatalf("failed to get filterId %s from node %d.", nodes[i].filerId, i)
|
||||
@ -200,9 +218,18 @@ func checkPropagation(t *testing.T) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
time.Sleep(cycle * time.Millisecond)
|
||||
}
|
||||
|
||||
t.Fatalf("Test was not complete: timeout %d seconds.", iterations*cycle/1000)
|
||||
|
||||
if !includingNodeZero {
|
||||
f := nodes[0].shh.GetFilter(nodes[0].filerId)
|
||||
if f != nil {
|
||||
t.Fatalf("node zero received a message with low PoW.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func validateMail(t *testing.T, index int, mail []*ReceivedMessage) bool {
|
||||
@ -304,3 +331,35 @@ func TestPeerBasic(t *testing.T) {
|
||||
t.Fatalf("failed mark with seed %d.", seed)
|
||||
}
|
||||
}
|
||||
|
||||
func powReqExchange(t *testing.T) {
|
||||
for i, node := range nodes {
|
||||
for peer := range node.shh.peers {
|
||||
if peer.powRequirement > 1000.0 {
|
||||
t.Fatalf("node %d: one of the peers' pow requirement is too big (%f).", i, peer.powRequirement)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const pow float64 = 7777777.0
|
||||
nodes[0].shh.SetMinimumPoW(pow)
|
||||
|
||||
// wait until all the messages are delivered
|
||||
time.Sleep(64 * time.Millisecond)
|
||||
|
||||
cnt := 0
|
||||
for i, node := range nodes {
|
||||
for peer := range node.shh.peers {
|
||||
if peer.peer.ID() == discover.PubkeyID(&nodes[0].id.PublicKey) {
|
||||
cnt++
|
||||
if peer.powRequirement != pow {
|
||||
t.Fatalf("node %d: failed to set the new pow requirement.", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if cnt == 0 {
|
||||
t.Fatalf("no matching peers found.")
|
||||
}
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
crand "crypto/rand"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"math"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
@ -30,6 +31,7 @@ import (
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/p2p"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
"github.com/syndtr/goleveldb/leveldb/errors"
|
||||
"golang.org/x/crypto/pbkdf2"
|
||||
@ -74,6 +76,8 @@ type Whisper struct {
|
||||
|
||||
settings syncmap.Map // holds configuration settings that can be dynamically changed
|
||||
|
||||
reactionAllowance int // maximum time in seconds allowed to process the whisper-related messages
|
||||
|
||||
statsMu sync.Mutex // guard stats
|
||||
stats Statistics // Statistics of whisper node
|
||||
|
||||
@ -95,6 +99,7 @@ func New(cfg *Config) *Whisper {
|
||||
messageQueue: make(chan *Envelope, messageQueueLimit),
|
||||
p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
|
||||
quit: make(chan struct{}),
|
||||
reactionAllowance: SynchAllowance,
|
||||
}
|
||||
|
||||
whisper.filters = NewFilters(whisper)
|
||||
@ -177,13 +182,50 @@ func (w *Whisper) SetMaxMessageSize(size uint32) error {
|
||||
|
||||
// SetMinimumPoW sets the minimal PoW required by this node
|
||||
func (w *Whisper) SetMinimumPoW(val float64) error {
|
||||
if val <= 0.0 {
|
||||
if val < 0.0 {
|
||||
return fmt.Errorf("invalid PoW: %f", val)
|
||||
}
|
||||
|
||||
w.notifyPeersAboutPowRequirementChange(val)
|
||||
|
||||
go func() {
|
||||
// allow some time before all the peers have processed the notification
|
||||
time.Sleep(time.Duration(w.reactionAllowance) * time.Second)
|
||||
w.settings.Store(minPowIdx, val)
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetMinimumPoW sets the minimal PoW in test environment
|
||||
func (w *Whisper) SetMinimumPowTest(val float64) {
|
||||
w.notifyPeersAboutPowRequirementChange(val)
|
||||
w.settings.Store(minPowIdx, val)
|
||||
}
|
||||
|
||||
func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) {
|
||||
arr := make([]*Peer, len(w.peers))
|
||||
i := 0
|
||||
|
||||
w.peerMu.Lock()
|
||||
for p := range w.peers {
|
||||
arr[i] = p
|
||||
i++
|
||||
}
|
||||
w.peerMu.Unlock()
|
||||
|
||||
for _, p := range arr {
|
||||
err := p.notifyAboutPowRequirementChange(pow)
|
||||
if err != nil {
|
||||
// allow one retry
|
||||
err = p.notifyAboutPowRequirementChange(pow)
|
||||
}
|
||||
if err != nil {
|
||||
log.Warn("oversized message received", "peer", p.ID(), "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// getPeer retrieves peer by ID
|
||||
func (w *Whisper) getPeer(peerID []byte) (*Peer, error) {
|
||||
w.peerMu.Lock()
|
||||
@ -233,7 +275,7 @@ func (w *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error {
|
||||
|
||||
// SendP2PDirect sends a peer-to-peer message to a specific peer.
|
||||
func (w *Whisper) SendP2PDirect(peer *Peer, envelope *Envelope) error {
|
||||
return p2p.Send(peer.ws, p2pCode, envelope)
|
||||
return p2p.Send(peer.ws, p2pMessageCode, envelope)
|
||||
}
|
||||
|
||||
// NewKeyPair generates a new cryptographic identity for the client, and injects
|
||||
@ -536,7 +578,22 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
|
||||
if trouble {
|
||||
return errors.New("invalid envelope")
|
||||
}
|
||||
case p2pCode:
|
||||
case powRequirementCode:
|
||||
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
|
||||
i, err := s.Uint()
|
||||
if err != nil {
|
||||
log.Warn("failed to decode powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
|
||||
return errors.New("invalid powRequirementCode message")
|
||||
}
|
||||
f := math.Float64frombits(i)
|
||||
if math.IsInf(f, 0) || math.IsNaN(f) || f < 0.0 {
|
||||
log.Warn("invalid value in powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
|
||||
return errors.New("invalid value in powRequirementCode message")
|
||||
}
|
||||
p.powRequirement = f
|
||||
case bloomFilterExCode:
|
||||
// to be implemented
|
||||
case p2pMessageCode:
|
||||
// peer-to-peer message, sent directly to peer bypassing PoW checks, etc.
|
||||
// this message is not supposed to be forwarded to other peers, and
|
||||
// therefore might not satisfy the PoW, expiry and other requirements.
|
||||
@ -599,7 +656,10 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
|
||||
|
||||
if envelope.PoW() < wh.MinPow() {
|
||||
log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex())
|
||||
return false, nil // drop envelope without error
|
||||
return false, nil // drop envelope without error for now
|
||||
|
||||
// once the status message includes the PoW requirement, an error should be returned here:
|
||||
//return false, fmt.Errorf("envelope with low PoW dropped: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex())
|
||||
}
|
||||
|
||||
hash := envelope.Hash()
|
||||
|
@ -472,8 +472,8 @@ func TestExpiry(t *testing.T) {
|
||||
InitSingleTest()
|
||||
|
||||
w := New(&DefaultConfig)
|
||||
w.SetMinimumPoW(0.0000001)
|
||||
defer w.SetMinimumPoW(DefaultMinimumPoW)
|
||||
w.SetMinimumPowTest(0.0000001)
|
||||
defer w.SetMinimumPowTest(DefaultMinimumPoW)
|
||||
w.Start(nil)
|
||||
defer w.Stop()
|
||||
|
||||
@ -529,7 +529,7 @@ func TestCustomization(t *testing.T) {
|
||||
InitSingleTest()
|
||||
|
||||
w := New(&DefaultConfig)
|
||||
defer w.SetMinimumPoW(DefaultMinimumPoW)
|
||||
defer w.SetMinimumPowTest(DefaultMinimumPoW)
|
||||
defer w.SetMaxMessageSize(DefaultMaxMessageSize)
|
||||
w.Start(nil)
|
||||
defer w.Stop()
|
||||
@ -563,7 +563,7 @@ func TestCustomization(t *testing.T) {
|
||||
t.Fatalf("successfully sent envelope with PoW %.06f, false positive (seed %d).", env.PoW(), seed)
|
||||
}
|
||||
|
||||
w.SetMinimumPoW(smallPoW / 2)
|
||||
w.SetMinimumPowTest(smallPoW / 2)
|
||||
err = w.Send(env)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to send envelope with seed %d: %s.", seed, err)
|
||||
@ -625,7 +625,7 @@ func TestSymmetricSendCycle(t *testing.T) {
|
||||
InitSingleTest()
|
||||
|
||||
w := New(&DefaultConfig)
|
||||
defer w.SetMinimumPoW(DefaultMinimumPoW)
|
||||
defer w.SetMinimumPowTest(DefaultMinimumPoW)
|
||||
defer w.SetMaxMessageSize(DefaultMaxMessageSize)
|
||||
w.Start(nil)
|
||||
defer w.Stop()
|
||||
@ -714,7 +714,7 @@ func TestSymmetricSendWithoutAKey(t *testing.T) {
|
||||
InitSingleTest()
|
||||
|
||||
w := New(&DefaultConfig)
|
||||
defer w.SetMinimumPoW(DefaultMinimumPoW)
|
||||
defer w.SetMinimumPowTest(DefaultMinimumPoW)
|
||||
defer w.SetMaxMessageSize(DefaultMaxMessageSize)
|
||||
w.Start(nil)
|
||||
defer w.Stop()
|
||||
@ -782,7 +782,7 @@ func TestSymmetricSendKeyMismatch(t *testing.T) {
|
||||
InitSingleTest()
|
||||
|
||||
w := New(&DefaultConfig)
|
||||
defer w.SetMinimumPoW(DefaultMinimumPoW)
|
||||
defer w.SetMinimumPowTest(DefaultMinimumPoW)
|
||||
defer w.SetMaxMessageSize(DefaultMaxMessageSize)
|
||||
w.Start(nil)
|
||||
defer w.Stop()
|
||||
|
Loading…
Reference in New Issue
Block a user