Merge pull request #718 from karalabe/whisper-cleanup

Whisper cleanup, part 2
This commit is contained in:
Felix Lange 2015-04-17 14:10:55 +02:00
commit 4020258801
16 changed files with 922 additions and 379 deletions

View File

@ -41,7 +41,7 @@ func (self *Whisper) Post(payload []string, to, from string, topics []string, pr
TTL: time.Duration(ttl) * time.Second, TTL: time.Duration(ttl) * time.Second,
To: crypto.ToECDSAPub(common.FromHex(to)), To: crypto.ToECDSAPub(common.FromHex(to)),
From: key, From: key,
Topics: whisper.TopicsFromString(topics...), Topics: whisper.NewTopicsFromStrings(topics...),
}) })
if err != nil { if err != nil {
@ -106,7 +106,7 @@ func filterFromMap(opts map[string]interface{}) (f whisper.Filter) {
if topicList, ok := opts["topics"].(*qml.List); ok { if topicList, ok := opts["topics"].(*qml.List); ok {
var topics []string var topics []string
topicList.Convert(&topics) topicList.Convert(&topics)
f.Topics = whisper.TopicsFromString(topics...) f.Topics = whisper.NewTopicsFromStrings(topics...)
} }
return return

View File

@ -20,16 +20,16 @@ import (
type Envelope struct { type Envelope struct {
Expiry uint32 // Whisper protocol specifies int32, really should be int64 Expiry uint32 // Whisper protocol specifies int32, really should be int64
TTL uint32 // ^^^^^^ TTL uint32 // ^^^^^^
Topics [][]byte Topics []Topic
Data []byte Data []byte
Nonce uint32 Nonce uint32
hash common.Hash hash common.Hash // Cached hash of the envelope to avoid rehashing every time
} }
// NewEnvelope wraps a Whisper message with expiration and destination data // NewEnvelope wraps a Whisper message with expiration and destination data
// included into an envelope for network forwarding. // included into an envelope for network forwarding.
func NewEnvelope(ttl time.Duration, topics [][]byte, msg *Message) *Envelope { func NewEnvelope(ttl time.Duration, topics []Topic, msg *Message) *Envelope {
return &Envelope{ return &Envelope{
Expiry: uint32(time.Now().Add(ttl).Unix()), Expiry: uint32(time.Now().Add(ttl).Unix()),
TTL: uint32(ttl.Seconds()), TTL: uint32(ttl.Seconds()),
@ -59,16 +59,6 @@ func (self *Envelope) Seal(pow time.Duration) {
} }
} }
// valid checks whether the claimed proof of work was indeed executed.
// TODO: Is this really useful? Isn't this always true?
func (self *Envelope) valid() bool {
d := make([]byte, 64)
copy(d[:32], self.rlpWithoutNonce())
binary.BigEndian.PutUint32(d[60:], self.Nonce)
return common.FirstBitSet(common.BigD(crypto.Sha3(d))) > 0
}
// rlpWithoutNonce returns the RLP encoded envelope contents, except the nonce. // rlpWithoutNonce returns the RLP encoded envelope contents, except the nonce.
func (self *Envelope) rlpWithoutNonce() []byte { func (self *Envelope) rlpWithoutNonce() []byte {
enc, _ := rlp.EncodeToBytes([]interface{}{self.Expiry, self.TTL, self.Topics, self.Data}) enc, _ := rlp.EncodeToBytes([]interface{}{self.Expiry, self.TTL, self.Topics, self.Data})
@ -85,20 +75,19 @@ func (self *Envelope) Open(key *ecdsa.PrivateKey) (msg *Message, err error) {
} }
data = data[1:] data = data[1:]
if message.Flags&128 == 128 { if message.Flags&signatureFlag == signatureFlag {
if len(data) < 65 { if len(data) < signatureLength {
return nil, fmt.Errorf("unable to open envelope. First bit set but len(data) < 65") return nil, fmt.Errorf("unable to open envelope. First bit set but len(data) < len(signature)")
} }
message.Signature, data = data[:65], data[65:] message.Signature, data = data[:signatureLength], data[signatureLength:]
} }
message.Payload = data message.Payload = data
// Short circuit if the encryption was requested // Decrypt the message, if requested
if key == nil { if key == nil {
return message, nil return message, nil
} }
// Otherwise try to decrypt the message err = message.decrypt(key)
message.Payload, err = crypto.Decrypt(key, message.Payload)
switch err { switch err {
case nil: case nil:
return message, nil return message, nil

View File

@ -1,10 +1,13 @@
// Contains the message filter for fine grained subscriptions.
package whisper package whisper
import "crypto/ecdsa" import "crypto/ecdsa"
// Filter is used to subscribe to specific types of whisper messages.
type Filter struct { type Filter struct {
To *ecdsa.PublicKey To *ecdsa.PublicKey // Recipient of the message
From *ecdsa.PublicKey From *ecdsa.PublicKey // Sender of the message
Topics [][]byte Topics []Topic // Topics to watch messages on
Fn func(*Message) Fn func(*Message) // Handler in case of a match
} }

View File

@ -69,10 +69,10 @@ func selfSend(shh *whisper.Whisper, payload []byte) error {
}) })
// Wrap the payload and encrypt it // Wrap the payload and encrypt it
msg := whisper.NewMessage(payload) msg := whisper.NewMessage(payload)
envelope, err := msg.Wrap(whisper.DefaultProofOfWork, whisper.Options{ envelope, err := msg.Wrap(whisper.DefaultPoW, whisper.Options{
From: id, From: id,
To: &id.PublicKey, To: &id.PublicKey,
TTL: whisper.DefaultTimeToLive, TTL: whisper.DefaultTTL,
}) })
if err != nil { if err != nil {
return fmt.Errorf("failed to seal message: %v", err) return fmt.Errorf("failed to seal message: %v", err)

View File

@ -30,13 +30,14 @@ type Options struct {
From *ecdsa.PrivateKey From *ecdsa.PrivateKey
To *ecdsa.PublicKey To *ecdsa.PublicKey
TTL time.Duration TTL time.Duration
Topics [][]byte Topics []Topic
} }
// NewMessage creates and initializes a non-signed, non-encrypted Whisper message. // NewMessage creates and initializes a non-signed, non-encrypted Whisper message.
func NewMessage(payload []byte) *Message { func NewMessage(payload []byte) *Message {
// Construct an initial flag set: bit #1 = 0 (no signature), rest random // Construct an initial flag set: no signature, rest random
flags := byte(rand.Intn(128)) flags := byte(rand.Intn(256))
flags &= ^signatureFlag
// Assemble and return the message // Assemble and return the message
return &Message{ return &Message{
@ -61,7 +62,7 @@ func NewMessage(payload []byte) *Message {
func (self *Message) Wrap(pow time.Duration, options Options) (*Envelope, error) { func (self *Message) Wrap(pow time.Duration, options Options) (*Envelope, error) {
// Use the default TTL if non was specified // Use the default TTL if non was specified
if options.TTL == 0 { if options.TTL == 0 {
options.TTL = DefaultTimeToLive options.TTL = DefaultTTL
} }
// Sign and encrypt the message if requested // Sign and encrypt the message if requested
if options.From != nil { if options.From != nil {
@ -84,7 +85,7 @@ func (self *Message) Wrap(pow time.Duration, options Options) (*Envelope, error)
// sign calculates and sets the cryptographic signature for the message , also // sign calculates and sets the cryptographic signature for the message , also
// setting the sign flag. // setting the sign flag.
func (self *Message) sign(key *ecdsa.PrivateKey) (err error) { func (self *Message) sign(key *ecdsa.PrivateKey) (err error) {
self.Flags |= 1 << 7 self.Flags |= signatureFlag
self.Signature, err = crypto.Sign(self.hash(), key) self.Signature, err = crypto.Sign(self.hash(), key)
return return
} }
@ -93,6 +94,11 @@ func (self *Message) sign(key *ecdsa.PrivateKey) (err error) {
func (self *Message) Recover() *ecdsa.PublicKey { func (self *Message) Recover() *ecdsa.PublicKey {
defer func() { recover() }() // in case of invalid signature defer func() { recover() }() // in case of invalid signature
// Short circuit if no signature is present
if self.Signature == nil {
return nil
}
// Otherwise try and recover the signature
pub, err := crypto.SigToPub(self.hash(), self.Signature) pub, err := crypto.SigToPub(self.hash(), self.Signature)
if err != nil { if err != nil {
glog.V(logger.Error).Infof("Could not get public key from signature: %v", err) glog.V(logger.Error).Infof("Could not get public key from signature: %v", err)
@ -102,8 +108,14 @@ func (self *Message) Recover() *ecdsa.PublicKey {
} }
// encrypt encrypts a message payload with a public key. // encrypt encrypts a message payload with a public key.
func (self *Message) encrypt(to *ecdsa.PublicKey) (err error) { func (self *Message) encrypt(key *ecdsa.PublicKey) (err error) {
self.Payload, err = crypto.Encrypt(to, self.Payload) self.Payload, err = crypto.Encrypt(key, self.Payload)
return
}
// decrypt decrypts an encrypted payload with a private key.
func (self *Message) decrypt(key *ecdsa.PrivateKey) (err error) {
self.Payload, err = crypto.Decrypt(key, self.Payload)
return return
} }

View File

@ -13,11 +13,11 @@ func TestMessageSimpleWrap(t *testing.T) {
payload := []byte("hello world") payload := []byte("hello world")
msg := NewMessage(payload) msg := NewMessage(payload)
if _, err := msg.Wrap(DefaultProofOfWork, Options{}); err != nil { if _, err := msg.Wrap(DefaultPoW, Options{}); err != nil {
t.Fatalf("failed to wrap message: %v", err) t.Fatalf("failed to wrap message: %v", err)
} }
if msg.Flags&128 != 0 { if msg.Flags&signatureFlag != 0 {
t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 0) t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, 0)
} }
if len(msg.Signature) != 0 { if len(msg.Signature) != 0 {
t.Fatalf("signature found for simple wrapping: 0x%x", msg.Signature) t.Fatalf("signature found for simple wrapping: 0x%x", msg.Signature)
@ -36,13 +36,13 @@ func TestMessageCleartextSignRecover(t *testing.T) {
payload := []byte("hello world") payload := []byte("hello world")
msg := NewMessage(payload) msg := NewMessage(payload)
if _, err := msg.Wrap(DefaultProofOfWork, Options{ if _, err := msg.Wrap(DefaultPoW, Options{
From: key, From: key,
}); err != nil { }); err != nil {
t.Fatalf("failed to sign message: %v", err) t.Fatalf("failed to sign message: %v", err)
} }
if msg.Flags&128 != 128 { if msg.Flags&signatureFlag != signatureFlag {
t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 1) t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, signatureFlag)
} }
if bytes.Compare(msg.Payload, payload) != 0 { if bytes.Compare(msg.Payload, payload) != 0 {
t.Fatalf("payload mismatch after signing: have 0x%x, want 0x%x", msg.Payload, payload) t.Fatalf("payload mismatch after signing: have 0x%x, want 0x%x", msg.Payload, payload)
@ -69,14 +69,14 @@ func TestMessageAnonymousEncryptDecrypt(t *testing.T) {
payload := []byte("hello world") payload := []byte("hello world")
msg := NewMessage(payload) msg := NewMessage(payload)
envelope, err := msg.Wrap(DefaultProofOfWork, Options{ envelope, err := msg.Wrap(DefaultPoW, Options{
To: &key.PublicKey, To: &key.PublicKey,
}) })
if err != nil { if err != nil {
t.Fatalf("failed to encrypt message: %v", err) t.Fatalf("failed to encrypt message: %v", err)
} }
if msg.Flags&128 != 0 { if msg.Flags&signatureFlag != 0 {
t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 0) t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, 0)
} }
if len(msg.Signature) != 0 { if len(msg.Signature) != 0 {
t.Fatalf("signature found for anonymous message: 0x%x", msg.Signature) t.Fatalf("signature found for anonymous message: 0x%x", msg.Signature)
@ -104,15 +104,15 @@ func TestMessageFullCrypto(t *testing.T) {
payload := []byte("hello world") payload := []byte("hello world")
msg := NewMessage(payload) msg := NewMessage(payload)
envelope, err := msg.Wrap(DefaultProofOfWork, Options{ envelope, err := msg.Wrap(DefaultPoW, Options{
From: fromKey, From: fromKey,
To: &toKey.PublicKey, To: &toKey.PublicKey,
}) })
if err != nil { if err != nil {
t.Fatalf("failed to encrypt message: %v", err) t.Fatalf("failed to encrypt message: %v", err)
} }
if msg.Flags&128 != 128 { if msg.Flags&signatureFlag != signatureFlag {
t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 1) t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, signatureFlag)
} }
if len(msg.Signature) == 0 { if len(msg.Signature) == 0 {
t.Fatalf("no signature found for signed message") t.Fatalf("no signature found for signed message")

View File

@ -4,110 +4,160 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"gopkg.in/fatih/set.v0" "gopkg.in/fatih/set.v0"
) )
const ( // peer represents a whisper protocol peer connection.
protocolVersion uint64 = 0x02
)
type peer struct { type peer struct {
host *Whisper host *Whisper
peer *p2p.Peer peer *p2p.Peer
ws p2p.MsgReadWriter ws p2p.MsgReadWriter
// XXX Eventually this is going to reach exceptional large space. We need an expiry here known *set.Set // Messages already known by the peer to avoid wasting bandwidth
known *set.Set
quit chan struct{} quit chan struct{}
} }
func NewPeer(host *Whisper, p *p2p.Peer, ws p2p.MsgReadWriter) *peer { // newPeer creates and initializes a new whisper peer connection, returning either
return &peer{host, p, ws, set.New(), make(chan struct{})} // the newly constructed link or a failure reason.
} func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) (*peer, error) {
p := &peer{
func (self *peer) init() error { host: host,
if err := self.handleStatus(); err != nil { peer: remote,
return err ws: rw,
known: set.New(),
quit: make(chan struct{}),
} }
if err := p.handshake(); err != nil {
return nil return nil, err
}
return p, nil
} }
// start initiates the peer updater, periodically broadcasting the whisper packets
// into the network.
func (self *peer) start() { func (self *peer) start() {
go self.update() go self.update()
self.peer.Debugln("whisper started") self.peer.Debugln("whisper started")
} }
// stop terminates the peer updater, stopping message forwarding to it.
func (self *peer) stop() { func (self *peer) stop() {
self.peer.Debugln("whisper stopped")
close(self.quit) close(self.quit)
self.peer.Debugln("whisper stopped")
} }
func (self *peer) update() { // handshake sends the protocol initiation status message to the remote peer and
relay := time.NewTicker(300 * time.Millisecond) // verifies the remote status too.
out: func (self *peer) handshake() error {
for { // Send the handshake status message asynchronously
select { errc := make(chan error, 1)
case <-relay.C: go func() {
err := self.broadcast(self.host.envelopes()) errc <- p2p.SendItems(self.ws, statusCode, protocolVersion)
if err != nil { }()
self.peer.Infoln("broadcast err:", err) // Fetch the remote status packet and verify protocol match
break out packet, err := self.ws.ReadMsg()
} if err != nil {
return err
case <-self.quit:
break out
}
} }
} if packet.Code != statusCode {
return fmt.Errorf("peer sent %x before status packet", packet.Code)
func (self *peer) broadcast(envelopes []*Envelope) error {
envs := make([]*Envelope, 0, len(envelopes))
for _, env := range envelopes {
if !self.known.Has(env.Hash()) {
envs = append(envs, env)
self.known.Add(env.Hash())
}
} }
if len(envs) > 0 { s := rlp.NewStream(packet.Payload)
if err := p2p.Send(self.ws, envelopesMsg, envs); err != nil { if _, err := s.List(); err != nil {
return err return fmt.Errorf("bad status message: %v", err)
} }
self.peer.DebugDetailln("broadcasted", len(envs), "message(s)") peerVersion, err := s.Uint()
if err != nil {
return fmt.Errorf("bad status message: %v", err)
}
if peerVersion != protocolVersion {
return fmt.Errorf("protocol version mismatch %d != %d", peerVersion, protocolVersion)
}
// Wait until out own status is consumed too
if err := <-errc; err != nil {
return fmt.Errorf("failed to send status packet: %v", err)
} }
return nil return nil
} }
func (self *peer) addKnown(envelope *Envelope) { // update executes periodic operations on the peer, including message transmission
// and expiration.
func (self *peer) update() {
// Start the tickers for the updates
expire := time.NewTicker(expirationCycle)
transmit := time.NewTicker(transmissionCycle)
// Loop and transmit until termination is requested
for {
select {
case <-expire.C:
self.expire()
case <-transmit.C:
if err := self.broadcast(); err != nil {
self.peer.Infoln("broadcast failed:", err)
return
}
case <-self.quit:
return
}
}
}
// mark marks an envelope known to the peer so that it won't be sent back.
func (self *peer) mark(envelope *Envelope) {
self.known.Add(envelope.Hash()) self.known.Add(envelope.Hash())
} }
func (self *peer) handleStatus() error { // marked checks if an envelope is already known to the remote peer.
ws := self.ws func (self *peer) marked(envelope *Envelope) bool {
if err := p2p.SendItems(ws, statusMsg, protocolVersion); err != nil { return self.known.Has(envelope.Hash())
return err }
}
msg, err := ws.ReadMsg() // expire iterates over all the known envelopes in the host and removes all
if err != nil { // expired (unknown) ones from the known list.
return err func (self *peer) expire() {
} // Assemble the list of available envelopes
if msg.Code != statusMsg { available := set.NewNonTS()
return fmt.Errorf("peer send %x before status msg", msg.Code) for _, envelope := range self.host.envelopes() {
} available.Add(envelope.Hash())
s := rlp.NewStream(msg.Payload) }
if _, err := s.List(); err != nil { // Cross reference availability with known status
return fmt.Errorf("bad status message: %v", err) unmark := make(map[common.Hash]struct{})
} self.known.Each(func(v interface{}) bool {
pv, err := s.Uint() if !available.Has(v.(common.Hash)) {
if err != nil { unmark[v.(common.Hash)] = struct{}{}
return fmt.Errorf("bad status message: %v", err) }
} return true
if pv != protocolVersion { })
return fmt.Errorf("protocol version mismatch %d != %d", pv, protocolVersion) // Dump all known but unavailable
} for hash, _ := range unmark {
return msg.Discard() // ignore anything after protocol version self.known.Remove(hash)
}
}
// broadcast iterates over the collection of envelopes and transmits yet unknown
// ones over the network.
func (self *peer) broadcast() error {
// Fetch the envelopes and collect the unknown ones
envelopes := self.host.envelopes()
transmit := make([]*Envelope, 0, len(envelopes))
for _, envelope := range envelopes {
if !self.marked(envelope) {
transmit = append(transmit, envelope)
self.mark(envelope)
}
}
// Transmit the unknown batch (potentially empty)
if err := p2p.Send(self.ws, messagesCode, transmit); err != nil {
return err
}
self.peer.DebugDetailln("broadcasted", len(transmit), "message(s)")
return nil
} }

242
whisper/peer_test.go Normal file
View File

@ -0,0 +1,242 @@
package whisper
import (
"testing"
"time"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
)
type testPeer struct {
client *Whisper
stream *p2p.MsgPipeRW
termed chan struct{}
}
func startTestPeer() *testPeer {
// Create a simulated P2P remote peer and data streams to it
remote := p2p.NewPeer(discover.NodeID{}, "", nil)
tester, tested := p2p.MsgPipe()
// Create a whisper client and connect with it to the tester peer
client := New()
client.Start()
termed := make(chan struct{})
go func() {
defer client.Stop()
defer close(termed)
defer tested.Close()
client.handlePeer(remote, tested)
}()
return &testPeer{
client: client,
stream: tester,
termed: termed,
}
}
func startTestPeerInited() (*testPeer, error) {
peer := startTestPeer()
if err := p2p.ExpectMsg(peer.stream, statusCode, []uint64{protocolVersion}); err != nil {
peer.stream.Close()
return nil, err
}
if err := p2p.SendItems(peer.stream, statusCode, protocolVersion); err != nil {
peer.stream.Close()
return nil, err
}
return peer, nil
}
func TestPeerStatusMessage(t *testing.T) {
tester := startTestPeer()
// Wait for the handshake status message and check it
if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil {
t.Fatalf("status message mismatch: %v", err)
}
// Terminate the node
tester.stream.Close()
select {
case <-tester.termed:
case <-time.After(time.Second):
t.Fatalf("local close timed out")
}
}
func TestPeerHandshakeFail(t *testing.T) {
tester := startTestPeer()
// Wait for and check the handshake
if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil {
t.Fatalf("status message mismatch: %v", err)
}
// Send an invalid handshake status and verify disconnect
if err := p2p.SendItems(tester.stream, messagesCode); err != nil {
t.Fatalf("failed to send malformed status: %v", err)
}
select {
case <-tester.termed:
case <-time.After(time.Second):
t.Fatalf("remote close timed out")
}
}
func TestPeerHandshakeSuccess(t *testing.T) {
tester := startTestPeer()
// Wait for and check the handshake
if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil {
t.Fatalf("status message mismatch: %v", err)
}
// Send a valid handshake status and make sure connection stays live
if err := p2p.SendItems(tester.stream, statusCode, protocolVersion); err != nil {
t.Fatalf("failed to send status: %v", err)
}
select {
case <-tester.termed:
t.Fatalf("valid handshake disconnected")
case <-time.After(100 * time.Millisecond):
}
// Clean up the test
tester.stream.Close()
select {
case <-tester.termed:
case <-time.After(time.Second):
t.Fatalf("local close timed out")
}
}
func TestPeerSend(t *testing.T) {
// Start a tester and execute the handshake
tester, err := startTestPeerInited()
if err != nil {
t.Fatalf("failed to start initialized peer: %v", err)
}
defer tester.stream.Close()
// Construct a message and inject into the tester
message := NewMessage([]byte("peer broadcast test message"))
envelope, err := message.Wrap(DefaultPoW, Options{
TTL: DefaultTTL,
})
if err != nil {
t.Fatalf("failed to wrap message: %v", err)
}
if err := tester.client.Send(envelope); err != nil {
t.Fatalf("failed to send message: %v", err)
}
// Check that the message is eventually forwarded
payload := []interface{}{envelope}
if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil {
t.Fatalf("message mismatch: %v", err)
}
// Make sure that even with a re-insert, an empty batch is received
if err := tester.client.Send(envelope); err != nil {
t.Fatalf("failed to send message: %v", err)
}
if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil {
t.Fatalf("message mismatch: %v", err)
}
}
func TestPeerDeliver(t *testing.T) {
// Start a tester and execute the handshake
tester, err := startTestPeerInited()
if err != nil {
t.Fatalf("failed to start initialized peer: %v", err)
}
defer tester.stream.Close()
// Watch for all inbound messages
arrived := make(chan struct{}, 1)
tester.client.Watch(Filter{
Fn: func(message *Message) {
arrived <- struct{}{}
},
})
// Construct a message and deliver it to the tester peer
message := NewMessage([]byte("peer broadcast test message"))
envelope, err := message.Wrap(DefaultPoW, Options{
TTL: DefaultTTL,
})
if err != nil {
t.Fatalf("failed to wrap message: %v", err)
}
if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil {
t.Fatalf("failed to transfer message: %v", err)
}
// Check that the message is delivered upstream
select {
case <-arrived:
case <-time.After(time.Second):
t.Fatalf("message delivery timeout")
}
// Check that a resend is not delivered
if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil {
t.Fatalf("failed to transfer message: %v", err)
}
select {
case <-time.After(2 * transmissionCycle):
case <-arrived:
t.Fatalf("repeating message arrived")
}
}
func TestPeerMessageExpiration(t *testing.T) {
// Start a tester and execute the handshake
tester, err := startTestPeerInited()
if err != nil {
t.Fatalf("failed to start initialized peer: %v", err)
}
defer tester.stream.Close()
// Fetch the peer instance for later inspection
tester.client.peerMu.RLock()
if peers := len(tester.client.peers); peers != 1 {
t.Fatalf("peer pool size mismatch: have %v, want %v", peers, 1)
}
var peer *peer
for peer, _ = range tester.client.peers {
break
}
tester.client.peerMu.RUnlock()
// Construct a message and pass it through the tester
message := NewMessage([]byte("peer test message"))
envelope, err := message.Wrap(DefaultPoW, Options{
TTL: time.Second,
})
if err != nil {
t.Fatalf("failed to wrap message: %v", err)
}
if err := tester.client.Send(envelope); err != nil {
t.Fatalf("failed to send message: %v", err)
}
payload := []interface{}{envelope}
if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil {
t.Fatalf("message mismatch: %v", err)
}
// Check that the message is inside the cache
if !peer.known.Has(envelope.Hash()) {
t.Fatalf("message not found in cache")
}
// Discard messages until expiration and check cache again
exp := time.Now().Add(time.Second + expirationCycle)
for time.Now().Before(exp) {
if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil {
t.Fatalf("message mismatch: %v", err)
}
}
if peer.known.Has(envelope.Hash()) {
t.Fatalf("message not expired from cache")
}
}

View File

@ -1,29 +0,0 @@
package whisper
import (
"sort"
"github.com/ethereum/go-ethereum/common"
)
type sortedKeys struct {
k []int32
}
func (self *sortedKeys) Len() int { return len(self.k) }
func (self *sortedKeys) Less(i, j int) bool { return self.k[i] < self.k[j] }
func (self *sortedKeys) Swap(i, j int) { self.k[i], self.k[j] = self.k[j], self.k[i] }
func sortKeys(m map[int32]common.Hash) []int32 {
sorted := new(sortedKeys)
sorted.k = make([]int32, len(m))
i := 0
for key, _ := range m {
sorted.k[i] = key
i++
}
sort.Sort(sorted)
return sorted.k
}

View File

@ -1,23 +0,0 @@
package whisper
import (
"testing"
"github.com/ethereum/go-ethereum/common"
)
func TestSorting(t *testing.T) {
m := map[int32]common.Hash{
1: {1},
3: {3},
2: {2},
5: {5},
}
exp := []int32{1, 2, 3, 5}
res := sortKeys(m)
for i, k := range res {
if k != exp[i] {
t.Error(k, "failed. Expected", exp[i])
}
}
}

61
whisper/topic.go Normal file
View File

@ -0,0 +1,61 @@
// Contains the Whisper protocol Topic element. For formal details please see
// the specs at https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec#topics.
package whisper
import "github.com/ethereum/go-ethereum/crypto"
// Topic represents a cryptographically secure, probabilistic partial
// classifications of a message, determined as the first (left) 4 bytes of the
// SHA3 hash of some arbitrary data given by the original author of the message.
type Topic [4]byte
// NewTopic creates a topic from the 4 byte prefix of the SHA3 hash of the data.
func NewTopic(data []byte) Topic {
prefix := [4]byte{}
copy(prefix[:], crypto.Sha3(data)[:4])
return Topic(prefix)
}
// NewTopics creates a list of topics from a list of binary data elements, by
// iteratively calling NewTopic on each of them.
func NewTopics(data ...[]byte) []Topic {
topics := make([]Topic, len(data))
for i, element := range data {
topics[i] = NewTopic(element)
}
return topics
}
// NewTopicFromString creates a topic using the binary data contents of the
// specified string.
func NewTopicFromString(data string) Topic {
return NewTopic([]byte(data))
}
// NewTopicsFromStrings creates a list of topics from a list of textual data
// elements, by iteratively calling NewTopicFromString on each of them.
func NewTopicsFromStrings(data ...string) []Topic {
topics := make([]Topic, len(data))
for i, element := range data {
topics[i] = NewTopicFromString(element)
}
return topics
}
// String converts a topic byte array to a string representation.
func (self *Topic) String() string {
return string(self[:])
}
// TopicSet represents a hash set to check if a topic exists or not.
type topicSet map[string]struct{}
// NewTopicSet creates a topic hash set from a slice of topics.
func newTopicSet(topics []Topic) topicSet {
set := make(map[string]struct{})
for _, topic := range topics {
set[topic.String()] = struct{}{}
}
return topicSet(set)
}

67
whisper/topic_test.go Normal file
View File

@ -0,0 +1,67 @@
package whisper
import (
"bytes"
"testing"
)
var topicCreationTests = []struct {
data []byte
hash [4]byte
}{
{hash: [4]byte{0xc5, 0xd2, 0x46, 0x01}, data: nil},
{hash: [4]byte{0xc5, 0xd2, 0x46, 0x01}, data: []byte{}},
{hash: [4]byte{0x8f, 0x9a, 0x2b, 0x7d}, data: []byte("test name")},
}
func TestTopicCreation(t *testing.T) {
// Create the topics individually
for i, tt := range topicCreationTests {
topic := NewTopic(tt.data)
if bytes.Compare(topic[:], tt.hash[:]) != 0 {
t.Errorf("binary test %d: hash mismatch: have %v, want %v.", i, topic, tt.hash)
}
}
for i, tt := range topicCreationTests {
topic := NewTopicFromString(string(tt.data))
if bytes.Compare(topic[:], tt.hash[:]) != 0 {
t.Errorf("textual test %d: hash mismatch: have %v, want %v.", i, topic, tt.hash)
}
}
// Create the topics in batches
binaryData := make([][]byte, len(topicCreationTests))
for i, tt := range topicCreationTests {
binaryData[i] = tt.data
}
textualData := make([]string, len(topicCreationTests))
for i, tt := range topicCreationTests {
textualData[i] = string(tt.data)
}
topics := NewTopics(binaryData...)
for i, tt := range topicCreationTests {
if bytes.Compare(topics[i][:], tt.hash[:]) != 0 {
t.Errorf("binary batch test %d: hash mismatch: have %v, want %v.", i, topics[i], tt.hash)
}
}
topics = NewTopicsFromStrings(textualData...)
for i, tt := range topicCreationTests {
if bytes.Compare(topics[i][:], tt.hash[:]) != 0 {
t.Errorf("textual batch test %d: hash mismatch: have %v, want %v.", i, topics[i], tt.hash)
}
}
}
func TestTopicSetCreation(t *testing.T) {
topics := make([]Topic, len(topicCreationTests))
for i, tt := range topicCreationTests {
topics[i] = NewTopic(tt.data)
}
set := newTopicSet(topics)
for i, tt := range topicCreationTests {
topic := NewTopic(tt.data)
if _, ok := set[topic.String()]; !ok {
t.Errorf("topic %d: not found in set", i)
}
}
}

View File

@ -1,36 +0,0 @@
package whisper
import "github.com/ethereum/go-ethereum/crypto"
func hashTopic(topic []byte) []byte {
return crypto.Sha3(topic)[:4]
}
// NOTE this isn't DRY, but I don't want to iterate twice.
// Returns a formatted topics byte slice.
// data: unformatted data (e.g., no hashes needed)
func Topics(data [][]byte) [][]byte {
d := make([][]byte, len(data))
for i, byts := range data {
d[i] = hashTopic(byts)
}
return d
}
func TopicsFromString(data ...string) [][]byte {
d := make([][]byte, len(data))
for i, str := range data {
d[i] = hashTopic([]byte(str))
}
return d
}
func bytesToMap(s [][]byte) map[string]struct{} {
m := make(map[string]struct{})
for _, topic := range s {
m[string(topic)] = struct{}{}
}
return m
}

View File

@ -2,7 +2,6 @@ package whisper
import ( import (
"crypto/ecdsa" "crypto/ecdsa"
"errors"
"sync" "sync"
"time" "time"
@ -17,9 +16,22 @@ import (
) )
const ( const (
statusMsg = 0x0 statusCode = 0x00
envelopesMsg = 0x01 messagesCode = 0x01
whisperVersion = 0x02
protocolVersion uint64 = 0x02
protocolName = "shh"
signatureFlag = byte(1 << 7)
signatureLength = 65
expirationCycle = 800 * time.Millisecond
transmissionCycle = 300 * time.Millisecond
)
const (
DefaultTTL = 50 * time.Second
DefaultPoW = 50 * time.Millisecond
) )
type MessageEvent struct { type MessageEvent struct {
@ -28,49 +40,104 @@ type MessageEvent struct {
Message *Message Message *Message
} }
const ( // Whisper represents a dark communication interface through the Ethereum
DefaultTimeToLive = 50 * time.Second // network, using its very own P2P communication layer.
DefaultProofOfWork = 50 * time.Millisecond
)
type Whisper struct { type Whisper struct {
protocol p2p.Protocol protocol p2p.Protocol
filters *filter.Filters filters *filter.Filters
mmu sync.RWMutex keys map[string]*ecdsa.PrivateKey
messages map[common.Hash]*Envelope
expiry map[uint32]*set.SetNonTS messages map[common.Hash]*Envelope // Pool of messages currently tracked by this node
expirations map[uint32]*set.SetNonTS // Message expiration pool (TODO: something lighter)
poolMu sync.RWMutex // Mutex to sync the message and expiration pools
peers map[*peer]struct{} // Set of currently active peers
peerMu sync.RWMutex // Mutex to sync the active peer set
quit chan struct{} quit chan struct{}
keys map[string]*ecdsa.PrivateKey
} }
func New() *Whisper { func New() *Whisper {
whisper := &Whisper{ whisper := &Whisper{
messages: make(map[common.Hash]*Envelope), filters: filter.New(),
filters: filter.New(), keys: make(map[string]*ecdsa.PrivateKey),
expiry: make(map[uint32]*set.SetNonTS), messages: make(map[common.Hash]*Envelope),
quit: make(chan struct{}), expirations: make(map[uint32]*set.SetNonTS),
keys: make(map[string]*ecdsa.PrivateKey), peers: make(map[*peer]struct{}),
quit: make(chan struct{}),
} }
whisper.filters.Start() whisper.filters.Start()
// p2p whisper sub protocol handler // p2p whisper sub protocol handler
whisper.protocol = p2p.Protocol{ whisper.protocol = p2p.Protocol{
Name: "shh", Name: protocolName,
Version: uint(whisperVersion), Version: uint(protocolVersion),
Length: 2, Length: 2,
Run: whisper.msgHandler, Run: whisper.handlePeer,
} }
return whisper return whisper
} }
// Protocol returns the whisper sub-protocol handler for this particular client.
func (self *Whisper) Protocol() p2p.Protocol {
return self.protocol
}
// Version returns the whisper sub-protocols version number.
func (self *Whisper) Version() uint { func (self *Whisper) Version() uint {
return self.protocol.Version return self.protocol.Version
} }
// NewIdentity generates a new cryptographic identity for the client, and injects
// it into the known identities for message decryption.
func (self *Whisper) NewIdentity() *ecdsa.PrivateKey {
key, err := crypto.GenerateKey()
if err != nil {
panic(err)
}
self.keys[string(crypto.FromECDSAPub(&key.PublicKey))] = key
return key
}
// HasIdentity checks if the the whisper node is configured with the private key
// of the specified public pair.
func (self *Whisper) HasIdentity(key *ecdsa.PublicKey) bool {
return self.keys[string(crypto.FromECDSAPub(key))] != nil
}
// GetIdentity retrieves the private key of the specified public identity.
func (self *Whisper) GetIdentity(key *ecdsa.PublicKey) *ecdsa.PrivateKey {
return self.keys[string(crypto.FromECDSAPub(key))]
}
// Watch installs a new message handler to run in case a matching packet arrives
// from the whisper network.
func (self *Whisper) Watch(options Filter) int {
filter := filter.Generic{
Str1: string(crypto.FromECDSAPub(options.To)),
Str2: string(crypto.FromECDSAPub(options.From)),
Data: newTopicSet(options.Topics),
Fn: func(data interface{}) {
options.Fn(data.(*Message))
},
}
return self.filters.Install(filter)
}
// Unwatch removes an installed message handler.
func (self *Whisper) Unwatch(id int) {
self.filters.Uninstall(id)
}
// Send injects a message into the whisper send queue, to be distributed in the
// network in the coming cycles.
func (self *Whisper) Send(envelope *Envelope) error {
return self.add(envelope)
}
func (self *Whisper) Start() { func (self *Whisper) Start() {
glog.V(logger.Info).Infoln("Whisper started") glog.V(logger.Info).Infoln("Whisper started")
go self.update() go self.update()
@ -78,29 +145,22 @@ func (self *Whisper) Start() {
func (self *Whisper) Stop() { func (self *Whisper) Stop() {
close(self.quit) close(self.quit)
glog.V(logger.Info).Infoln("Whisper stopped")
} }
func (self *Whisper) Send(envelope *Envelope) error { // Messages retrieves the currently pooled messages matching a filter id.
return self.add(envelope) func (self *Whisper) Messages(id int) []*Message {
} messages := make([]*Message, 0)
if filter := self.filters.Get(id); filter != nil {
func (self *Whisper) NewIdentity() *ecdsa.PrivateKey { for _, envelope := range self.messages {
key, err := crypto.GenerateKey() if message := self.open(envelope); message != nil {
if err != nil { if self.filters.Match(filter, createFilter(message, envelope.Topics)) {
panic(err) messages = append(messages, message)
}
}
}
} }
return messages
self.keys[string(crypto.FromECDSAPub(&key.PublicKey))] = key
return key
}
func (self *Whisper) HasIdentity(key *ecdsa.PublicKey) bool {
return self.keys[string(crypto.FromECDSAPub(key))] != nil
}
func (self *Whisper) GetIdentity(key *ecdsa.PublicKey) *ecdsa.PrivateKey {
return self.keys[string(crypto.FromECDSAPub(key))]
} }
// func (self *Whisper) RemoveIdentity(key *ecdsa.PublicKey) bool { // func (self *Whisper) RemoveIdentity(key *ecdsa.PublicKey) bool {
@ -112,166 +172,166 @@ func (self *Whisper) GetIdentity(key *ecdsa.PublicKey) *ecdsa.PrivateKey {
// return false // return false
// } // }
func (self *Whisper) Watch(opts Filter) int { // handlePeer is called by the underlying P2P layer when the whisper sub-protocol
return self.filters.Install(filter.Generic{ // connection is negotiated.
Str1: string(crypto.FromECDSAPub(opts.To)), func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
Str2: string(crypto.FromECDSAPub(opts.From)), // Create, initialize and start the whisper peer
Data: bytesToMap(opts.Topics), whisperPeer, err := newPeer(self, peer, rw)
Fn: func(data interface{}) { if err != nil {
opts.Fn(data.(*Message))
},
})
}
func (self *Whisper) Unwatch(id int) {
self.filters.Uninstall(id)
}
func (self *Whisper) Messages(id int) (messages []*Message) {
filter := self.filters.Get(id)
if filter != nil {
for _, e := range self.messages {
if msg, key := self.open(e); msg != nil {
f := createFilter(msg, e.Topics, key)
if self.filters.Match(filter, f) {
messages = append(messages, msg)
}
}
}
}
return
}
// Main handler for passing whisper messages to whisper peer objects
func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error {
wpeer := NewPeer(self, peer, ws)
// initialise whisper peer (handshake/status)
if err := wpeer.init(); err != nil {
return err return err
} }
// kick of the main handler for broadcasting/managing envelopes whisperPeer.start()
go wpeer.start() defer whisperPeer.stop()
defer wpeer.stop()
// Main *read* loop. Writing is done by the peer it self. // Start tracking the active peer
self.peerMu.Lock()
self.peers[whisperPeer] = struct{}{}
self.peerMu.Unlock()
defer func() {
self.peerMu.Lock()
delete(self.peers, whisperPeer)
self.peerMu.Unlock()
}()
// Read and process inbound messages directly to merge into client-global state
for { for {
msg, err := ws.ReadMsg() // Fetch the next packet and decode the contained envelopes
packet, err := rw.ReadMsg()
if err != nil { if err != nil {
return err return err
} }
var envelopes []*Envelope var envelopes []*Envelope
if err := msg.Decode(&envelopes); err != nil { if err := packet.Decode(&envelopes); err != nil {
peer.Infoln(err) peer.Infof("failed to decode enveloped: %v", err)
continue continue
} }
// Inject all envelopes into the internal pool
for _, envelope := range envelopes { for _, envelope := range envelopes {
if err := self.add(envelope); err != nil { if err := self.add(envelope); err != nil {
// TODO Punish peer here. Invalid envelope. // TODO Punish peer here. Invalid envelope.
peer.Debugln(err) peer.Debugf("failed to pool envelope: %f", err)
} }
wpeer.addKnown(envelope) whisperPeer.mark(envelope)
} }
} }
} }
// takes care of adding envelopes to the messages pool. At this moment no sanity checks are being performed. // add inserts a new envelope into the message pool to be distributed within the
// whisper network. It also inserts the envelope into the expiration pool at the
// appropriate time-stamp.
func (self *Whisper) add(envelope *Envelope) error { func (self *Whisper) add(envelope *Envelope) error {
if !envelope.valid() { self.poolMu.Lock()
return errors.New("invalid pow provided for envelope") defer self.poolMu.Unlock()
}
self.mmu.Lock()
defer self.mmu.Unlock()
// Insert the message into the tracked pool
hash := envelope.Hash() hash := envelope.Hash()
self.messages[hash] = envelope if _, ok := self.messages[hash]; ok {
if self.expiry[envelope.Expiry] == nil { glog.V(logger.Detail).Infof("whisper envelope already cached: %x\n", envelope)
self.expiry[envelope.Expiry] = set.NewNonTS() return nil
} }
self.messages[hash] = envelope
if !self.expiry[envelope.Expiry].Has(hash) { // Insert the message into the expiration pool for later removal
self.expiry[envelope.Expiry].Add(hash) if self.expirations[envelope.Expiry] == nil {
self.expirations[envelope.Expiry] = set.NewNonTS()
}
if !self.expirations[envelope.Expiry].Has(hash) {
self.expirations[envelope.Expiry].Add(hash)
// Notify the local node of a message arrival
go self.postEvent(envelope) go self.postEvent(envelope)
} }
glog.V(logger.Detail).Infof("cached whisper envelope %x\n", envelope)
glog.V(logger.Detail).Infof("added whisper envelope %x\n", envelope)
return nil return nil
} }
// postEvent opens an envelope with the configured identities and delivers the
// message upstream from application processing.
func (self *Whisper) postEvent(envelope *Envelope) {
if message := self.open(envelope); message != nil {
self.filters.Notify(createFilter(message, envelope.Topics), message)
}
}
// open tries to decrypt a whisper envelope with all the configured identities,
// returning the decrypted message and the key used to achieve it. If not keys
// are configured, open will return the payload as if non encrypted.
func (self *Whisper) open(envelope *Envelope) *Message {
// Short circuit if no identity is set, and assume clear-text
if len(self.keys) == 0 {
if message, err := envelope.Open(nil); err == nil {
return message
}
}
// Iterate over the keys and try to decrypt the message
for _, key := range self.keys {
message, err := envelope.Open(key)
if err == nil || err == ecies.ErrInvalidPublicKey {
message.To = &key.PublicKey
return message
}
}
// Failed to decrypt, don't return anything
return nil
}
// createFilter creates a message filter to check against installed handlers.
func createFilter(message *Message, topics []Topic) filter.Filter {
return filter.Generic{
Str1: string(crypto.FromECDSAPub(message.To)),
Str2: string(crypto.FromECDSAPub(message.Recover())),
Data: newTopicSet(topics),
}
}
// update loops until the lifetime of the whisper node, updating its internal
// state by expiring stale messages from the pool.
func (self *Whisper) update() { func (self *Whisper) update() {
expire := time.NewTicker(800 * time.Millisecond) // Start a ticker to check for expirations
out: expire := time.NewTicker(expirationCycle)
// Repeat updates until termination is requested
for { for {
select { select {
case <-expire.C: case <-expire.C:
self.expire() self.expire()
case <-self.quit: case <-self.quit:
break out return
} }
} }
} }
// expire iterates over all the expiration timestamps, removing all stale
// messages from the pools.
func (self *Whisper) expire() { func (self *Whisper) expire() {
self.mmu.Lock() self.poolMu.Lock()
defer self.mmu.Unlock() defer self.poolMu.Unlock()
now := uint32(time.Now().Unix()) now := uint32(time.Now().Unix())
for then, hashSet := range self.expiry { for then, hashSet := range self.expirations {
// Short circuit if a future time
if then > now { if then > now {
continue continue
} }
// Dump all expired messages and remove timestamp
hashSet.Each(func(v interface{}) bool { hashSet.Each(func(v interface{}) bool {
delete(self.messages, v.(common.Hash)) delete(self.messages, v.(common.Hash))
return true return true
}) })
self.expiry[then].Clear() self.expirations[then].Clear()
} }
} }
func (self *Whisper) envelopes() (envelopes []*Envelope) { // envelopes retrieves all the messages currently pooled by the node.
self.mmu.RLock() func (self *Whisper) envelopes() []*Envelope {
defer self.mmu.RUnlock() self.poolMu.RLock()
defer self.poolMu.RUnlock()
envelopes = make([]*Envelope, len(self.messages)) envelopes := make([]*Envelope, 0, len(self.messages))
i := 0
for _, envelope := range self.messages { for _, envelope := range self.messages {
envelopes[i] = envelope envelopes = append(envelopes, envelope)
i++
}
return
}
func (self *Whisper) postEvent(envelope *Envelope) {
if message, key := self.open(envelope); message != nil {
self.filters.Notify(createFilter(message, envelope.Topics, key), message)
}
}
func (self *Whisper) open(envelope *Envelope) (*Message, *ecdsa.PrivateKey) {
for _, key := range self.keys {
if message, err := envelope.Open(key); err == nil || (err != nil && err == ecies.ErrInvalidPublicKey) {
message.To = &key.PublicKey
return message, key
}
}
return nil, nil
}
func (self *Whisper) Protocol() p2p.Protocol {
return self.protocol
}
func createFilter(message *Message, topics [][]byte, key *ecdsa.PrivateKey) filter.Filter {
return filter.Generic{
Str1: string(crypto.FromECDSAPub(&key.PublicKey)), Str2: string(crypto.FromECDSAPub(message.Recover())),
Data: bytesToMap(topics),
} }
return envelopes
} }

View File

@ -1,38 +1,185 @@
package whisper package whisper
import ( import (
"fmt"
"testing" "testing"
"time" "time"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
) )
func TestEvent(t *testing.T) { func startTestCluster(n int) []*Whisper {
res := make(chan *Message, 1) // Create the batch of simulated peers
whisper := New() nodes := make([]*p2p.Peer, n)
id := whisper.NewIdentity() for i := 0; i < n; i++ {
whisper.Watch(Filter{ nodes[i] = p2p.NewPeer(discover.NodeID{}, "", nil)
To: &id.PublicKey, }
whispers := make([]*Whisper, n)
for i := 0; i < n; i++ {
whispers[i] = New()
whispers[i].Start()
}
// Wire all the peers to the root one
for i := 1; i < n; i++ {
src, dst := p2p.MsgPipe()
go whispers[0].handlePeer(nodes[i], src)
go whispers[i].handlePeer(nodes[0], dst)
}
return whispers
}
func TestSelfMessage(t *testing.T) {
// Start the single node cluster
client := startTestCluster(1)[0]
// Start watching for self messages, signal any arrivals
self := client.NewIdentity()
done := make(chan struct{})
client.Watch(Filter{
To: &self.PublicKey,
Fn: func(msg *Message) { Fn: func(msg *Message) {
res <- msg close(done)
}, },
}) })
// Send a dummy message to oneself
msg := NewMessage([]byte(fmt.Sprintf("Hello world. This is whisper-go. Incase you're wondering; the time is %v", time.Now()))) msg := NewMessage([]byte("self whisper"))
envelope, err := msg.Wrap(DefaultProofOfWork, Options{ envelope, err := msg.Wrap(DefaultPoW, Options{
TTL: DefaultTimeToLive, From: self,
From: id, To: &self.PublicKey,
To: &id.PublicKey, TTL: DefaultTTL,
}) })
if err != nil { if err != nil {
fmt.Println(err) t.Fatalf("failed to wrap message: %v", err)
t.FailNow() }
// Dump the message into the system and wait for it to pop back out
if err := client.Send(envelope); err != nil {
t.Fatalf("failed to send self-message: %v", err)
} }
tick := time.NewTicker(time.Second)
whisper.postEvent(envelope)
select { select {
case <-res: case <-done:
case <-tick.C: case <-time.After(time.Second):
t.Error("did not receive message") t.Fatalf("self-message receive timeout")
}
}
func TestDirectMessage(t *testing.T) {
// Start the sender-recipient cluster
cluster := startTestCluster(2)
sender := cluster[0]
senderId := sender.NewIdentity()
recipient := cluster[1]
recipientId := recipient.NewIdentity()
// Watch for arriving messages on the recipient
done := make(chan struct{})
recipient.Watch(Filter{
To: &recipientId.PublicKey,
Fn: func(msg *Message) {
close(done)
},
})
// Send a dummy message from the sender
msg := NewMessage([]byte("direct whisper"))
envelope, err := msg.Wrap(DefaultPoW, Options{
From: senderId,
To: &recipientId.PublicKey,
TTL: DefaultTTL,
})
if err != nil {
t.Fatalf("failed to wrap message: %v", err)
}
if err := sender.Send(envelope); err != nil {
t.Fatalf("failed to send direct message: %v", err)
}
// Wait for an arrival or a timeout
select {
case <-done:
case <-time.After(time.Second):
t.Fatalf("direct message receive timeout")
}
}
func TestAnonymousBroadcast(t *testing.T) {
testBroadcast(true, t)
}
func TestIdentifiedBroadcast(t *testing.T) {
testBroadcast(false, t)
}
func testBroadcast(anonymous bool, t *testing.T) {
// Start the single sender multi recipient cluster
cluster := startTestCluster(3)
sender := cluster[1]
targets := cluster[1:]
for _, target := range targets {
if !anonymous {
target.NewIdentity()
}
}
// Watch for arriving messages on the recipients
dones := make([]chan struct{}, len(targets))
for i := 0; i < len(targets); i++ {
done := make(chan struct{}) // need for the closure
dones[i] = done
targets[i].Watch(Filter{
Topics: NewTopicsFromStrings("broadcast topic"),
Fn: func(msg *Message) {
close(done)
},
})
}
// Send a dummy message from the sender
msg := NewMessage([]byte("broadcast whisper"))
envelope, err := msg.Wrap(DefaultPoW, Options{
Topics: NewTopicsFromStrings("broadcast topic"),
TTL: DefaultTTL,
})
if err != nil {
t.Fatalf("failed to wrap message: %v", err)
}
if err := sender.Send(envelope); err != nil {
t.Fatalf("failed to send broadcast message: %v", err)
}
// Wait for an arrival on each recipient, or timeouts
timeout := time.After(time.Second)
for _, done := range dones {
select {
case <-done:
case <-timeout:
t.Fatalf("broadcast message receive timeout")
}
}
}
func TestMessageExpiration(t *testing.T) {
// Start the single node cluster and inject a dummy message
node := startTestCluster(1)[0]
message := NewMessage([]byte("expiring message"))
envelope, err := message.Wrap(DefaultPoW, Options{
TTL: time.Second,
})
if err != nil {
t.Fatalf("failed to wrap message: %v", err)
}
if err := node.Send(envelope); err != nil {
t.Fatalf("failed to inject message: %v", err)
}
// Check that the message is inside the cache
if _, ok := node.messages[envelope.Hash()]; !ok {
t.Fatalf("message not found in cache")
}
// Wait for expiration and check cache again
time.Sleep(time.Second) // wait for expiration
time.Sleep(expirationCycle) // wait for cleanup cycle
if _, ok := node.messages[envelope.Hash()]; ok {
t.Fatalf("message not expired from cache")
} }
} }

View File

@ -36,7 +36,7 @@ func (self *Whisper) Post(payload string, to, from string, topics []string, prio
TTL: time.Duration(ttl) * time.Second, TTL: time.Duration(ttl) * time.Second,
To: crypto.ToECDSAPub(common.FromHex(to)), To: crypto.ToECDSAPub(common.FromHex(to)),
From: key, From: key,
Topics: whisper.TopicsFromString(topics...), Topics: whisper.NewTopicsFromStrings(topics...),
}) })
if err != nil { if err != nil {
@ -71,7 +71,7 @@ func (self *Whisper) Watch(opts *Options) int {
filter := whisper.Filter{ filter := whisper.Filter{
To: crypto.ToECDSAPub(common.FromHex(opts.To)), To: crypto.ToECDSAPub(common.FromHex(opts.To)),
From: crypto.ToECDSAPub(common.FromHex(opts.From)), From: crypto.ToECDSAPub(common.FromHex(opts.From)),
Topics: whisper.TopicsFromString(opts.Topics...), Topics: whisper.NewTopicsFromStrings(opts.Topics...),
} }
var i int var i int