Proper start/stoping wpeers

This commit is contained in:
obscuren 2014-12-08 13:16:50 +01:00
parent ebe2d9d872
commit e3a8412df3
2 changed files with 24 additions and 5 deletions

@ -38,6 +38,13 @@ func (self *peer) init() error {
func (self *peer) start() { func (self *peer) start() {
go self.update() go self.update()
self.peer.Infoln("whisper started")
}
func (self *peer) stop() {
self.peer.Infoln("whisper stopped")
close(self.quit)
} }
func (self *peer) update() { func (self *peer) update() {
@ -69,9 +76,11 @@ func (self *peer) broadcast(envelopes []*Envelope) error {
} }
} }
msg := p2p.NewMsg(envelopesMsg, envs[:i]...) if i > 0 {
if err := self.ws.WriteMsg(msg); err != nil { msg := p2p.NewMsg(envelopesMsg, envs[:i]...)
return err if err := self.ws.WriteMsg(msg); err != nil {
return err
}
} }
return nil return nil

@ -1,7 +1,7 @@
package whisper package whisper
import ( import (
"fmt" "bytes"
"sync" "sync"
"time" "time"
@ -23,6 +23,10 @@ func HS(hash string) Hash {
return Hash{hash} return Hash{hash}
} }
func (self Hash) Compare(other Hash) int {
return bytes.Compare([]byte(self.hash), []byte(other.hash))
}
// MOVE ME END // MOVE ME END
const ( const (
@ -73,13 +77,18 @@ func (self *Whisper) Send(ttl time.Duration, topics [][]byte, data *Message) {
self.add(envelope) self.add(envelope)
} }
// Main handler for passing whisper messages to whisper peer objects
func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error {
wpeer := NewPeer(self, peer, ws) wpeer := NewPeer(self, peer, ws)
// init whisper peer (handshake/status)
if err := wpeer.init(); err != nil { if err := wpeer.init(); err != nil {
return err return err
} }
// kick of the main handler for broadcasting/managing envelopes
go wpeer.start() go wpeer.start()
defer wpeer.stop()
// Main *read* loop. Writing is done by the peer it self.
for { for {
msg, err := ws.ReadMsg() msg, err := ws.ReadMsg()
if err != nil { if err != nil {
@ -96,11 +105,11 @@ func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error {
} }
} }
// takes care of adding envelopes to the messages pool. At this moment no sanity checks are being performed.
func (self *Whisper) add(envelope *Envelope) { func (self *Whisper) add(envelope *Envelope) {
self.mmu.Lock() self.mmu.Lock()
defer self.mmu.Unlock() defer self.mmu.Unlock()
fmt.Println("received envelope", envelope)
self.messages[envelope.Hash()] = envelope self.messages[envelope.Hash()] = envelope
if self.expiry[envelope.Expiry] == nil { if self.expiry[envelope.Expiry] == nil {
self.expiry[envelope.Expiry] = set.NewNonTS() self.expiry[envelope.Expiry] = set.NewNonTS()
@ -120,6 +129,7 @@ out:
} }
} }
} }
func (self *Whisper) expire() { func (self *Whisper) expire() {
self.mmu.Lock() self.mmu.Lock()
defer self.mmu.Unlock() defer self.mmu.Unlock()