Implemented watching using filter package

* Added filters / watches
* Removed event dep
This commit is contained in:
obscuren 2014-12-12 22:23:42 +01:00
parent ed1538248f
commit a17a1f9208
6 changed files with 145 additions and 26 deletions

@ -61,22 +61,27 @@ func (self *Envelope) Seal(pow time.Duration) {
self.proveWork(pow) self.proveWork(pow)
} }
func (self *Envelope) Open(prv *ecdsa.PrivateKey) (*Message, error) { func (self *Envelope) Open(prv *ecdsa.PrivateKey) (msg *Message, err error) {
data := self.Data data := self.Data
if data[0] > 0 && len(data) < 66 { var message Message
dataStart := 1
if data[0] > 0 {
if len(data) < 66 {
return nil, fmt.Errorf("unable to open envelope. First bit set but len(data) < 66") return nil, fmt.Errorf("unable to open envelope. First bit set but len(data) < 66")
} }
dataStart = 66
if data[0] > 0 { message.Flags = data[0]
payload, err := crypto.Decrypt(prv, data[66:]) message.Signature = data[1:66]
}
message.Payload = data[dataStart:]
if prv != nil {
message.Payload, err = crypto.Decrypt(prv, message.Payload)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to open envelope. Decrypt failed: %v", err) return nil, fmt.Errorf("unable to open envelope. Decrypt failed: %v", err)
} }
return NewMessage(payload), nil
} }
return NewMessage(data[1:]), nil return &message, nil
} }
func (self *Envelope) proveWork(dura time.Duration) { func (self *Envelope) proveWork(dura time.Duration) {

10
whisper/filter.go Normal file

@ -0,0 +1,10 @@
package whisper
import "crypto/ecdsa"
type Filter struct {
To *ecdsa.PrivateKey
From *ecdsa.PublicKey
Topics [][]byte
Fn func(*Message)
}

@ -8,6 +8,7 @@ import (
"net" "net"
"os" "os"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/whisper" "github.com/ethereum/go-ethereum/whisper"
@ -17,9 +18,9 @@ import (
func main() { func main() {
logger.AddLogSystem(logger.NewStdLogSystem(os.Stdout, log.LstdFlags, logger.InfoLevel)) logger.AddLogSystem(logger.NewStdLogSystem(os.Stdout, log.LstdFlags, logger.InfoLevel))
pub, sec := secp256k1.GenerateKeyPair() pub, _ := secp256k1.GenerateKeyPair()
whisper := whisper.New(sec) whisper := whisper.New(&event.TypeMux{})
srv := p2p.Server{ srv := p2p.Server{
MaxPeers: 10, MaxPeers: 10,

@ -28,15 +28,11 @@ func (self *Message) sign(key *ecdsa.PrivateKey) (err error) {
} }
func (self *Message) Recover() *ecdsa.PublicKey { func (self *Message) Recover() *ecdsa.PublicKey {
defer func() { recover() }() // in case of invalid sig
return crypto.SigToPub(self.hash(), self.Signature) return crypto.SigToPub(self.hash(), self.Signature)
} }
func (self *Message) Encrypt(from *ecdsa.PrivateKey, to *ecdsa.PublicKey) (err error) { func (self *Message) Encrypt(to *ecdsa.PublicKey) (err error) {
err = self.sign(from)
if err != nil {
return err
}
self.Payload, err = crypto.Encrypt(to, self.Payload) self.Payload, err = crypto.Encrypt(to, self.Payload)
if err != nil { if err != nil {
return err return err
@ -57,8 +53,16 @@ type Opts struct {
} }
func (self *Message) Seal(pow time.Duration, opts Opts) (*Envelope, error) { func (self *Message) Seal(pow time.Duration, opts Opts) (*Envelope, error) {
if opts.To != nil && opts.From != nil { if opts.From != nil {
if err := self.Encrypt(opts.From, opts.To); err != nil { err := self.sign(opts.From)
if err != nil {
return nil, err
}
}
if opts.To != nil {
err := self.Encrypt(opts.To)
if err != nil {
return nil, err return nil, err
} }
} }

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/event/filter"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"gopkg.in/fatih/set.v0" "gopkg.in/fatih/set.v0"
) )
@ -38,28 +39,38 @@ const (
envelopesMsg = 0x01 envelopesMsg = 0x01
) )
type MessageEvent struct {
To *ecdsa.PrivateKey
From *ecdsa.PublicKey
Message *Message
}
const DefaultTtl = 50 * time.Second const DefaultTtl = 50 * time.Second
type Whisper struct { type Whisper struct {
key *ecdsa.PrivateKey
protocol p2p.Protocol protocol p2p.Protocol
filters *filter.Filters
mmu sync.RWMutex mmu sync.RWMutex
messages map[Hash]*Envelope messages map[Hash]*Envelope
expiry map[uint32]*set.SetNonTS expiry map[uint32]*set.SetNonTS
quit chan struct{} quit chan struct{}
keys []*ecdsa.PrivateKey
} }
func New(sec []byte) *Whisper { func New() *Whisper {
whisper := &Whisper{ whisper := &Whisper{
key: crypto.ToECDSA(sec),
messages: make(map[Hash]*Envelope), messages: make(map[Hash]*Envelope),
filters: filter.New(),
expiry: make(map[uint32]*set.SetNonTS), expiry: make(map[uint32]*set.SetNonTS),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
whisper.filters.Start()
go whisper.update() go whisper.update()
// XXX TODO REMOVE TESTING CODE
msg := NewMessage([]byte(fmt.Sprintf("Hello world. This is whisper-go. Incase you're wondering; the time is %v", time.Now()))) msg := NewMessage([]byte(fmt.Sprintf("Hello world. This is whisper-go. Incase you're wondering; the time is %v", time.Now())))
envelope, _ := msg.Seal(DefaultPow, Opts{ envelope, _ := msg.Seal(DefaultPow, Opts{
Ttl: DefaultTtl, Ttl: DefaultTtl,
@ -67,6 +78,7 @@ func New(sec []byte) *Whisper {
if err := whisper.Send(envelope); err != nil { if err := whisper.Send(envelope); err != nil {
fmt.Println(err) fmt.Println(err)
} }
// XXX TODO REMOVE TESTING CODE
// p2p whisper sub protocol handler // p2p whisper sub protocol handler
whisper.protocol = p2p.Protocol{ whisper.protocol = p2p.Protocol{
@ -87,6 +99,35 @@ func (self *Whisper) Send(envelope *Envelope) error {
return self.add(envelope) return self.add(envelope)
} }
func (self *Whisper) NewIdentity() *ecdsa.PrivateKey {
key, err := crypto.GenerateKey()
if err != nil {
panic(err)
}
self.keys = append(self.keys, key)
return key
}
func (self *Whisper) HasIdentity(key *ecdsa.PrivateKey) bool {
for _, key := range self.keys {
if key.D.Cmp(key.D) == 0 {
return true
}
}
return false
}
func (self *Whisper) Watch(opts Filter) int {
return self.filters.Install(filter.Generic{
Str1: string(crypto.FromECDSA(opts.To)),
Str2: string(crypto.FromECDSAPub(opts.From)),
Fn: func(data interface{}) {
opts.Fn(data.(*Message))
},
})
}
// Main handler for passing whisper messages to whisper peer objects // Main handler for passing whisper messages to whisper peer objects
func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error {
wpeer := NewPeer(self, peer, ws) wpeer := NewPeer(self, peer, ws)
@ -122,7 +163,7 @@ func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error {
// takes care of adding envelopes to the messages pool. At this moment no sanity checks are being performed. // takes care of adding envelopes to the messages pool. At this moment no sanity checks are being performed.
func (self *Whisper) add(envelope *Envelope) error { func (self *Whisper) add(envelope *Envelope) error {
if !envelope.valid() { if !envelope.valid() {
return errors.New("invalid pow for envelope") return errors.New("invalid pow provided for envelope")
} }
self.mmu.Lock() self.mmu.Lock()
@ -136,11 +177,9 @@ func (self *Whisper) add(envelope *Envelope) error {
if !self.expiry[envelope.Expiry].Has(hash) { if !self.expiry[envelope.Expiry].Has(hash) {
self.expiry[envelope.Expiry].Add(hash) self.expiry[envelope.Expiry].Add(hash)
// TODO notify listeners (given that we had any ...) self.postEvent(envelope)
} }
fmt.Println("add", envelope)
return nil return nil
} }
@ -189,6 +228,19 @@ func (self *Whisper) envelopes() (envelopes []*Envelope) {
return return
} }
func (self *Whisper) postEvent(envelope *Envelope) {
for _, key := range self.keys {
if message, err := envelope.Open(key); err == nil {
// Create a custom filter?
self.filters.Notify(filter.Generic{
Str1: string(crypto.FromECDSA(key)), Str2: string(crypto.FromECDSAPub(message.Recover())),
}, message)
} else {
fmt.Println(err)
}
}
}
func (self *Whisper) Protocol() p2p.Protocol { func (self *Whisper) Protocol() p2p.Protocol {
return self.protocol return self.protocol
} }

47
whisper/whisper_test.go Normal file

@ -0,0 +1,47 @@
package whisper
import (
"fmt"
"testing"
"time"
)
func TestKeyManagement(t *testing.T) {
whisper := New()
key := whisper.NewIdentity()
if !whisper.HasIdentity(key) {
t.Error("expected whisper to have identify")
}
}
func TestEvent(t *testing.T) {
res := make(chan *Message, 1)
whisper := New()
id := whisper.NewIdentity()
whisper.Watch(Filter{
To: id,
Fn: func(msg *Message) {
res <- msg
},
})
msg := NewMessage([]byte(fmt.Sprintf("Hello world. This is whisper-go. Incase you're wondering; the time is %v", time.Now())))
envelope, err := msg.Seal(DefaultPow, Opts{
Ttl: DefaultTtl,
From: id,
To: &id.PublicKey,
})
if err != nil {
fmt.Println(err)
t.FailNow()
}
tick := time.NewTicker(time.Second)
whisper.postEvent(envelope)
select {
case <-res:
case <-tick.C:
t.Error("did not receive message")
}
}