From e3a8412df3fe75fe498a3fce64fd2fd691a18183 Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 8 Dec 2014 13:16:50 +0100 Subject: [PATCH] Proper start/stoping wpeers --- whisper/peer.go | 15 ++++++++++++--- whisper/whisper.go | 14 ++++++++++++-- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/whisper/peer.go b/whisper/peer.go index 5fe50ba594..3471ddb2fb 100644 --- a/whisper/peer.go +++ b/whisper/peer.go @@ -38,6 +38,13 @@ func (self *peer) init() error { func (self *peer) start() { go self.update() + self.peer.Infoln("whisper started") +} + +func (self *peer) stop() { + self.peer.Infoln("whisper stopped") + + close(self.quit) } func (self *peer) update() { @@ -69,9 +76,11 @@ func (self *peer) broadcast(envelopes []*Envelope) error { } } - msg := p2p.NewMsg(envelopesMsg, envs[:i]...) - if err := self.ws.WriteMsg(msg); err != nil { - return err + if i > 0 { + msg := p2p.NewMsg(envelopesMsg, envs[:i]...) + if err := self.ws.WriteMsg(msg); err != nil { + return err + } } return nil diff --git a/whisper/whisper.go b/whisper/whisper.go index 692e6bc2c1..255bd21527 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -1,7 +1,7 @@ package whisper import ( - "fmt" + "bytes" "sync" "time" @@ -23,6 +23,10 @@ func HS(hash string) Hash { return Hash{hash} } +func (self Hash) Compare(other Hash) int { + return bytes.Compare([]byte(self.hash), []byte(other.hash)) +} + // MOVE ME END const ( @@ -73,13 +77,18 @@ func (self *Whisper) Send(ttl time.Duration, topics [][]byte, data *Message) { self.add(envelope) } +// Main handler for passing whisper messages to whisper peer objects func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { wpeer := NewPeer(self, peer, ws) + // init whisper peer (handshake/status) if err := wpeer.init(); err != nil { return err } + // kick of the main handler for broadcasting/managing envelopes go wpeer.start() + defer wpeer.stop() + // Main *read* loop. Writing is done by the peer it self. for { msg, err := ws.ReadMsg() if err != nil { @@ -96,11 +105,11 @@ func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { } } +// takes care of adding envelopes to the messages pool. At this moment no sanity checks are being performed. func (self *Whisper) add(envelope *Envelope) { self.mmu.Lock() defer self.mmu.Unlock() - fmt.Println("received envelope", envelope) self.messages[envelope.Hash()] = envelope if self.expiry[envelope.Expiry] == nil { self.expiry[envelope.Expiry] = set.NewNonTS() @@ -120,6 +129,7 @@ out: } } } + func (self *Whisper) expire() { self.mmu.Lock() defer self.mmu.Unlock()