whisper: general cleanups, documentation

This commit is contained in:
Péter Szilágyi 2015-04-14 13:24:43 +03:00
parent 5205b2f19b
commit 59bff46505
4 changed files with 157 additions and 140 deletions

View File

@ -24,7 +24,7 @@ type Envelope struct {
Data []byte Data []byte
Nonce uint32 Nonce uint32
hash common.Hash hash common.Hash // Cached hash of the envelope to avoid rehashing every time
} }
// NewEnvelope wraps a Whisper message with expiration and destination data // NewEnvelope wraps a Whisper message with expiration and destination data
@ -59,16 +59,6 @@ func (self *Envelope) Seal(pow time.Duration) {
} }
} }
// valid checks whether the claimed proof of work was indeed executed.
// TODO: Is this really useful? Isn't this always true?
func (self *Envelope) valid() bool {
d := make([]byte, 64)
copy(d[:32], self.rlpWithoutNonce())
binary.BigEndian.PutUint32(d[60:], self.Nonce)
return common.FirstBitSet(common.BigD(crypto.Sha3(d))) > 0
}
// rlpWithoutNonce returns the RLP encoded envelope contents, except the nonce. // rlpWithoutNonce returns the RLP encoded envelope contents, except the nonce.
func (self *Envelope) rlpWithoutNonce() []byte { func (self *Envelope) rlpWithoutNonce() []byte {
enc, _ := rlp.EncodeToBytes([]interface{}{self.Expiry, self.TTL, self.Topics, self.Data}) enc, _ := rlp.EncodeToBytes([]interface{}{self.Expiry, self.TTL, self.Topics, self.Data})

View File

@ -1,10 +1,13 @@
// Contains the message filter for fine grained subscriptions.
package whisper package whisper
import "crypto/ecdsa" import "crypto/ecdsa"
// Filter is used to subscribe to specific types of whisper messages.
type Filter struct { type Filter struct {
To *ecdsa.PublicKey To *ecdsa.PublicKey // Recipient of the message
From *ecdsa.PublicKey From *ecdsa.PublicKey // Sender of the message
Topics []Topic Topics []Topic // Topics to watch messages on
Fn func(*Message) Fn func(*Message) // Handler in case of a match
} }

View File

@ -9,10 +9,6 @@ import (
"gopkg.in/fatih/set.v0" "gopkg.in/fatih/set.v0"
) )
const (
protocolVersion uint64 = 0x02
)
type peer struct { type peer struct {
host *Whisper host *Whisper
peer *p2p.Peer peer *p2p.Peer

View File

@ -2,7 +2,6 @@ package whisper
import ( import (
"crypto/ecdsa" "crypto/ecdsa"
"errors"
"sync" "sync"
"time" "time"
@ -17,12 +16,16 @@ import (
) )
const ( const (
statusMsg = 0x0 statusMsg = 0x00
envelopesMsg = 0x01 envelopesMsg = 0x01
whisperVersion = 0x02
protocolVersion uint64 = 0x02
protocolName = "shh"
signatureFlag = byte(1 << 7) signatureFlag = byte(1 << 7)
signatureLength = 65 signatureLength = 65
expirationTicks = 800 * time.Millisecond
) )
const ( const (
@ -42,9 +45,9 @@ type Whisper struct {
protocol p2p.Protocol protocol p2p.Protocol
filters *filter.Filters filters *filter.Filters
mmu sync.RWMutex mmu sync.RWMutex // Message mutex to sync the below pool
messages map[common.Hash]*Envelope messages map[common.Hash]*Envelope // Pool of messages currently tracked by this node
expiry map[uint32]*set.SetNonTS expiry map[uint32]*set.SetNonTS // Message expiration pool (TODO: something lighter)
quit chan struct{} quit chan struct{}
@ -63,8 +66,8 @@ func New() *Whisper {
// p2p whisper sub protocol handler // p2p whisper sub protocol handler
whisper.protocol = p2p.Protocol{ whisper.protocol = p2p.Protocol{
Name: "shh", Name: protocolName,
Version: uint(whisperVersion), Version: uint(protocolVersion),
Length: 2, Length: 2,
Run: whisper.msgHandler, Run: whisper.msgHandler,
} }
@ -72,10 +75,64 @@ func New() *Whisper {
return whisper return whisper
} }
// Protocol returns the whisper sub-protocol handler for this particular client.
func (self *Whisper) Protocol() p2p.Protocol {
return self.protocol
}
// Version returns the whisper sub-protocols version number.
func (self *Whisper) Version() uint { func (self *Whisper) Version() uint {
return self.protocol.Version return self.protocol.Version
} }
// NewIdentity generates a new cryptographic identity for the client, and injects
// it into the known identities for message decryption.
func (self *Whisper) NewIdentity() *ecdsa.PrivateKey {
key, err := crypto.GenerateKey()
if err != nil {
panic(err)
}
self.keys[string(crypto.FromECDSAPub(&key.PublicKey))] = key
return key
}
// HasIdentity checks if the the whisper node is configured with the private key
// of the specified public pair.
func (self *Whisper) HasIdentity(key *ecdsa.PublicKey) bool {
return self.keys[string(crypto.FromECDSAPub(key))] != nil
}
// GetIdentity retrieves the private key of the specified public identity.
func (self *Whisper) GetIdentity(key *ecdsa.PublicKey) *ecdsa.PrivateKey {
return self.keys[string(crypto.FromECDSAPub(key))]
}
// Watch installs a new message handler to run in case a matching packet arrives
// from the whisper network.
func (self *Whisper) Watch(options Filter) int {
filter := filter.Generic{
Str1: string(crypto.FromECDSAPub(options.To)),
Str2: string(crypto.FromECDSAPub(options.From)),
Data: NewTopicSet(options.Topics),
Fn: func(data interface{}) {
options.Fn(data.(*Message))
},
}
return self.filters.Install(filter)
}
// Unwatch removes an installed message handler.
func (self *Whisper) Unwatch(id int) {
self.filters.Uninstall(id)
}
// Send injects a message into the whisper send queue, to be distributed in the
// network in the coming cycles.
func (self *Whisper) Send(envelope *Envelope) error {
return self.add(envelope)
}
func (self *Whisper) Start() { func (self *Whisper) Start() {
glog.V(logger.Info).Infoln("Whisper started") glog.V(logger.Info).Infoln("Whisper started")
go self.update() go self.update()
@ -83,29 +140,7 @@ func (self *Whisper) Start() {
func (self *Whisper) Stop() { func (self *Whisper) Stop() {
close(self.quit) close(self.quit)
} glog.V(logger.Info).Infoln("Whisper stopped")
func (self *Whisper) Send(envelope *Envelope) error {
return self.add(envelope)
}
func (self *Whisper) NewIdentity() *ecdsa.PrivateKey {
key, err := crypto.GenerateKey()
if err != nil {
panic(err)
}
self.keys[string(crypto.FromECDSAPub(&key.PublicKey))] = key
return key
}
func (self *Whisper) HasIdentity(key *ecdsa.PublicKey) bool {
return self.keys[string(crypto.FromECDSAPub(key))] != nil
}
func (self *Whisper) GetIdentity(key *ecdsa.PublicKey) *ecdsa.PrivateKey {
return self.keys[string(crypto.FromECDSAPub(key))]
} }
// func (self *Whisper) RemoveIdentity(key *ecdsa.PublicKey) bool { // func (self *Whisper) RemoveIdentity(key *ecdsa.PublicKey) bool {
@ -117,22 +152,7 @@ func (self *Whisper) GetIdentity(key *ecdsa.PublicKey) *ecdsa.PrivateKey {
// return false // return false
// } // }
func (self *Whisper) Watch(opts Filter) int { /*func (self *Whisper) Messages(id int) (messages []*Message) {
return self.filters.Install(filter.Generic{
Str1: string(crypto.FromECDSAPub(opts.To)),
Str2: string(crypto.FromECDSAPub(opts.From)),
Data: NewTopicSet(opts.Topics),
Fn: func(data interface{}) {
opts.Fn(data.(*Message))
},
})
}
func (self *Whisper) Unwatch(id int) {
self.filters.Uninstall(id)
}
func (self *Whisper) Messages(id int) (messages []*Message) {
filter := self.filters.Get(id) filter := self.filters.Get(id)
if filter != nil { if filter != nil {
for _, e := range self.messages { for _, e := range self.messages {
@ -146,6 +166,36 @@ func (self *Whisper) Messages(id int) (messages []*Message) {
} }
return return
}*/
// add inserts a new envelope into the message pool to be distributed within the
// whisper network. It also inserts the envelope into the expiration pool at the
// appropriate time-stamp.
func (self *Whisper) add(envelope *Envelope) error {
self.mmu.Lock()
defer self.mmu.Unlock()
// Insert the message into the tracked pool
hash := envelope.Hash()
if _, ok := self.messages[hash]; ok {
glog.V(logger.Detail).Infof("whisper envelope already cached: %x\n", envelope)
return nil
}
self.messages[hash] = envelope
// Insert the message into the expiration pool for later removal
if self.expiry[envelope.Expiry] == nil {
self.expiry[envelope.Expiry] = set.NewNonTS()
}
if !self.expiry[envelope.Expiry].Has(hash) {
self.expiry[envelope.Expiry].Add(hash)
// Notify the local node of a message arrival
go self.postEvent(envelope)
}
glog.V(logger.Detail).Infof("cached whisper envelope %x\n", envelope)
return nil
} }
// Main handler for passing whisper messages to whisper peer objects // Main handler for passing whisper messages to whisper peer objects
@ -182,79 +232,6 @@ func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error {
} }
} }
// takes care of adding envelopes to the messages pool. At this moment no sanity checks are being performed.
func (self *Whisper) add(envelope *Envelope) error {
if !envelope.valid() {
return errors.New("invalid pow provided for envelope")
}
self.mmu.Lock()
defer self.mmu.Unlock()
hash := envelope.Hash()
self.messages[hash] = envelope
if self.expiry[envelope.Expiry] == nil {
self.expiry[envelope.Expiry] = set.NewNonTS()
}
if !self.expiry[envelope.Expiry].Has(hash) {
self.expiry[envelope.Expiry].Add(hash)
go self.postEvent(envelope)
}
glog.V(logger.Detail).Infof("added whisper envelope %x\n", envelope)
return nil
}
func (self *Whisper) update() {
expire := time.NewTicker(800 * time.Millisecond)
out:
for {
select {
case <-expire.C:
self.expire()
case <-self.quit:
break out
}
}
}
func (self *Whisper) expire() {
self.mmu.Lock()
defer self.mmu.Unlock()
now := uint32(time.Now().Unix())
for then, hashSet := range self.expiry {
if then > now {
continue
}
hashSet.Each(func(v interface{}) bool {
delete(self.messages, v.(common.Hash))
return true
})
self.expiry[then].Clear()
}
}
func (self *Whisper) envelopes() (envelopes []*Envelope) {
self.mmu.RLock()
defer self.mmu.RUnlock()
envelopes = make([]*Envelope, len(self.messages))
i := 0
for _, envelope := range self.messages {
envelopes[i] = envelope
i++
}
return
}
func (self *Whisper) Protocol() p2p.Protocol {
return self.protocol
}
// postEvent opens an envelope with the configured identities and delivers the // postEvent opens an envelope with the configured identities and delivers the
// message upstream from application processing. // message upstream from application processing.
func (self *Whisper) postEvent(envelope *Envelope) { func (self *Whisper) postEvent(envelope *Envelope) {
@ -293,3 +270,54 @@ func createFilter(message *Message, topics []Topic) filter.Filter {
Data: NewTopicSet(topics), Data: NewTopicSet(topics),
} }
} }
// update loops until the lifetime of the whisper node, updating its internal
// state by expiring stale messages from the pool.
func (self *Whisper) update() {
// Start a ticker to check for expirations
expire := time.NewTicker(expirationTicks)
// Repeat updates until termination is requested
for {
select {
case <-expire.C:
self.expire()
case <-self.quit:
return
}
}
}
// expire iterates over all the expiration timestamps, removing all stale
// messages from the pools.
func (self *Whisper) expire() {
self.mmu.Lock()
defer self.mmu.Unlock()
now := uint32(time.Now().Unix())
for then, hashSet := range self.expiry {
// Short circuit if a future time
if then > now {
continue
}
// Dump all expired messages and remove timestamp
hashSet.Each(func(v interface{}) bool {
delete(self.messages, v.(common.Hash))
return true
})
self.expiry[then].Clear()
}
}
// envelopes retrieves all the messages currently pooled by the node.
func (self *Whisper) envelopes() []*Envelope {
self.mmu.RLock()
defer self.mmu.RUnlock()
envelopes := make([]*Envelope, 0, len(self.messages))
for _, envelope := range self.messages {
envelopes = append(envelopes, envelope)
}
return envelopes
}