From ba996f5e27572e853bcc5c815ae72082a15c9183 Mon Sep 17 00:00:00 2001 From: gluk256 Date: Tue, 20 Dec 2016 00:58:01 +0100 Subject: [PATCH] whisper: refactoring (#3411) * whisper: refactored message processing * whisper: final polishing * whisper: logging updated * whisper: moved the check, changed the default PoW * whisper: refactoring of message queuing * whisper: refactored parameters --- whisper/shhapi/api.go | 16 +++++ whisper/shhapi/api_test.go | 9 +++ whisper/whisperv5/doc.go | 19 +++--- whisper/whisperv5/envelope.go | 18 +++-- whisper/whisperv5/filter.go | 20 ++++-- whisper/whisperv5/filter_test.go | 12 ++-- whisper/whisperv5/message.go | 10 +-- whisper/whisperv5/message_test.go | 51 +++++++++++++-- whisper/whisperv5/peer_test.go | 2 + whisper/whisperv5/topic.go | 3 +- whisper/whisperv5/whisper.go | 105 ++++++++++++++++++++++-------- whisper/whisperv5/whisper_test.go | 54 +++++++++++++++ 12 files changed, 253 insertions(+), 66 deletions(-) diff --git a/whisper/shhapi/api.go b/whisper/shhapi/api.go index 6ed3e17c2..f2597e133 100644 --- a/whisper/shhapi/api.go +++ b/whisper/shhapi/api.go @@ -55,6 +55,22 @@ func APIs() []rpc.API { } } +// Start starts the Whisper worker threads. +func (api *PublicWhisperAPI) Start() error { + if api.whisper == nil { + return whisperOffLineErr + } + return api.whisper.Start(nil) +} + +// Stop stops the Whisper worker threads. +func (api *PublicWhisperAPI) Stop() error { + if api.whisper == nil { + return whisperOffLineErr + } + return api.whisper.Stop() +} + // Version returns the Whisper version this node offers. func (api *PublicWhisperAPI) Version() (*rpc.HexNumber, error) { if api.whisper == nil { diff --git a/whisper/shhapi/api_test.go b/whisper/shhapi/api_test.go index 13a7cee66..a10e2e476 100644 --- a/whisper/shhapi/api_test.go +++ b/whisper/shhapi/api_test.go @@ -277,6 +277,9 @@ func TestIntegrationAsym(t *testing.T) { t.Fatalf("failed to create API.") } + api.Start() + defer api.Stop() + sig, err := api.NewIdentity() if err != nil { t.Fatalf("failed NewIdentity: %s.", err) @@ -375,6 +378,9 @@ func TestIntegrationSym(t *testing.T) { t.Fatalf("failed to create API.") } + api.Start() + defer api.Stop() + keyname := "schluessel" err := api.GenerateSymKey(keyname) if err != nil { @@ -471,6 +477,9 @@ func TestIntegrationSymWithFilter(t *testing.T) { t.Fatalf("failed to create API.") } + api.Start() + defer api.Stop() + keyname := "schluessel" err := api.GenerateSymKey(keyname) if err != nil { diff --git a/whisper/whisperv5/doc.go b/whisper/whisperv5/doc.go index 223d8246e..e2e255e9e 100644 --- a/whisper/whisperv5/doc.go +++ b/whisper/whisperv5/doc.go @@ -15,9 +15,7 @@ // along with the go-ethereum library. If not, see . /* -Package whisper implements the Whisper PoC-1. - -(https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec) +Package whisper implements the Whisper protocol (version 5). Whisper combines aspects of both DHTs and datagram messaging systems (e.g. UDP). As such it may be likened and compared to both, not dissimilar to the @@ -42,11 +40,11 @@ const ( ProtocolVersionStr = "5.0" ProtocolName = "shh" - statusCode = 0 - messagesCode = 1 - p2pCode = 2 - mailRequestCode = 3 - NumberOfMessageCodes = 32 + statusCode = 0 // used by whisper protocol + messagesCode = 1 // normal whisper message + p2pCode = 2 // peer-to-peer message (to be consumed by the peer, but not forwarded any futher) + p2pRequestCode = 3 // peer-to-peer message, used by Dapp protocol + NumberOfMessageCodes = 64 paddingMask = byte(3) signatureFlag = byte(4) @@ -57,11 +55,12 @@ const ( saltLength = 12 AESNonceMaxLength = 12 - MaxMessageLength = 0xFFFF // todo: remove this restriction after testing in morden and analizing stats. this should be regulated by MinimumPoW. - MinimumPoW = 10.0 // todo: review + MaxMessageLength = 0xFFFF // todo: remove this restriction after testing. this should be regulated by PoW. + MinimumPoW = 1.0 // todo: review after testing. padSizeLimitLower = 128 // it can not be less - we don't want to reveal the absence of signature padSizeLimitUpper = 256 // just an arbitrary number, could be changed without losing compatibility + messageQueueLimit = 1024 expirationCycle = time.Second transmissionCycle = 300 * time.Millisecond diff --git a/whisper/whisperv5/envelope.go b/whisper/whisperv5/envelope.go index 3d048bb44..1b976705d 100644 --- a/whisper/whisperv5/envelope.go +++ b/whisper/whisperv5/envelope.go @@ -14,14 +14,14 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -// Contains the Whisper protocol Envelope element. For formal details please see -// the specs at https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec#envelopes. +// Contains the Whisper protocol Envelope element. package whisperv5 import ( "crypto/ecdsa" "encoding/binary" + "errors" "fmt" "math" "time" @@ -86,8 +86,8 @@ func (e *Envelope) Ver() uint64 { // Seal closes the envelope by spending the requested amount of time as a proof // of work on hashing the data. -func (e *Envelope) Seal(options *MessageParams) { - var target int +func (e *Envelope) Seal(options *MessageParams) error { + var target, bestBit int if options.PoW == 0 { // adjust for the duration of Seal() execution only if execution time is predefined unconditionally e.Expiry += options.WorkTime @@ -99,7 +99,7 @@ func (e *Envelope) Seal(options *MessageParams) { h := crypto.Keccak256(e.rlpWithoutNonce()) copy(buf[:32], h) - finish, bestBit := time.Now().Add(time.Duration(options.WorkTime)*time.Second).UnixNano(), 0 + finish := time.Now().Add(time.Duration(options.WorkTime) * time.Second).UnixNano() for nonce := uint64(0); time.Now().UnixNano() < finish; { for i := 0; i < 1024; i++ { binary.BigEndian.PutUint64(buf[56:], nonce) @@ -108,12 +108,18 @@ func (e *Envelope) Seal(options *MessageParams) { if firstBit > bestBit { e.EnvNonce, bestBit = nonce, firstBit if target > 0 && bestBit >= target { - return + return nil } } nonce++ } } + + if target > 0 && bestBit < target { + return errors.New("Failed to reach the PoW target") + } + + return nil } func (e *Envelope) PoW() float64 { diff --git a/whisper/whisperv5/filter.go b/whisper/whisperv5/filter.go index fd5f5083f..3845d0c20 100644 --- a/whisper/whisperv5/filter.go +++ b/whisper/whisperv5/filter.go @@ -21,6 +21,8 @@ import ( "sync" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" ) type Filter struct { @@ -75,11 +77,12 @@ func (fs *Filters) Get(i uint32) *Filter { return fs.watchers[i] } -func (fs *Filters) NotifyWatchers(env *Envelope, messageCode uint64) { +func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) { fs.mutex.RLock() var msg *ReceivedMessage - for _, watcher := range fs.watchers { - if messageCode == p2pCode && !watcher.AcceptP2P { + for j, watcher := range fs.watchers { + if p2pMessage && !watcher.AcceptP2P { + glog.V(logger.Detail).Infof("msg [%x], filter [%d]: p2p messages are not allowed \n", env.Hash(), j) continue } @@ -90,6 +93,11 @@ func (fs *Filters) NotifyWatchers(env *Envelope, messageCode uint64) { match = watcher.MatchEnvelope(env) if match { msg = env.Open(watcher) + if msg == nil { + glog.V(logger.Detail).Infof("msg [%x], filter [%d]: failed to open \n", env.Hash(), j) + } + } else { + glog.V(logger.Detail).Infof("msg [%x], filter [%d]: does not match \n", env.Hash(), j) } } @@ -137,12 +145,12 @@ func (f *Filter) MatchMessage(msg *ReceivedMessage) bool { if f.PoW > 0 && msg.PoW < f.PoW { return false } - if f.Src != nil && !isPubKeyEqual(msg.Src, f.Src) { + if f.Src != nil && !IsPubKeyEqual(msg.Src, f.Src) { return false } if f.expectsAsymmetricEncryption() && msg.isAsymmetricEncryption() { - return isPubKeyEqual(&f.KeyAsym.PublicKey, msg.Dst) && f.MatchTopic(msg.Topic) + return IsPubKeyEqual(&f.KeyAsym.PublicKey, msg.Dst) && f.MatchTopic(msg.Topic) } else if f.expectsSymmetricEncryption() && msg.isSymmetricEncryption() { return f.SymKeyHash == msg.SymKeyHash && f.MatchTopic(msg.Topic) } @@ -176,7 +184,7 @@ func (f *Filter) MatchTopic(topic TopicType) bool { return false } -func isPubKeyEqual(a, b *ecdsa.PublicKey) bool { +func IsPubKeyEqual(a, b *ecdsa.PublicKey) bool { if !ValidatePublicKey(a) { return false } else if !ValidatePublicKey(b) { diff --git a/whisper/whisperv5/filter_test.go b/whisper/whisperv5/filter_test.go index 561bb8f7d..9f2a58f28 100644 --- a/whisper/whisperv5/filter_test.go +++ b/whisper/whisperv5/filter_test.go @@ -139,7 +139,7 @@ func TestComparePubKey(t *testing.T) { if err != nil { t.Fatalf("failed to generate second key with seed %d: %s.", seed, err) } - if isPubKeyEqual(&key1.PublicKey, &key2.PublicKey) { + if IsPubKeyEqual(&key1.PublicKey, &key2.PublicKey) { t.Fatalf("public keys are equal, seed %d.", seed) } @@ -149,7 +149,7 @@ func TestComparePubKey(t *testing.T) { if err != nil { t.Fatalf("failed to generate third key with seed %d: %s.", seed, err) } - if isPubKeyEqual(&key1.PublicKey, &key3.PublicKey) { + if IsPubKeyEqual(&key1.PublicKey, &key3.PublicKey) { t.Fatalf("key1 == key3, seed %d.", seed) } } @@ -540,7 +540,7 @@ func TestWatchers(t *testing.T) { } for i = 0; i < NumMessages; i++ { - filters.NotifyWatchers(envelopes[i], messagesCode) + filters.NotifyWatchers(envelopes[i], false) } var total int @@ -593,7 +593,7 @@ func TestWatchers(t *testing.T) { } for i = 0; i < NumMessages; i++ { - filters.NotifyWatchers(envelopes[i], messagesCode) + filters.NotifyWatchers(envelopes[i], false) } for i = 0; i < NumFilters; i++ { @@ -629,7 +629,7 @@ func TestWatchers(t *testing.T) { // test AcceptP2P total = 0 - filters.NotifyWatchers(envelopes[0], p2pCode) + filters.NotifyWatchers(envelopes[0], true) for i = 0; i < NumFilters; i++ { mail = tst[i].f.Retrieve() @@ -646,7 +646,7 @@ func TestWatchers(t *testing.T) { } f.AcceptP2P = true total = 0 - filters.NotifyWatchers(envelopes[0], p2pCode) + filters.NotifyWatchers(envelopes[0], true) for i = 0; i < NumFilters; i++ { mail = tst[i].f.Retrieve() diff --git a/whisper/whisperv5/message.go b/whisper/whisperv5/message.go index f3812b1d8..a095f7c0c 100644 --- a/whisper/whisperv5/message.go +++ b/whisper/whisperv5/message.go @@ -14,9 +14,7 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -// Contains the Whisper protocol Message element. For formal details please see -// the specs at https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec#messages. -// todo: fix the spec link, and move it to doc.go +// Contains the Whisper protocol Message element. package whisperv5 @@ -256,7 +254,11 @@ func (msg *SentMessage) Wrap(options *MessageParams) (envelope *Envelope, err er } envelope = NewEnvelope(options.TTL, options.Topic, salt, nonce, msg) - envelope.Seal(options) + err = envelope.Seal(options) + if err != nil { + return nil, err + } + return envelope, nil } diff --git a/whisper/whisperv5/message_test.go b/whisper/whisperv5/message_test.go index cd327d9d9..5cbc9182f 100644 --- a/whisper/whisperv5/message_test.go +++ b/whisper/whisperv5/message_test.go @@ -30,11 +30,15 @@ func copyFromBuf(dst []byte, src []byte, beg int) int { } func generateMessageParams() (*MessageParams, error) { + // set all the parameters except p.Dst + buf := make([]byte, 1024) randomize(buf) sz := rand.Intn(400) var p MessageParams + p.PoW = 0.01 + p.WorkTime = 1 p.TTL = uint32(rand.Intn(1024)) p.Payload = make([]byte, sz) p.Padding = make([]byte, padSizeLimitUpper) @@ -52,8 +56,6 @@ func generateMessageParams() (*MessageParams, error) { return nil, err } - // p.Dst, p.PoW, p.WorkTime are not set - p.PoW = 0.01 return &p, nil } @@ -114,7 +116,7 @@ func singleMessageTest(t *testing.T, symmetric bool) { if len(decrypted.Signature) != signatureLength { t.Fatalf("failed with seed %d: signature len %d.", seed, len(decrypted.Signature)) } - if !isPubKeyEqual(decrypted.Src, ¶ms.Src.PublicKey) { + if !IsPubKeyEqual(decrypted.Src, ¶ms.Src.PublicKey) { t.Fatalf("failed with seed %d: signature mismatch.", seed) } } @@ -152,6 +154,16 @@ func TestMessageWrap(t *testing.T) { if pow < target { t.Fatalf("failed Wrap with seed %d: pow < target (%f vs. %f).", seed, pow, target) } + + // set PoW target too high, expect error + msg2 := NewSentMessage(params) + params.TTL = 1000000 + params.WorkTime = 1 + params.PoW = 10000000.0 + env, err = msg2.Wrap(params) + if err == nil { + t.Fatalf("unexpectedly reached the PoW target with seed %d.", seed) + } } func TestMessageSeal(t *testing.T) { @@ -256,7 +268,7 @@ func singleEnvelopeOpenTest(t *testing.T, symmetric bool) { if len(decrypted.Signature) != signatureLength { t.Fatalf("failed with seed %d: signature len %d.", seed, len(decrypted.Signature)) } - if !isPubKeyEqual(decrypted.Src, ¶ms.Src.PublicKey) { + if !IsPubKeyEqual(decrypted.Src, ¶ms.Src.PublicKey) { t.Fatalf("failed with seed %d: signature mismatch.", seed) } if decrypted.isAsymmetricEncryption() == symmetric { @@ -269,8 +281,37 @@ func singleEnvelopeOpenTest(t *testing.T, symmetric bool) { if decrypted.Dst == nil { t.Fatalf("failed with seed %d: dst is nil.", seed) } - if !isPubKeyEqual(decrypted.Dst, &key.PublicKey) { + if !IsPubKeyEqual(decrypted.Dst, &key.PublicKey) { t.Fatalf("failed with seed %d: Dst.", seed) } } } + +func TestEncryptWithZeroKey(t *testing.T) { + InitSingleTest() + + params, err := generateMessageParams() + if err != nil { + t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err) + } + + msg := NewSentMessage(params) + + params.KeySym = make([]byte, aesKeyLength) + _, err = msg.Wrap(params) + if err == nil { + t.Fatalf("wrapped with zero key, seed: %d.", seed) + } + + params.KeySym = make([]byte, 0) + _, err = msg.Wrap(params) + if err == nil { + t.Fatalf("wrapped with empty key, seed: %d.", seed) + } + + params.KeySym = nil + _, err = msg.Wrap(params) + if err == nil { + t.Fatalf("wrapped with nil key, seed: %d.", seed) + } +} diff --git a/whisper/whisperv5/peer_test.go b/whisper/whisperv5/peer_test.go index 082e7f446..88da59bff 100644 --- a/whisper/whisperv5/peer_test.go +++ b/whisper/whisperv5/peer_test.go @@ -118,6 +118,7 @@ func initialize(t *testing.T) { var node TestNode node.shh = NewWhisper(nil) node.shh.test = true + node.shh.Start(nil) topics := make([]TopicType, 0) topics = append(topics, sharedTopic) f := Filter{KeySym: sharedKey, Topics: topics} @@ -166,6 +167,7 @@ func stopServers() { n := nodes[i] if n != nil { n.shh.Unwatch(n.filerId) + n.shh.Stop() n.server.Stop() } } diff --git a/whisper/whisperv5/topic.go b/whisper/whisperv5/topic.go index c29c344be..54d7422d1 100644 --- a/whisper/whisperv5/topic.go +++ b/whisper/whisperv5/topic.go @@ -14,8 +14,7 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -// Contains the Whisper protocol Topic element. For formal details please see -// the specs at https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec#topics. +// Contains the Whisper protocol Topic element. package whisperv5 diff --git a/whisper/whisperv5/whisper.go b/whisper/whisperv5/whisper.go index dc9571f6e..789adbdb3 100644 --- a/whisper/whisperv5/whisper.go +++ b/whisper/whisperv5/whisper.go @@ -22,6 +22,7 @@ import ( crand "crypto/rand" "crypto/sha256" "fmt" + "runtime" "sync" "time" @@ -45,7 +46,7 @@ type Whisper struct { symKeys map[string][]byte keyMu sync.RWMutex - envelopes map[common.Hash]*Envelope // Pool of messages currently tracked by this node + envelopes map[common.Hash]*Envelope // Pool of envelopes currently tracked by this node messages map[common.Hash]*ReceivedMessage // Pool of successfully decrypted messages, which are not expired yet expirations map[uint32]*set.SetNonTS // Message expiration pool poolMu sync.RWMutex // Mutex to sync the message and expiration pools @@ -55,22 +56,28 @@ type Whisper struct { mailServer MailServer - quit chan struct{} - test bool + messageQueue chan *Envelope + p2pMsgQueue chan *Envelope + quit chan struct{} + + overflow bool + test bool } // New creates a Whisper client ready to communicate through the Ethereum P2P network. // Param s should be passed if you want to implement mail server, otherwise nil. func NewWhisper(server MailServer) *Whisper { whisper := &Whisper{ - privateKeys: make(map[string]*ecdsa.PrivateKey), - symKeys: make(map[string][]byte), - envelopes: make(map[common.Hash]*Envelope), - messages: make(map[common.Hash]*ReceivedMessage), - expirations: make(map[uint32]*set.SetNonTS), - peers: make(map[*Peer]struct{}), - mailServer: server, - quit: make(chan struct{}), + privateKeys: make(map[string]*ecdsa.PrivateKey), + symKeys: make(map[string][]byte), + envelopes: make(map[common.Hash]*Envelope), + messages: make(map[common.Hash]*ReceivedMessage), + expirations: make(map[uint32]*set.SetNonTS), + peers: make(map[*Peer]struct{}), + mailServer: server, + messageQueue: make(chan *Envelope, messageQueueLimit), + p2pMsgQueue: make(chan *Envelope, messageQueueLimit), + quit: make(chan struct{}), } whisper.filters = NewFilters(whisper) @@ -124,7 +131,7 @@ func (w *Whisper) RequestHistoricMessages(peerID []byte, data []byte) error { return err } p.trusted = true - return p2p.Send(p.ws, mailRequestCode, data) + return p2p.Send(p.ws, p2pRequestCode, data) } func (w *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error { @@ -270,6 +277,12 @@ func (w *Whisper) Send(envelope *Envelope) error { func (w *Whisper) Start(*p2p.Server) error { glog.V(logger.Info).Infoln("Whisper started") go w.update() + + numCPU := runtime.NumCPU() + for i := 0; i < numCPU; i++ { + go w.processQueue() + } + return nil } @@ -350,10 +363,10 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { return fmt.Errorf("garbage received (directMessage)") } for _, envelope := range envelopes { - wh.postEvent(envelope, p2pCode) + wh.postEvent(envelope, true) } } - case mailRequestCode: + case p2pRequestCode: // Must be processed if mail server is implemented. Otherwise ignore. if wh.mailServer != nil { s := rlp.NewStream(packet.Payload, uint64(packet.Size)) @@ -382,7 +395,7 @@ func (wh *Whisper) add(envelope *Envelope) error { if sent > now { if sent-SynchAllowance > now { - return fmt.Errorf("message created in the future") + return fmt.Errorf("envelope created in the future [%x]", envelope.Hash()) } else { // recalculate PoW, adjusted for the time difference, plus one second for latency envelope.calculatePoW(sent - now + 1) @@ -393,30 +406,31 @@ func (wh *Whisper) add(envelope *Envelope) error { if envelope.Expiry+SynchAllowance*2 < now { return fmt.Errorf("very old message") } else { + glog.V(logger.Debug).Infof("expired envelope dropped [%x]", envelope.Hash()) return nil // drop envelope without error } } if len(envelope.Data) > MaxMessageLength { - return fmt.Errorf("huge messages are not allowed") + return fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash()) } if len(envelope.Version) > 4 { - return fmt.Errorf("oversized Version") + return fmt.Errorf("oversized version [%x]", envelope.Hash()) } if len(envelope.AESNonce) > AESNonceMaxLength { // the standard AES GSM nonce size is 12, // but const gcmStandardNonceSize cannot be accessed directly - return fmt.Errorf("oversized AESNonce") + return fmt.Errorf("oversized AESNonce [%x]", envelope.Hash()) } if len(envelope.Salt) > saltLength { - return fmt.Errorf("oversized Salt") + return fmt.Errorf("oversized salt [%x]", envelope.Hash()) } if envelope.PoW() < MinimumPoW && !wh.test { - glog.V(logger.Debug).Infof("envelope with low PoW dropped: %f", envelope.PoW()) + glog.V(logger.Debug).Infof("envelope with low PoW dropped: %f [%x]", envelope.PoW(), envelope.Hash()) return nil // drop envelope without error } @@ -436,22 +450,59 @@ func (wh *Whisper) add(envelope *Envelope) error { wh.poolMu.Unlock() if alreadyCached { - glog.V(logger.Detail).Infof("whisper envelope already cached: %x\n", envelope) + glog.V(logger.Detail).Infof("whisper envelope already cached [%x]\n", envelope.Hash()) } else { - wh.postEvent(envelope, messagesCode) // notify the local node about the new message - glog.V(logger.Detail).Infof("cached whisper envelope %v\n", envelope) + glog.V(logger.Detail).Infof("cached whisper envelope [%x]: %v\n", envelope.Hash(), envelope) + wh.postEvent(envelope, false) // notify the local node about the new message } return nil } -// postEvent delivers the message to the watchers. -func (w *Whisper) postEvent(envelope *Envelope, messageCode uint64) { +// postEvent queues the message for further processing. +func (w *Whisper) postEvent(envelope *Envelope, isP2P bool) { // if the version of incoming message is higher than // currently supported version, we can not decrypt it, // and therefore just ignore this message if envelope.Ver() <= EnvelopeVersion { - // todo: review if you need an additional thread here - go w.filters.NotifyWatchers(envelope, messageCode) + if isP2P { + w.p2pMsgQueue <- envelope + } else { + w.checkOverflow() + w.messageQueue <- envelope + } + } +} + +// checkOverflow checks if message queue overflow occurs and reports it if necessary. +func (w *Whisper) checkOverflow() { + queueSize := len(w.messageQueue) + + if queueSize == messageQueueLimit { + if !w.overflow { + w.overflow = true + glog.V(logger.Warn).Infoln("message queue overflow") + } + } else if queueSize <= messageQueueLimit/2 { + if w.overflow { + w.overflow = false + } + } +} + +// processQueue delivers the messages to the watchers during the lifetime of the whisper node. +func (w *Whisper) processQueue() { + var e *Envelope + for { + select { + case <-w.quit: + return + + case e = <-w.messageQueue: + w.filters.NotifyWatchers(e, false) + + case e = <-w.p2pMsgQueue: + w.filters.NotifyWatchers(e, true) + } } } diff --git a/whisper/whisperv5/whisper_test.go b/whisper/whisperv5/whisper_test.go index 3f79a72c8..9af95f445 100644 --- a/whisper/whisperv5/whisper_test.go +++ b/whisper/whisperv5/whisper_test.go @@ -19,6 +19,7 @@ package whisperv5 import ( "bytes" "testing" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" @@ -309,3 +310,56 @@ func TestWhisperSymKeyManagement(t *testing.T) { t.Fatalf("failed to delete second key: second key is not nil.") } } + +func TestExpiry(t *testing.T) { + InitSingleTest() + + w := NewWhisper(nil) + w.test = true + w.Start(nil) + defer w.Stop() + + params, err := generateMessageParams() + if err != nil { + t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err) + } + + params.TTL = 1 + msg := NewSentMessage(params) + env, err := msg.Wrap(params) + if err != nil { + t.Fatalf("failed Wrap with seed %d: %s.", seed, err) + } + + err = w.Send(env) + if err != nil { + t.Fatalf("failed to send envelope with seed %d: %s.", seed, err) + } + + // wait till received or timeout + var received, expired bool + for j := 0; j < 20; j++ { + time.Sleep(100 * time.Millisecond) + if len(w.Envelopes()) > 0 { + received = true + break + } + } + + if !received { + t.Fatalf("did not receive the sent envelope, seed: %d.", seed) + } + + // wait till expired or timeout + for j := 0; j < 20; j++ { + time.Sleep(100 * time.Millisecond) + if len(w.Envelopes()) == 0 { + expired = true + break + } + } + + if !expired { + t.Fatalf("expire failed, seed: %d.", seed) + } +}