Merge pull request #16223 from gluk256/300-msg-serialiation
whisper: topics replaced by bloom filters in mailserver communication
This commit is contained in:
commit
a76e46e3d7
@ -22,6 +22,7 @@ package main
|
|||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
|
crand "crypto/rand"
|
||||||
"crypto/sha512"
|
"crypto/sha512"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
@ -48,6 +49,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const quitCommand = "~Q"
|
const quitCommand = "~Q"
|
||||||
|
const entropySize = 32
|
||||||
|
|
||||||
// singletons
|
// singletons
|
||||||
var (
|
var (
|
||||||
@ -55,6 +57,7 @@ var (
|
|||||||
shh *whisper.Whisper
|
shh *whisper.Whisper
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
mailServer mailserver.WMailServer
|
mailServer mailserver.WMailServer
|
||||||
|
entropy [entropySize]byte
|
||||||
|
|
||||||
input = bufio.NewReader(os.Stdin)
|
input = bufio.NewReader(os.Stdin)
|
||||||
)
|
)
|
||||||
@ -274,6 +277,11 @@ func initialize() {
|
|||||||
TrustedNodes: peers,
|
TrustedNodes: peers,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_, err = crand.Read(entropy[:])
|
||||||
|
if err != nil {
|
||||||
|
utils.Fatalf("crypto/rand failed: %s", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func startServer() {
|
func startServer() {
|
||||||
@ -614,10 +622,10 @@ func writeMessageToFile(dir string, msg *whisper.ReceivedMessage) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func requestExpiredMessagesLoop() {
|
func requestExpiredMessagesLoop() {
|
||||||
var key, peerID []byte
|
var key, peerID, bloom []byte
|
||||||
var timeLow, timeUpp uint32
|
var timeLow, timeUpp uint32
|
||||||
var t string
|
var t string
|
||||||
var xt, empty whisper.TopicType
|
var xt whisper.TopicType
|
||||||
|
|
||||||
keyID, err := shh.AddSymKeyFromPassword(msPassword)
|
keyID, err := shh.AddSymKeyFromPassword(msPassword)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -640,18 +648,19 @@ func requestExpiredMessagesLoop() {
|
|||||||
utils.Fatalf("Failed to parse the topic: %s", err)
|
utils.Fatalf("Failed to parse the topic: %s", err)
|
||||||
}
|
}
|
||||||
xt = whisper.BytesToTopic(x)
|
xt = whisper.BytesToTopic(x)
|
||||||
|
bloom = whisper.TopicToBloom(xt)
|
||||||
|
obfuscateBloom(bloom)
|
||||||
|
} else {
|
||||||
|
bloom = whisper.MakeFullNodeBloom()
|
||||||
}
|
}
|
||||||
if timeUpp == 0 {
|
if timeUpp == 0 {
|
||||||
timeUpp = 0xFFFFFFFF
|
timeUpp = 0xFFFFFFFF
|
||||||
}
|
}
|
||||||
|
|
||||||
data := make([]byte, 8+whisper.TopicLength)
|
data := make([]byte, 8, 8+whisper.BloomFilterSize)
|
||||||
binary.BigEndian.PutUint32(data, timeLow)
|
binary.BigEndian.PutUint32(data, timeLow)
|
||||||
binary.BigEndian.PutUint32(data[4:], timeUpp)
|
binary.BigEndian.PutUint32(data[4:], timeUpp)
|
||||||
copy(data[8:], xt[:])
|
data = append(data, bloom...)
|
||||||
if xt == empty {
|
|
||||||
data = data[:8]
|
|
||||||
}
|
|
||||||
|
|
||||||
var params whisper.MessageParams
|
var params whisper.MessageParams
|
||||||
params.PoW = *argServerPoW
|
params.PoW = *argServerPoW
|
||||||
@ -685,3 +694,20 @@ func extractIDFromEnode(s string) []byte {
|
|||||||
}
|
}
|
||||||
return n.ID[:]
|
return n.ID[:]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// obfuscateBloom adds 16 random bits to the the bloom
|
||||||
|
// filter, in order to obfuscate the containing topics.
|
||||||
|
// it does so deterministically within every session.
|
||||||
|
// despite additional bits, it will match on average
|
||||||
|
// 32000 times less messages than full node's bloom filter.
|
||||||
|
func obfuscateBloom(bloom []byte) {
|
||||||
|
const half = entropySize / 2
|
||||||
|
for i := 0; i < half; i++ {
|
||||||
|
x := int(entropy[i])
|
||||||
|
if entropy[half+i] < 128 {
|
||||||
|
x += 256
|
||||||
|
}
|
||||||
|
|
||||||
|
bloom[x/8] = 1 << uint(x%8) // set the bit number X
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -107,17 +107,16 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ok, lower, upper, topic := s.validateRequest(peer.ID(), request)
|
ok, lower, upper, bloom := s.validateRequest(peer.ID(), request)
|
||||||
if ok {
|
if ok {
|
||||||
s.processRequest(peer, lower, upper, topic)
|
s.processRequest(peer, lower, upper, bloom)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, topic whisper.TopicType) []*whisper.Envelope {
|
func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bloom []byte) []*whisper.Envelope {
|
||||||
ret := make([]*whisper.Envelope, 0)
|
ret := make([]*whisper.Envelope, 0)
|
||||||
var err error
|
var err error
|
||||||
var zero common.Hash
|
var zero common.Hash
|
||||||
var empty whisper.TopicType
|
|
||||||
kl := NewDbKey(lower, zero)
|
kl := NewDbKey(lower, zero)
|
||||||
ku := NewDbKey(upper, zero)
|
ku := NewDbKey(upper, zero)
|
||||||
i := s.db.NewIterator(&util.Range{Start: kl.raw, Limit: ku.raw}, nil)
|
i := s.db.NewIterator(&util.Range{Start: kl.raw, Limit: ku.raw}, nil)
|
||||||
@ -130,7 +129,7 @@ func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, to
|
|||||||
log.Error(fmt.Sprintf("RLP decoding failed: %s", err))
|
log.Error(fmt.Sprintf("RLP decoding failed: %s", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
if topic == empty || envelope.Topic == topic {
|
if whisper.BloomFilterMatch(bloom, envelope.Bloom()) {
|
||||||
if peer == nil {
|
if peer == nil {
|
||||||
// used for test purposes
|
// used for test purposes
|
||||||
ret = append(ret, &envelope)
|
ret = append(ret, &envelope)
|
||||||
@ -152,22 +151,16 @@ func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, to
|
|||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *WMailServer) validateRequest(peerID []byte, request *whisper.Envelope) (bool, uint32, uint32, whisper.TopicType) {
|
func (s *WMailServer) validateRequest(peerID []byte, request *whisper.Envelope) (bool, uint32, uint32, []byte) {
|
||||||
var topic whisper.TopicType
|
|
||||||
if s.pow > 0.0 && request.PoW() < s.pow {
|
if s.pow > 0.0 && request.PoW() < s.pow {
|
||||||
return false, 0, 0, topic
|
return false, 0, 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
f := whisper.Filter{KeySym: s.key}
|
f := whisper.Filter{KeySym: s.key}
|
||||||
decrypted := request.Open(&f)
|
decrypted := request.Open(&f)
|
||||||
if decrypted == nil {
|
if decrypted == nil {
|
||||||
log.Warn(fmt.Sprintf("Failed to decrypt p2p request"))
|
log.Warn(fmt.Sprintf("Failed to decrypt p2p request"))
|
||||||
return false, 0, 0, topic
|
return false, 0, 0, nil
|
||||||
}
|
|
||||||
|
|
||||||
if len(decrypted.Payload) < 8 {
|
|
||||||
log.Warn(fmt.Sprintf("Undersized p2p request"))
|
|
||||||
return false, 0, 0, topic
|
|
||||||
}
|
}
|
||||||
|
|
||||||
src := crypto.FromECDSAPub(decrypted.Src)
|
src := crypto.FromECDSAPub(decrypted.Src)
|
||||||
@ -179,15 +172,24 @@ func (s *WMailServer) validateRequest(peerID []byte, request *whisper.Envelope)
|
|||||||
// if !bytes.Equal(peerID, src) {
|
// if !bytes.Equal(peerID, src) {
|
||||||
if src == nil {
|
if src == nil {
|
||||||
log.Warn(fmt.Sprintf("Wrong signature of p2p request"))
|
log.Warn(fmt.Sprintf("Wrong signature of p2p request"))
|
||||||
return false, 0, 0, topic
|
return false, 0, 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var bloom []byte
|
||||||
|
payloadSize := len(decrypted.Payload)
|
||||||
|
if payloadSize < 8 {
|
||||||
|
log.Warn(fmt.Sprintf("Undersized p2p request"))
|
||||||
|
return false, 0, 0, nil
|
||||||
|
} else if payloadSize == 8 {
|
||||||
|
bloom = whisper.MakeFullNodeBloom()
|
||||||
|
} else if payloadSize < 8+whisper.BloomFilterSize {
|
||||||
|
log.Warn(fmt.Sprintf("Undersized bloom filter in p2p request"))
|
||||||
|
return false, 0, 0, nil
|
||||||
|
} else {
|
||||||
|
bloom = decrypted.Payload[8 : 8+whisper.BloomFilterSize]
|
||||||
}
|
}
|
||||||
|
|
||||||
lower := binary.BigEndian.Uint32(decrypted.Payload[:4])
|
lower := binary.BigEndian.Uint32(decrypted.Payload[:4])
|
||||||
upper := binary.BigEndian.Uint32(decrypted.Payload[4:8])
|
upper := binary.BigEndian.Uint32(decrypted.Payload[4:8])
|
||||||
|
return true, lower, upper, bloom
|
||||||
if len(decrypted.Payload) >= 8+whisper.TopicLength {
|
|
||||||
topic = whisper.BytesToTopic(decrypted.Payload[8:])
|
|
||||||
}
|
|
||||||
|
|
||||||
return true, lower, upper, topic
|
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
package mailserver
|
package mailserver
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
@ -61,7 +62,7 @@ func generateEnvelope(t *testing.T) *whisper.Envelope {
|
|||||||
h := crypto.Keccak256Hash([]byte("test sample data"))
|
h := crypto.Keccak256Hash([]byte("test sample data"))
|
||||||
params := &whisper.MessageParams{
|
params := &whisper.MessageParams{
|
||||||
KeySym: h[:],
|
KeySym: h[:],
|
||||||
Topic: whisper.TopicType{},
|
Topic: whisper.TopicType{0x1F, 0x7E, 0xA1, 0x7F},
|
||||||
Payload: []byte("test payload"),
|
Payload: []byte("test payload"),
|
||||||
PoW: powRequirement,
|
PoW: powRequirement,
|
||||||
WorkTime: 2,
|
WorkTime: 2,
|
||||||
@ -121,6 +122,7 @@ func deliverTest(t *testing.T, server *WMailServer, env *whisper.Envelope) {
|
|||||||
upp: birth + 1,
|
upp: birth + 1,
|
||||||
key: testPeerID,
|
key: testPeerID,
|
||||||
}
|
}
|
||||||
|
|
||||||
singleRequest(t, server, env, p, true)
|
singleRequest(t, server, env, p, true)
|
||||||
|
|
||||||
p.low, p.upp = birth+1, 0xffffffff
|
p.low, p.upp = birth+1, 0xffffffff
|
||||||
@ -131,14 +133,14 @@ func deliverTest(t *testing.T, server *WMailServer, env *whisper.Envelope) {
|
|||||||
|
|
||||||
p.low = birth - 1
|
p.low = birth - 1
|
||||||
p.upp = birth + 1
|
p.upp = birth + 1
|
||||||
p.topic[0]++
|
p.topic[0] = 0xFF
|
||||||
singleRequest(t, server, env, p, false)
|
singleRequest(t, server, env, p, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func singleRequest(t *testing.T, server *WMailServer, env *whisper.Envelope, p *ServerTestParams, expect bool) {
|
func singleRequest(t *testing.T, server *WMailServer, env *whisper.Envelope, p *ServerTestParams, expect bool) {
|
||||||
request := createRequest(t, p)
|
request := createRequest(t, p)
|
||||||
src := crypto.FromECDSAPub(&p.key.PublicKey)
|
src := crypto.FromECDSAPub(&p.key.PublicKey)
|
||||||
ok, lower, upper, topic := server.validateRequest(src, request)
|
ok, lower, upper, bloom := server.validateRequest(src, request)
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatalf("request validation failed, seed: %d.", seed)
|
t.Fatalf("request validation failed, seed: %d.", seed)
|
||||||
}
|
}
|
||||||
@ -148,12 +150,13 @@ func singleRequest(t *testing.T, server *WMailServer, env *whisper.Envelope, p *
|
|||||||
if upper != p.upp {
|
if upper != p.upp {
|
||||||
t.Fatalf("request validation failed (upper bound), seed: %d.", seed)
|
t.Fatalf("request validation failed (upper bound), seed: %d.", seed)
|
||||||
}
|
}
|
||||||
if topic != p.topic {
|
expectedBloom := whisper.TopicToBloom(p.topic)
|
||||||
|
if !bytes.Equal(bloom, expectedBloom) {
|
||||||
t.Fatalf("request validation failed (topic), seed: %d.", seed)
|
t.Fatalf("request validation failed (topic), seed: %d.", seed)
|
||||||
}
|
}
|
||||||
|
|
||||||
var exist bool
|
var exist bool
|
||||||
mail := server.processRequest(nil, p.low, p.upp, p.topic)
|
mail := server.processRequest(nil, p.low, p.upp, bloom)
|
||||||
for _, msg := range mail {
|
for _, msg := range mail {
|
||||||
if msg.Hash() == env.Hash() {
|
if msg.Hash() == env.Hash() {
|
||||||
exist = true
|
exist = true
|
||||||
@ -166,7 +169,7 @@ func singleRequest(t *testing.T, server *WMailServer, env *whisper.Envelope, p *
|
|||||||
}
|
}
|
||||||
|
|
||||||
src[0]++
|
src[0]++
|
||||||
ok, lower, upper, topic = server.validateRequest(src, request)
|
ok, lower, upper, bloom = server.validateRequest(src, request)
|
||||||
if !ok {
|
if !ok {
|
||||||
// request should be valid regardless of signature
|
// request should be valid regardless of signature
|
||||||
t.Fatalf("request validation false negative, seed: %d (lower: %d, upper: %d).", seed, lower, upper)
|
t.Fatalf("request validation false negative, seed: %d (lower: %d, upper: %d).", seed, lower, upper)
|
||||||
@ -174,10 +177,11 @@ func singleRequest(t *testing.T, server *WMailServer, env *whisper.Envelope, p *
|
|||||||
}
|
}
|
||||||
|
|
||||||
func createRequest(t *testing.T, p *ServerTestParams) *whisper.Envelope {
|
func createRequest(t *testing.T, p *ServerTestParams) *whisper.Envelope {
|
||||||
data := make([]byte, 8+whisper.TopicLength)
|
bloom := whisper.TopicToBloom(p.topic)
|
||||||
|
data := make([]byte, 8)
|
||||||
binary.BigEndian.PutUint32(data, p.low)
|
binary.BigEndian.PutUint32(data, p.low)
|
||||||
binary.BigEndian.PutUint32(data[4:], p.upp)
|
binary.BigEndian.PutUint32(data[4:], p.upp)
|
||||||
copy(data[8:], p.topic[:])
|
data = append(data, bloom...)
|
||||||
|
|
||||||
key, err := shh.GetSymKey(keyID)
|
key, err := shh.GetSymKey(keyID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -60,7 +60,7 @@ const (
|
|||||||
aesKeyLength = 32 // in bytes
|
aesKeyLength = 32 // in bytes
|
||||||
aesNonceLength = 12 // in bytes; for more info please see cipher.gcmStandardNonceSize & aesgcm.NonceSize()
|
aesNonceLength = 12 // in bytes; for more info please see cipher.gcmStandardNonceSize & aesgcm.NonceSize()
|
||||||
keyIDSize = 32 // in bytes
|
keyIDSize = 32 // in bytes
|
||||||
bloomFilterSize = 64 // in bytes
|
BloomFilterSize = 64 // in bytes
|
||||||
flagsLength = 1
|
flagsLength = 1
|
||||||
|
|
||||||
EnvelopeHeaderLength = 20
|
EnvelopeHeaderLength = 20
|
||||||
|
@ -249,7 +249,7 @@ func (e *Envelope) Bloom() []byte {
|
|||||||
|
|
||||||
// TopicToBloom converts the topic (4 bytes) to the bloom filter (64 bytes)
|
// TopicToBloom converts the topic (4 bytes) to the bloom filter (64 bytes)
|
||||||
func TopicToBloom(topic TopicType) []byte {
|
func TopicToBloom(topic TopicType) []byte {
|
||||||
b := make([]byte, bloomFilterSize)
|
b := make([]byte, BloomFilterSize)
|
||||||
var index [3]int
|
var index [3]int
|
||||||
for j := 0; j < 3; j++ {
|
for j := 0; j < 3; j++ {
|
||||||
index[j] = int(topic[j])
|
index[j] = int(topic[j])
|
||||||
|
@ -56,7 +56,7 @@ func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *Peer {
|
|||||||
powRequirement: 0.0,
|
powRequirement: 0.0,
|
||||||
known: set.New(),
|
known: set.New(),
|
||||||
quit: make(chan struct{}),
|
quit: make(chan struct{}),
|
||||||
bloomFilter: makeFullNodeBloom(),
|
bloomFilter: MakeFullNodeBloom(),
|
||||||
fullNode: true,
|
fullNode: true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -120,7 +120,7 @@ func (peer *Peer) handshake() error {
|
|||||||
err = s.Decode(&bloom)
|
err = s.Decode(&bloom)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
sz := len(bloom)
|
sz := len(bloom)
|
||||||
if sz != bloomFilterSize && sz != 0 {
|
if sz != BloomFilterSize && sz != 0 {
|
||||||
return fmt.Errorf("peer [%x] sent bad status message: wrong bloom filter size %d", peer.ID(), sz)
|
return fmt.Errorf("peer [%x] sent bad status message: wrong bloom filter size %d", peer.ID(), sz)
|
||||||
}
|
}
|
||||||
peer.setBloomFilter(bloom)
|
peer.setBloomFilter(bloom)
|
||||||
@ -229,7 +229,7 @@ func (peer *Peer) notifyAboutBloomFilterChange(bloom []byte) error {
|
|||||||
func (peer *Peer) bloomMatch(env *Envelope) bool {
|
func (peer *Peer) bloomMatch(env *Envelope) bool {
|
||||||
peer.bloomMu.Lock()
|
peer.bloomMu.Lock()
|
||||||
defer peer.bloomMu.Unlock()
|
defer peer.bloomMu.Unlock()
|
||||||
return peer.fullNode || bloomFilterMatch(peer.bloomFilter, env.Bloom())
|
return peer.fullNode || BloomFilterMatch(peer.bloomFilter, env.Bloom())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (peer *Peer) setBloomFilter(bloom []byte) {
|
func (peer *Peer) setBloomFilter(bloom []byte) {
|
||||||
@ -238,13 +238,13 @@ func (peer *Peer) setBloomFilter(bloom []byte) {
|
|||||||
peer.bloomFilter = bloom
|
peer.bloomFilter = bloom
|
||||||
peer.fullNode = isFullNode(bloom)
|
peer.fullNode = isFullNode(bloom)
|
||||||
if peer.fullNode && peer.bloomFilter == nil {
|
if peer.fullNode && peer.bloomFilter == nil {
|
||||||
peer.bloomFilter = makeFullNodeBloom()
|
peer.bloomFilter = MakeFullNodeBloom()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeFullNodeBloom() []byte {
|
func MakeFullNodeBloom() []byte {
|
||||||
bloom := make([]byte, bloomFilterSize)
|
bloom := make([]byte, BloomFilterSize)
|
||||||
for i := 0; i < bloomFilterSize; i++ {
|
for i := 0; i < BloomFilterSize; i++ {
|
||||||
bloom[i] = 0xFF
|
bloom[i] = 0xFF
|
||||||
}
|
}
|
||||||
return bloom
|
return bloom
|
||||||
|
@ -152,7 +152,7 @@ func resetParams(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func initBloom(t *testing.T) {
|
func initBloom(t *testing.T) {
|
||||||
masterBloomFilter = make([]byte, bloomFilterSize)
|
masterBloomFilter = make([]byte, BloomFilterSize)
|
||||||
_, err := mrand.Read(masterBloomFilter)
|
_, err := mrand.Read(masterBloomFilter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("rand failed: %s.", err)
|
t.Fatalf("rand failed: %s.", err)
|
||||||
@ -164,7 +164,7 @@ func initBloom(t *testing.T) {
|
|||||||
masterBloomFilter[i] = 0xFF
|
masterBloomFilter[i] = 0xFF
|
||||||
}
|
}
|
||||||
|
|
||||||
if !bloomFilterMatch(masterBloomFilter, msgBloom) {
|
if !BloomFilterMatch(masterBloomFilter, msgBloom) {
|
||||||
t.Fatalf("bloom mismatch on initBloom.")
|
t.Fatalf("bloom mismatch on initBloom.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -178,7 +178,7 @@ func initialize(t *testing.T) {
|
|||||||
|
|
||||||
for i := 0; i < NumNodes; i++ {
|
for i := 0; i < NumNodes; i++ {
|
||||||
var node TestNode
|
var node TestNode
|
||||||
b := make([]byte, bloomFilterSize)
|
b := make([]byte, BloomFilterSize)
|
||||||
copy(b, masterBloomFilter)
|
copy(b, masterBloomFilter)
|
||||||
node.shh = New(&DefaultConfig)
|
node.shh = New(&DefaultConfig)
|
||||||
node.shh.SetMinimumPoW(masterPow)
|
node.shh.SetMinimumPoW(masterPow)
|
||||||
|
@ -232,11 +232,11 @@ func (whisper *Whisper) SetMaxMessageSize(size uint32) error {
|
|||||||
|
|
||||||
// SetBloomFilter sets the new bloom filter
|
// SetBloomFilter sets the new bloom filter
|
||||||
func (whisper *Whisper) SetBloomFilter(bloom []byte) error {
|
func (whisper *Whisper) SetBloomFilter(bloom []byte) error {
|
||||||
if len(bloom) != bloomFilterSize {
|
if len(bloom) != BloomFilterSize {
|
||||||
return fmt.Errorf("invalid bloom filter size: %d", len(bloom))
|
return fmt.Errorf("invalid bloom filter size: %d", len(bloom))
|
||||||
}
|
}
|
||||||
|
|
||||||
b := make([]byte, bloomFilterSize)
|
b := make([]byte, BloomFilterSize)
|
||||||
copy(b, bloom)
|
copy(b, bloom)
|
||||||
|
|
||||||
whisper.settings.Store(bloomFilterIdx, b)
|
whisper.settings.Store(bloomFilterIdx, b)
|
||||||
@ -558,14 +558,14 @@ func (whisper *Whisper) Subscribe(f *Filter) (string, error) {
|
|||||||
// updateBloomFilter recalculates the new value of bloom filter,
|
// updateBloomFilter recalculates the new value of bloom filter,
|
||||||
// and informs the peers if necessary.
|
// and informs the peers if necessary.
|
||||||
func (whisper *Whisper) updateBloomFilter(f *Filter) {
|
func (whisper *Whisper) updateBloomFilter(f *Filter) {
|
||||||
aggregate := make([]byte, bloomFilterSize)
|
aggregate := make([]byte, BloomFilterSize)
|
||||||
for _, t := range f.Topics {
|
for _, t := range f.Topics {
|
||||||
top := BytesToTopic(t)
|
top := BytesToTopic(t)
|
||||||
b := TopicToBloom(top)
|
b := TopicToBloom(top)
|
||||||
aggregate = addBloom(aggregate, b)
|
aggregate = addBloom(aggregate, b)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !bloomFilterMatch(whisper.BloomFilter(), aggregate) {
|
if !BloomFilterMatch(whisper.BloomFilter(), aggregate) {
|
||||||
// existing bloom filter must be updated
|
// existing bloom filter must be updated
|
||||||
aggregate = addBloom(whisper.BloomFilter(), aggregate)
|
aggregate = addBloom(whisper.BloomFilter(), aggregate)
|
||||||
whisper.SetBloomFilter(aggregate)
|
whisper.SetBloomFilter(aggregate)
|
||||||
@ -701,7 +701,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
|
|||||||
case bloomFilterExCode:
|
case bloomFilterExCode:
|
||||||
var bloom []byte
|
var bloom []byte
|
||||||
err := packet.Decode(&bloom)
|
err := packet.Decode(&bloom)
|
||||||
if err == nil && len(bloom) != bloomFilterSize {
|
if err == nil && len(bloom) != BloomFilterSize {
|
||||||
err = fmt.Errorf("wrong bloom filter size %d", len(bloom))
|
err = fmt.Errorf("wrong bloom filter size %d", len(bloom))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -779,11 +779,11 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !bloomFilterMatch(whisper.BloomFilter(), envelope.Bloom()) {
|
if !BloomFilterMatch(whisper.BloomFilter(), envelope.Bloom()) {
|
||||||
// maybe the value was recently changed, and the peers did not adjust yet.
|
// maybe the value was recently changed, and the peers did not adjust yet.
|
||||||
// in this case the previous value is retrieved by BloomFilterTolerance()
|
// in this case the previous value is retrieved by BloomFilterTolerance()
|
||||||
// for a short period of peer synchronization.
|
// for a short period of peer synchronization.
|
||||||
if !bloomFilterMatch(whisper.BloomFilterTolerance(), envelope.Bloom()) {
|
if !BloomFilterMatch(whisper.BloomFilterTolerance(), envelope.Bloom()) {
|
||||||
return false, fmt.Errorf("envelope does not match bloom filter, hash=[%v], bloom: \n%x \n%x \n%x",
|
return false, fmt.Errorf("envelope does not match bloom filter, hash=[%v], bloom: \n%x \n%x \n%x",
|
||||||
envelope.Hash().Hex(), whisper.BloomFilter(), envelope.Bloom(), envelope.Topic)
|
envelope.Hash().Hex(), whisper.BloomFilter(), envelope.Bloom(), envelope.Topic)
|
||||||
}
|
}
|
||||||
@ -1025,12 +1025,12 @@ func isFullNode(bloom []byte) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func bloomFilterMatch(filter, sample []byte) bool {
|
func BloomFilterMatch(filter, sample []byte) bool {
|
||||||
if filter == nil {
|
if filter == nil {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < bloomFilterSize; i++ {
|
for i := 0; i < BloomFilterSize; i++ {
|
||||||
f := filter[i]
|
f := filter[i]
|
||||||
s := sample[i]
|
s := sample[i]
|
||||||
if (f | s) != f {
|
if (f | s) != f {
|
||||||
@ -1042,8 +1042,8 @@ func bloomFilterMatch(filter, sample []byte) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func addBloom(a, b []byte) []byte {
|
func addBloom(a, b []byte) []byte {
|
||||||
c := make([]byte, bloomFilterSize)
|
c := make([]byte, BloomFilterSize)
|
||||||
for i := 0; i < bloomFilterSize; i++ {
|
for i := 0; i < BloomFilterSize; i++ {
|
||||||
c[i] = a[i] | b[i]
|
c[i] = a[i] | b[i]
|
||||||
}
|
}
|
||||||
return c
|
return c
|
||||||
|
@ -826,11 +826,11 @@ func TestSymmetricSendKeyMismatch(t *testing.T) {
|
|||||||
func TestBloom(t *testing.T) {
|
func TestBloom(t *testing.T) {
|
||||||
topic := TopicType{0, 0, 255, 6}
|
topic := TopicType{0, 0, 255, 6}
|
||||||
b := TopicToBloom(topic)
|
b := TopicToBloom(topic)
|
||||||
x := make([]byte, bloomFilterSize)
|
x := make([]byte, BloomFilterSize)
|
||||||
x[0] = byte(1)
|
x[0] = byte(1)
|
||||||
x[32] = byte(1)
|
x[32] = byte(1)
|
||||||
x[bloomFilterSize-1] = byte(128)
|
x[BloomFilterSize-1] = byte(128)
|
||||||
if !bloomFilterMatch(x, b) || !bloomFilterMatch(b, x) {
|
if !BloomFilterMatch(x, b) || !BloomFilterMatch(b, x) {
|
||||||
t.Fatalf("bloom filter does not match the mask")
|
t.Fatalf("bloom filter does not match the mask")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -842,11 +842,11 @@ func TestBloom(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("math rand error")
|
t.Fatalf("math rand error")
|
||||||
}
|
}
|
||||||
if !bloomFilterMatch(b, b) {
|
if !BloomFilterMatch(b, b) {
|
||||||
t.Fatalf("bloom filter does not match self")
|
t.Fatalf("bloom filter does not match self")
|
||||||
}
|
}
|
||||||
x = addBloom(x, b)
|
x = addBloom(x, b)
|
||||||
if !bloomFilterMatch(x, b) {
|
if !BloomFilterMatch(x, b) {
|
||||||
t.Fatalf("bloom filter does not match combined bloom")
|
t.Fatalf("bloom filter does not match combined bloom")
|
||||||
}
|
}
|
||||||
if !isFullNode(nil) {
|
if !isFullNode(nil) {
|
||||||
@ -856,16 +856,16 @@ func TestBloom(t *testing.T) {
|
|||||||
if isFullNode(x) {
|
if isFullNode(x) {
|
||||||
t.Fatalf("isFullNode false positive")
|
t.Fatalf("isFullNode false positive")
|
||||||
}
|
}
|
||||||
for i := 0; i < bloomFilterSize; i++ {
|
for i := 0; i < BloomFilterSize; i++ {
|
||||||
b[i] = byte(255)
|
b[i] = byte(255)
|
||||||
}
|
}
|
||||||
if !isFullNode(b) {
|
if !isFullNode(b) {
|
||||||
t.Fatalf("isFullNode false negative")
|
t.Fatalf("isFullNode false negative")
|
||||||
}
|
}
|
||||||
if bloomFilterMatch(x, b) {
|
if BloomFilterMatch(x, b) {
|
||||||
t.Fatalf("bloomFilterMatch false positive")
|
t.Fatalf("bloomFilterMatch false positive")
|
||||||
}
|
}
|
||||||
if !bloomFilterMatch(b, x) {
|
if !BloomFilterMatch(b, x) {
|
||||||
t.Fatalf("bloomFilterMatch false negative")
|
t.Fatalf("bloomFilterMatch false negative")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -879,7 +879,7 @@ func TestBloom(t *testing.T) {
|
|||||||
t.Fatalf("failed to set bloom filter: %s", err)
|
t.Fatalf("failed to set bloom filter: %s", err)
|
||||||
}
|
}
|
||||||
f = w.BloomFilter()
|
f = w.BloomFilter()
|
||||||
if !bloomFilterMatch(f, x) || !bloomFilterMatch(x, f) {
|
if !BloomFilterMatch(f, x) || !BloomFilterMatch(x, f) {
|
||||||
t.Fatalf("retireved wrong bloom filter")
|
t.Fatalf("retireved wrong bloom filter")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user