Merge pull request #1996 from obscuren/whisper-spam-fix
whisper: fixed broadcast race
This commit is contained in:
commit
168d0e9e45
@ -238,15 +238,18 @@ func TestPeerMessageExpiration(t *testing.T) {
|
|||||||
t.Fatalf("failed to send message: %v", err)
|
t.Fatalf("failed to send message: %v", err)
|
||||||
}
|
}
|
||||||
payload := []interface{}{envelope}
|
payload := []interface{}{envelope}
|
||||||
|
if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil {
|
||||||
|
// A premature empty message may have been broadcast, check the next too
|
||||||
if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil {
|
if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil {
|
||||||
t.Fatalf("message mismatch: %v", err)
|
t.Fatalf("message mismatch: %v", err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// Check that the message is inside the cache
|
// Check that the message is inside the cache
|
||||||
if !peer.known.Has(envelope.Hash()) {
|
if !peer.known.Has(envelope.Hash()) {
|
||||||
t.Fatalf("message not found in cache")
|
t.Fatalf("message not found in cache")
|
||||||
}
|
}
|
||||||
// Discard messages until expiration and check cache again
|
// Discard messages until expiration and check cache again
|
||||||
exp := time.Now().Add(time.Second + expirationCycle)
|
exp := time.Now().Add(time.Second + 2*expirationCycle + 100*time.Millisecond)
|
||||||
for time.Now().Before(exp) {
|
for time.Now().Before(exp) {
|
||||||
if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil {
|
if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil {
|
||||||
t.Fatalf("message mismatch: %v", err)
|
t.Fatalf("message mismatch: %v", err)
|
||||||
|
@ -234,6 +234,11 @@ func (self *Whisper) add(envelope *Envelope) error {
|
|||||||
self.poolMu.Lock()
|
self.poolMu.Lock()
|
||||||
defer self.poolMu.Unlock()
|
defer self.poolMu.Unlock()
|
||||||
|
|
||||||
|
// short circuit when a received envelope has already expired
|
||||||
|
if envelope.Expiry <= uint32(time.Now().Unix()) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Insert the message into the tracked pool
|
// Insert the message into the tracked pool
|
||||||
hash := envelope.Hash()
|
hash := envelope.Hash()
|
||||||
if _, ok := self.messages[hash]; ok {
|
if _, ok := self.messages[hash]; ok {
|
||||||
|
@ -207,4 +207,13 @@ func TestMessageExpiration(t *testing.T) {
|
|||||||
if found {
|
if found {
|
||||||
t.Fatalf("message not expired from cache")
|
t.Fatalf("message not expired from cache")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
node.add(envelope)
|
||||||
|
node.poolMu.RLock()
|
||||||
|
_, found = node.messages[envelope.Hash()]
|
||||||
|
node.poolMu.RUnlock()
|
||||||
|
if found {
|
||||||
|
t.Fatalf("message was added to cache")
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user