// Copyright 2018 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . package pss import ( "bytes" "context" "crypto/ecdsa" "crypto/rand" "errors" "fmt" "hash" "sync" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/protocols" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/pot" "github.com/ethereum/go-ethereum/swarm/storage" whisper "github.com/ethereum/go-ethereum/whisper/whisperv5" "golang.org/x/crypto/sha3" ) const ( defaultPaddingByteSize = 16 DefaultMsgTTL = time.Second * 120 defaultDigestCacheTTL = time.Second * 10 defaultSymKeyCacheCapacity = 512 digestLength = 32 // byte length of digest used for pss cache (currently same as swarm chunk hash) defaultWhisperWorkTime = 3 defaultWhisperPoW = 0.0000000001 defaultMaxMsgSize = 1024 * 1024 defaultCleanInterval = time.Second * 60 * 10 defaultOutboxCapacity = 100000 pssProtocolName = "pss" pssVersion = 2 hasherCount = 8 ) var ( addressLength = len(pot.Address{}) ) // cache is used for preventing backwards routing // will also be instrumental in flood guard mechanism // and mailbox implementation type pssCacheEntry struct { expiresAt time.Time } // abstraction to enable access to p2p.protocols.Peer.Send type senderPeer interface { Info() *p2p.PeerInfo ID() enode.ID Address() []byte Send(context.Context, interface{}) error } // per-key peer related information // member `protected` prevents garbage collection of the instance type pssPeer struct { lastSeen time.Time address PssAddress protected bool } // Pss configuration parameters type PssParams struct { MsgTTL time.Duration CacheTTL time.Duration privateKey *ecdsa.PrivateKey SymKeyCacheCapacity int AllowRaw bool // If true, enables sending and receiving messages without builtin pss encryption } // Sane defaults for Pss func NewPssParams() *PssParams { return &PssParams{ MsgTTL: DefaultMsgTTL, CacheTTL: defaultDigestCacheTTL, SymKeyCacheCapacity: defaultSymKeyCacheCapacity, } } func (params *PssParams) WithPrivateKey(privatekey *ecdsa.PrivateKey) *PssParams { params.privateKey = privatekey return params } // Toplevel pss object, takes care of message sending, receiving, decryption and encryption, message handler dispatchers and message forwarding. // // Implements node.Service type Pss struct { *network.Kademlia // we can get the Kademlia address from this privateKey *ecdsa.PrivateKey // pss can have it's own independent key w *whisper.Whisper // key and encryption backend auxAPIs []rpc.API // builtins (handshake, test) can add APIs // sending and forwarding fwdPool map[string]*protocols.Peer // keep track of all peers sitting on the pssmsg routing layer fwdPoolMu sync.RWMutex fwdCache map[pssDigest]pssCacheEntry // checksum of unique fields from pssmsg mapped to expiry, cache to determine whether to drop msg fwdCacheMu sync.RWMutex cacheTTL time.Duration // how long to keep messages in fwdCache (not implemented) msgTTL time.Duration paddingByteSize int capstring string outbox chan *PssMsg // keys and peers pubKeyPool map[string]map[Topic]*pssPeer // mapping of hex public keys to peer address by topic. pubKeyPoolMu sync.RWMutex symKeyPool map[string]map[Topic]*pssPeer // mapping of symkeyids to peer address by topic. symKeyPoolMu sync.RWMutex symKeyDecryptCache []*string // fast lookup of symkeys recently used for decryption; last used is on top of stack symKeyDecryptCacheCursor int // modular cursor pointing to last used, wraps on symKeyDecryptCache array symKeyDecryptCacheCapacity int // max amount of symkeys to keep. // message handling handlers map[Topic]map[*handler]bool // topic and version based pss payload handlers. See pss.Handle() handlersMu sync.RWMutex hashPool sync.Pool topicHandlerCaps map[Topic]*handlerCaps // caches capabilities of each topic's handlers (see handlerCap* consts in types.go) // process quitC chan struct{} } func (p *Pss) String() string { return fmt.Sprintf("pss: addr %x, pubkey %v", p.BaseAddr(), common.ToHex(crypto.FromECDSAPub(&p.privateKey.PublicKey))) } // Creates a new Pss instance. // // In addition to params, it takes a swarm network Kademlia // and a FileStore storage for message cache storage. func NewPss(k *network.Kademlia, params *PssParams) (*Pss, error) { if params.privateKey == nil { return nil, errors.New("missing private key for pss") } cap := p2p.Cap{ Name: pssProtocolName, Version: pssVersion, } ps := &Pss{ Kademlia: k, privateKey: params.privateKey, w: whisper.New(&whisper.DefaultConfig), quitC: make(chan struct{}), fwdPool: make(map[string]*protocols.Peer), fwdCache: make(map[pssDigest]pssCacheEntry), cacheTTL: params.CacheTTL, msgTTL: params.MsgTTL, paddingByteSize: defaultPaddingByteSize, capstring: cap.String(), outbox: make(chan *PssMsg, defaultOutboxCapacity), pubKeyPool: make(map[string]map[Topic]*pssPeer), symKeyPool: make(map[string]map[Topic]*pssPeer), symKeyDecryptCache: make([]*string, params.SymKeyCacheCapacity), symKeyDecryptCacheCapacity: params.SymKeyCacheCapacity, handlers: make(map[Topic]map[*handler]bool), topicHandlerCaps: make(map[Topic]*handlerCaps), hashPool: sync.Pool{ New: func() interface{} { return sha3.NewLegacyKeccak256() }, }, } for i := 0; i < hasherCount; i++ { hashfunc := storage.MakeHashFunc(storage.DefaultHash)() ps.hashPool.Put(hashfunc) } return ps, nil } ///////////////////////////////////////////////////////////////////// // SECTION: node.Service interface ///////////////////////////////////////////////////////////////////// func (p *Pss) Start(srv *p2p.Server) error { go func() { ticker := time.NewTicker(defaultCleanInterval) cacheTicker := time.NewTicker(p.cacheTTL) defer ticker.Stop() defer cacheTicker.Stop() for { select { case <-cacheTicker.C: p.cleanFwdCache() case <-ticker.C: p.cleanKeys() case <-p.quitC: return } } }() go func() { for { select { case msg := <-p.outbox: err := p.forward(msg) if err != nil { log.Error(err.Error()) metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1) } case <-p.quitC: return } } }() log.Info("Started Pss") log.Info("Loaded EC keys", "pubkey", common.ToHex(crypto.FromECDSAPub(p.PublicKey())), "secp256", common.ToHex(crypto.CompressPubkey(p.PublicKey()))) return nil } func (p *Pss) Stop() error { log.Info("Pss shutting down") close(p.quitC) return nil } var pssSpec = &protocols.Spec{ Name: pssProtocolName, Version: pssVersion, MaxMsgSize: defaultMaxMsgSize, Messages: []interface{}{ PssMsg{}, }, } func (p *Pss) Protocols() []p2p.Protocol { return []p2p.Protocol{ { Name: pssSpec.Name, Version: pssSpec.Version, Length: pssSpec.Length(), Run: p.Run, }, } } func (p *Pss) Run(peer *p2p.Peer, rw p2p.MsgReadWriter) error { pp := protocols.NewPeer(peer, rw, pssSpec) p.fwdPoolMu.Lock() p.fwdPool[peer.Info().ID] = pp p.fwdPoolMu.Unlock() return pp.Run(p.handlePssMsg) } func (p *Pss) APIs() []rpc.API { apis := []rpc.API{ { Namespace: "pss", Version: "1.0", Service: NewAPI(p), Public: true, }, } apis = append(apis, p.auxAPIs...) return apis } // add API methods to the pss API // must be run before node is started func (p *Pss) addAPI(api rpc.API) { p.auxAPIs = append(p.auxAPIs, api) } // Returns the swarm Kademlia address of the pss node func (p *Pss) BaseAddr() []byte { return p.Kademlia.BaseAddr() } // Returns the pss node's public key func (p *Pss) PublicKey() *ecdsa.PublicKey { return &p.privateKey.PublicKey } ///////////////////////////////////////////////////////////////////// // SECTION: Message handling ///////////////////////////////////////////////////////////////////// // Links a handler function to a Topic // // All incoming messages with an envelope Topic matching the // topic specified will be passed to the given Handler function. // // There may be an arbitrary number of handler functions per topic. // // Returns a deregister function which needs to be called to // deregister the handler, func (p *Pss) Register(topic *Topic, hndlr *handler) func() { p.handlersMu.Lock() defer p.handlersMu.Unlock() handlers := p.handlers[*topic] if handlers == nil { handlers = make(map[*handler]bool) p.handlers[*topic] = handlers log.Debug("registered handler", "caps", hndlr.caps) } if hndlr.caps == nil { hndlr.caps = &handlerCaps{} } handlers[hndlr] = true if _, ok := p.topicHandlerCaps[*topic]; !ok { p.topicHandlerCaps[*topic] = &handlerCaps{} } if hndlr.caps.raw { p.topicHandlerCaps[*topic].raw = true } if hndlr.caps.prox { p.topicHandlerCaps[*topic].prox = true } return func() { p.deregister(topic, hndlr) } } func (p *Pss) deregister(topic *Topic, hndlr *handler) { p.handlersMu.Lock() defer p.handlersMu.Unlock() handlers := p.handlers[*topic] if len(handlers) > 1 { delete(p.handlers, *topic) // topic caps might have changed now that a handler is gone caps := &handlerCaps{} for h := range handlers { if h.caps.raw { caps.raw = true } if h.caps.prox { caps.prox = true } } p.topicHandlerCaps[*topic] = caps return } delete(handlers, hndlr) } // Filters incoming messages for processing or forwarding. // Check if address partially matches // If yes, it CAN be for us, and we process it // Only passes error to pss protocol handler if payload is not valid pssmsg func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error { metrics.GetOrRegisterCounter("pss.handlepssmsg", nil).Inc(1) pssmsg, ok := msg.(*PssMsg) if !ok { return fmt.Errorf("invalid message type. Expected *PssMsg, got %T ", msg) } log.Trace("handler", "self", label(p.Kademlia.BaseAddr()), "topic", label(pssmsg.Payload.Topic[:])) if int64(pssmsg.Expire) < time.Now().Unix() { metrics.GetOrRegisterCounter("pss.expire", nil).Inc(1) log.Warn("pss filtered expired message", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", common.ToHex(pssmsg.To)) return nil } if p.checkFwdCache(pssmsg) { log.Trace("pss relay block-cache match (process)", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", (common.ToHex(pssmsg.To))) return nil } p.addFwdCache(pssmsg) psstopic := Topic(pssmsg.Payload.Topic) // raw is simplest handler contingency to check, so check that first var isRaw bool if pssmsg.isRaw() { if _, ok := p.topicHandlerCaps[psstopic]; ok { if !p.topicHandlerCaps[psstopic].raw { log.Debug("No handler for raw message", "topic", psstopic) return nil } } isRaw = true } // check if we can be recipient: // - no prox handler on message and partial address matches // - prox handler on message and we are in prox regardless of partial address match // store this result so we don't calculate again on every handler var isProx bool if _, ok := p.topicHandlerCaps[psstopic]; ok { isProx = p.topicHandlerCaps[psstopic].prox } isRecipient := p.isSelfPossibleRecipient(pssmsg, isProx) if !isRecipient { log.Trace("pss was for someone else :'( ... forwarding", "pss", common.ToHex(p.BaseAddr()), "prox", isProx) return p.enqueue(pssmsg) } log.Trace("pss for us, yay! ... let's process!", "pss", common.ToHex(p.BaseAddr()), "prox", isProx, "raw", isRaw, "topic", label(pssmsg.Payload.Topic[:])) if err := p.process(pssmsg, isRaw, isProx); err != nil { qerr := p.enqueue(pssmsg) if qerr != nil { return fmt.Errorf("process fail: processerr %v, queueerr: %v", err, qerr) } } return nil } // Entry point to processing a message for which the current node can be the intended recipient. // Attempts symmetric and asymmetric decryption with stored keys. // Dispatches message to all handlers matching the message topic func (p *Pss) process(pssmsg *PssMsg, raw bool, prox bool) error { metrics.GetOrRegisterCounter("pss.process", nil).Inc(1) var err error var recvmsg *whisper.ReceivedMessage var payload []byte var from PssAddress var asymmetric bool var keyid string var keyFunc func(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error) envelope := pssmsg.Payload psstopic := Topic(envelope.Topic) if raw { payload = pssmsg.Payload.Data } else { if pssmsg.isSym() { keyFunc = p.processSym } else { asymmetric = true keyFunc = p.processAsym } recvmsg, keyid, from, err = keyFunc(envelope) if err != nil { return errors.New("Decryption failed") } payload = recvmsg.Payload } if len(pssmsg.To) < addressLength { if err := p.enqueue(pssmsg); err != nil { return err } } p.executeHandlers(psstopic, payload, from, raw, prox, asymmetric, keyid) return nil } // copy all registered handlers for respective topic in order to avoid data race or deadlock func (p *Pss) getHandlers(topic Topic) (ret []*handler) { p.handlersMu.RLock() defer p.handlersMu.RUnlock() for k := range p.handlers[topic] { ret = append(ret, k) } return ret } func (p *Pss) executeHandlers(topic Topic, payload []byte, from PssAddress, raw bool, prox bool, asymmetric bool, keyid string) { handlers := p.getHandlers(topic) peer := p2p.NewPeer(enode.ID{}, fmt.Sprintf("%x", from), []p2p.Cap{}) for _, h := range handlers { if !h.caps.raw && raw { log.Warn("norawhandler") continue } if !h.caps.prox && prox { log.Warn("noproxhandler") continue } err := (h.f)(payload, peer, asymmetric, keyid) if err != nil { log.Warn("Pss handler failed", "err", err) } } } // will return false if using partial address func (p *Pss) isSelfRecipient(msg *PssMsg) bool { return bytes.Equal(msg.To, p.Kademlia.BaseAddr()) } // test match of leftmost bytes in given message to node's Kademlia address func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool { local := p.Kademlia.BaseAddr() // if a partial address matches we are possible recipient regardless of prox // if not and prox is not set, we are surely not if bytes.Equal(msg.To, local[:len(msg.To)]) { return true } else if !prox { return false } depth := p.Kademlia.NeighbourhoodDepth() po, _ := network.Pof(p.Kademlia.BaseAddr(), msg.To, 0) log.Trace("selfpossible", "po", po, "depth", depth) return depth <= po } ///////////////////////////////////////////////////////////////////// // SECTION: Encryption ///////////////////////////////////////////////////////////////////// // Links a peer ECDSA public key to a topic // // This is required for asymmetric message exchange // on the given topic // // The value in `address` will be used as a routing hint for the // public key / topic association func (p *Pss) SetPeerPublicKey(pubkey *ecdsa.PublicKey, topic Topic, address PssAddress) error { if err := validateAddress(address); err != nil { return err } pubkeybytes := crypto.FromECDSAPub(pubkey) if len(pubkeybytes) == 0 { return fmt.Errorf("invalid public key: %v", pubkey) } pubkeyid := common.ToHex(pubkeybytes) psp := &pssPeer{ address: address, } p.pubKeyPoolMu.Lock() if _, ok := p.pubKeyPool[pubkeyid]; !ok { p.pubKeyPool[pubkeyid] = make(map[Topic]*pssPeer) } p.pubKeyPool[pubkeyid][topic] = psp p.pubKeyPoolMu.Unlock() log.Trace("added pubkey", "pubkeyid", pubkeyid, "topic", topic, "address", address) return nil } // Automatically generate a new symkey for a topic and address hint func (p *Pss) GenerateSymmetricKey(topic Topic, address PssAddress, addToCache bool) (string, error) { keyid, err := p.w.GenerateSymKey() if err != nil { return "", err } p.addSymmetricKeyToPool(keyid, topic, address, addToCache, false) return keyid, nil } // Links a peer symmetric key (arbitrary byte sequence) to a topic // // This is required for symmetrically encrypted message exchange // on the given topic // // The key is stored in the whisper backend. // // If addtocache is set to true, the key will be added to the cache of keys // used to attempt symmetric decryption of incoming messages. // // Returns a string id that can be used to retrieve the key bytes // from the whisper backend (see pss.GetSymmetricKey()) func (p *Pss) SetSymmetricKey(key []byte, topic Topic, address PssAddress, addtocache bool) (string, error) { if err := validateAddress(address); err != nil { return "", err } return p.setSymmetricKey(key, topic, address, addtocache, true) } func (p *Pss) setSymmetricKey(key []byte, topic Topic, address PssAddress, addtocache bool, protected bool) (string, error) { keyid, err := p.w.AddSymKeyDirect(key) if err != nil { return "", err } p.addSymmetricKeyToPool(keyid, topic, address, addtocache, protected) return keyid, nil } // adds a symmetric key to the pss key pool, and optionally adds the key // to the collection of keys used to attempt symmetric decryption of // incoming messages func (p *Pss) addSymmetricKeyToPool(keyid string, topic Topic, address PssAddress, addtocache bool, protected bool) { psp := &pssPeer{ address: address, protected: protected, } p.symKeyPoolMu.Lock() if _, ok := p.symKeyPool[keyid]; !ok { p.symKeyPool[keyid] = make(map[Topic]*pssPeer) } p.symKeyPool[keyid][topic] = psp p.symKeyPoolMu.Unlock() if addtocache { p.symKeyDecryptCacheCursor++ p.symKeyDecryptCache[p.symKeyDecryptCacheCursor%cap(p.symKeyDecryptCache)] = &keyid } key, _ := p.GetSymmetricKey(keyid) log.Trace("added symkey", "symkeyid", keyid, "symkey", common.ToHex(key), "topic", topic, "address", address, "cache", addtocache) } // Returns a symmetric key byte seqyence stored in the whisper backend // by its unique id // // Passes on the error value from the whisper backend func (p *Pss) GetSymmetricKey(symkeyid string) ([]byte, error) { symkey, err := p.w.GetSymKey(symkeyid) if err != nil { return nil, err } return symkey, nil } // Returns all recorded topic and address combination for a specific public key func (p *Pss) GetPublickeyPeers(keyid string) (topic []Topic, address []PssAddress, err error) { p.pubKeyPoolMu.RLock() defer p.pubKeyPoolMu.RUnlock() for t, peer := range p.pubKeyPool[keyid] { topic = append(topic, t) address = append(address, peer.address) } return topic, address, nil } func (p *Pss) getPeerAddress(keyid string, topic Topic) (PssAddress, error) { p.pubKeyPoolMu.RLock() defer p.pubKeyPoolMu.RUnlock() if peers, ok := p.pubKeyPool[keyid]; ok { if t, ok := peers[topic]; ok { return t.address, nil } } return nil, fmt.Errorf("peer with pubkey %s, topic %x not found", keyid, topic) } // Attempt to decrypt, validate and unpack a // symmetrically encrypted message // If successful, returns the unpacked whisper ReceivedMessage struct // encapsulating the decrypted message, and the whisper backend id // of the symmetric key used to decrypt the message. // It fails if decryption of the message fails or if the message is corrupted func (p *Pss) processSym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error) { metrics.GetOrRegisterCounter("pss.process.sym", nil).Inc(1) for i := p.symKeyDecryptCacheCursor; i > p.symKeyDecryptCacheCursor-cap(p.symKeyDecryptCache) && i > 0; i-- { symkeyid := p.symKeyDecryptCache[i%cap(p.symKeyDecryptCache)] symkey, err := p.w.GetSymKey(*symkeyid) if err != nil { continue } recvmsg, err := envelope.OpenSymmetric(symkey) if err != nil { continue } if !recvmsg.Validate() { return nil, "", nil, fmt.Errorf("symmetrically encrypted message has invalid signature or is corrupt") } p.symKeyPoolMu.Lock() from := p.symKeyPool[*symkeyid][Topic(envelope.Topic)].address p.symKeyPoolMu.Unlock() p.symKeyDecryptCacheCursor++ p.symKeyDecryptCache[p.symKeyDecryptCacheCursor%cap(p.symKeyDecryptCache)] = symkeyid return recvmsg, *symkeyid, from, nil } return nil, "", nil, fmt.Errorf("could not decrypt message") } // Attempt to decrypt, validate and unpack an // asymmetrically encrypted message // If successful, returns the unpacked whisper ReceivedMessage struct // encapsulating the decrypted message, and the byte representation of // the public key used to decrypt the message. // It fails if decryption of message fails, or if the message is corrupted func (p *Pss) processAsym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error) { metrics.GetOrRegisterCounter("pss.process.asym", nil).Inc(1) recvmsg, err := envelope.OpenAsymmetric(p.privateKey) if err != nil { return nil, "", nil, fmt.Errorf("could not decrypt message: %s", err) } // check signature (if signed), strip padding if !recvmsg.Validate() { return nil, "", nil, fmt.Errorf("invalid message") } pubkeyid := common.ToHex(crypto.FromECDSAPub(recvmsg.Src)) var from PssAddress p.pubKeyPoolMu.Lock() if p.pubKeyPool[pubkeyid][Topic(envelope.Topic)] != nil { from = p.pubKeyPool[pubkeyid][Topic(envelope.Topic)].address } p.pubKeyPoolMu.Unlock() return recvmsg, pubkeyid, from, nil } // Symkey garbage collection // a key is removed if: // - it is not marked as protected // - it is not in the incoming decryption cache func (p *Pss) cleanKeys() (count int) { for keyid, peertopics := range p.symKeyPool { var expiredtopics []Topic for topic, psp := range peertopics { if psp.protected { continue } var match bool for i := p.symKeyDecryptCacheCursor; i > p.symKeyDecryptCacheCursor-cap(p.symKeyDecryptCache) && i > 0; i-- { cacheid := p.symKeyDecryptCache[i%cap(p.symKeyDecryptCache)] if *cacheid == keyid { match = true } } if !match { expiredtopics = append(expiredtopics, topic) } } for _, topic := range expiredtopics { p.symKeyPoolMu.Lock() delete(p.symKeyPool[keyid], topic) log.Trace("symkey cleanup deletion", "symkeyid", keyid, "topic", topic, "val", p.symKeyPool[keyid]) p.symKeyPoolMu.Unlock() count++ } } return } ///////////////////////////////////////////////////////////////////// // SECTION: Message sending ///////////////////////////////////////////////////////////////////// func (p *Pss) enqueue(msg *PssMsg) error { select { case p.outbox <- msg: return nil default: } metrics.GetOrRegisterCounter("pss.enqueue.outbox.full", nil).Inc(1) return errors.New("outbox full") } // Send a raw message (any encryption is responsibility of calling client) // // Will fail if raw messages are disallowed func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error { if err := validateAddress(address); err != nil { return err } pssMsgParams := &msgParams{ raw: true, } payload := &whisper.Envelope{ Data: msg, Topic: whisper.TopicType(topic), } pssMsg := newPssMsg(pssMsgParams) pssMsg.To = address pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix()) pssMsg.Payload = payload p.addFwdCache(pssMsg) err := p.enqueue(pssMsg) if err != nil { return err } // if we have a proxhandler on this topic // also deliver message to ourselves if _, ok := p.topicHandlerCaps[topic]; ok { if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic].prox { return p.process(pssMsg, true, true) } } return nil } // Send a message using symmetric encryption // // Fails if the key id does not match any of the stored symmetric keys func (p *Pss) SendSym(symkeyid string, topic Topic, msg []byte) error { symkey, err := p.GetSymmetricKey(symkeyid) if err != nil { return fmt.Errorf("missing valid send symkey %s: %v", symkeyid, err) } p.symKeyPoolMu.Lock() psp, ok := p.symKeyPool[symkeyid][topic] p.symKeyPoolMu.Unlock() if !ok { return fmt.Errorf("invalid topic '%s' for symkey '%s'", topic.String(), symkeyid) } return p.send(psp.address, topic, msg, false, symkey) } // Send a message using asymmetric encryption // // Fails if the key id does not match any in of the stored public keys func (p *Pss) SendAsym(pubkeyid string, topic Topic, msg []byte) error { if _, err := crypto.UnmarshalPubkey(common.FromHex(pubkeyid)); err != nil { return fmt.Errorf("Cannot unmarshal pubkey: %x", pubkeyid) } p.pubKeyPoolMu.Lock() psp, ok := p.pubKeyPool[pubkeyid][topic] p.pubKeyPoolMu.Unlock() if !ok { return fmt.Errorf("invalid topic '%s' for pubkey '%s'", topic.String(), pubkeyid) } return p.send(psp.address, topic, msg, true, common.FromHex(pubkeyid)) } // Send is payload agnostic, and will accept any byte slice as payload // It generates an whisper envelope for the specified recipient and topic, // and wraps the message payload in it. // TODO: Implement proper message padding func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []byte) error { metrics.GetOrRegisterCounter("pss.send", nil).Inc(1) if key == nil || bytes.Equal(key, []byte{}) { return fmt.Errorf("Zero length key passed to pss send") } padding := make([]byte, p.paddingByteSize) c, err := rand.Read(padding) if err != nil { return err } else if c < p.paddingByteSize { return fmt.Errorf("invalid padding length: %d", c) } wparams := &whisper.MessageParams{ TTL: defaultWhisperTTL, Src: p.privateKey, Topic: whisper.TopicType(topic), WorkTime: defaultWhisperWorkTime, PoW: defaultWhisperPoW, Payload: msg, Padding: padding, } if asymmetric { pk, err := crypto.UnmarshalPubkey(key) if err != nil { return fmt.Errorf("Cannot unmarshal pubkey: %x", key) } wparams.Dst = pk } else { wparams.KeySym = key } // set up outgoing message container, which does encryption and envelope wrapping woutmsg, err := whisper.NewSentMessage(wparams) if err != nil { return fmt.Errorf("failed to generate whisper message encapsulation: %v", err) } // performs encryption. // Does NOT perform / performs negligible PoW due to very low difficulty setting // after this the message is ready for sending envelope, err := woutmsg.Wrap(wparams) if err != nil { return fmt.Errorf("failed to perform whisper encryption: %v", err) } log.Trace("pssmsg whisper done", "env", envelope, "wparams payload", common.ToHex(wparams.Payload), "to", common.ToHex(to), "asym", asymmetric, "key", common.ToHex(key)) // prepare for devp2p transport pssMsgParams := &msgParams{ sym: !asymmetric, } pssMsg := newPssMsg(pssMsgParams) pssMsg.To = to pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix()) pssMsg.Payload = envelope err = p.enqueue(pssMsg) if err != nil { return err } if _, ok := p.topicHandlerCaps[topic]; ok { if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic].prox { return p.process(pssMsg, true, true) } } return nil } // sendFunc is a helper function that tries to send a message and returns true on success. // It is set here for usage in production, and optionally overridden in tests. var sendFunc func(p *Pss, sp *network.Peer, msg *PssMsg) bool = sendMsg // tries to send a message, returns true if successful func sendMsg(p *Pss, sp *network.Peer, msg *PssMsg) bool { var isPssEnabled bool info := sp.Info() for _, capability := range info.Caps { if capability == p.capstring { isPssEnabled = true break } } if !isPssEnabled { log.Error("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps) return false } // get the protocol peer from the forwarding peer cache p.fwdPoolMu.RLock() pp := p.fwdPool[sp.Info().ID] p.fwdPoolMu.RUnlock() err := pp.Send(context.TODO(), msg) if err != nil { metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1) log.Error(err.Error()) } return err == nil } // Forwards a pss message to the peer(s) based on recipient address according to the algorithm // described below. The recipient address can be of any length, and the byte slice will be matched // to the MSB slice of the peer address of the equivalent length. // // If the recipient address (or partial address) is within the neighbourhood depth of the forwarding // node, then it will be forwarded to all the nearest neighbours of the forwarding node. In case of // partial address, it should be forwarded to all the peers matching the partial address, if there // are any; otherwise only to one peer, closest to the recipient address. In any case, if the message // forwarding fails, the node should try to forward it to the next best peer, until the message is // successfully forwarded to at least one peer. func (p *Pss) forward(msg *PssMsg) error { metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1) sent := 0 // number of successful sends to := make([]byte, addressLength) copy(to[:len(msg.To)], msg.To) neighbourhoodDepth := p.Kademlia.NeighbourhoodDepth() // luminosity is the opposite of darkness. the more bytes are removed from the address, the higher is darkness, // but the luminosity is less. here luminosity equals the number of bits given in the destination address. luminosityRadius := len(msg.To) * 8 // proximity order function matching up to neighbourhoodDepth bits (po <= neighbourhoodDepth) pof := pot.DefaultPof(neighbourhoodDepth) // soft threshold for msg broadcast broadcastThreshold, _ := pof(to, p.BaseAddr(), 0) if broadcastThreshold > luminosityRadius { broadcastThreshold = luminosityRadius } var onlySendOnce bool // indicates if the message should only be sent to one peer with closest address // if measured from the recipient address as opposed to the base address (see Kademlia.EachConn // call below), then peers that fall in the same proximity bin as recipient address will appear // [at least] one bit closer, but only if these additional bits are given in the recipient address. if broadcastThreshold < luminosityRadius && broadcastThreshold < neighbourhoodDepth { broadcastThreshold++ onlySendOnce = true } p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int) bool { if po < broadcastThreshold && sent > 0 { return false // stop iterating } if sendFunc(p, sp, msg) { sent++ if onlySendOnce { return false } if po == addressLength*8 { // stop iterating if successfully sent to the exact recipient (perfect match of full address) return false } } return true }) // if we failed to send to anyone, re-insert message in the send-queue if sent == 0 { log.Debug("unable to forward to any peers") if err := p.enqueue(msg); err != nil { metrics.GetOrRegisterCounter("pss.forward.enqueue.error", nil).Inc(1) log.Error(err.Error()) return err } } // cache the message p.addFwdCache(msg) return nil } ///////////////////////////////////////////////////////////////////// // SECTION: Caching ///////////////////////////////////////////////////////////////////// // cleanFwdCache is used to periodically remove expired entries from the forward cache func (p *Pss) cleanFwdCache() { metrics.GetOrRegisterCounter("pss.cleanfwdcache", nil).Inc(1) p.fwdCacheMu.Lock() defer p.fwdCacheMu.Unlock() for k, v := range p.fwdCache { if v.expiresAt.Before(time.Now()) { delete(p.fwdCache, k) } } } func label(b []byte) string { return fmt.Sprintf("%04x", b[:2]) } // add a message to the cache func (p *Pss) addFwdCache(msg *PssMsg) error { metrics.GetOrRegisterCounter("pss.addfwdcache", nil).Inc(1) var entry pssCacheEntry var ok bool p.fwdCacheMu.Lock() defer p.fwdCacheMu.Unlock() digest := p.digest(msg) if entry, ok = p.fwdCache[digest]; !ok { entry = pssCacheEntry{} } entry.expiresAt = time.Now().Add(p.cacheTTL) p.fwdCache[digest] = entry return nil } // check if message is in the cache func (p *Pss) checkFwdCache(msg *PssMsg) bool { p.fwdCacheMu.Lock() defer p.fwdCacheMu.Unlock() digest := p.digest(msg) entry, ok := p.fwdCache[digest] if ok { if entry.expiresAt.After(time.Now()) { log.Trace("unexpired cache", "digest", fmt.Sprintf("%x", digest)) metrics.GetOrRegisterCounter("pss.checkfwdcache.unexpired", nil).Inc(1) return true } metrics.GetOrRegisterCounter("pss.checkfwdcache.expired", nil).Inc(1) } return false } // Digest of message func (p *Pss) digest(msg *PssMsg) pssDigest { return p.digestBytes(msg.serialize()) } func (p *Pss) digestBytes(msg []byte) pssDigest { hasher := p.hashPool.Get().(hash.Hash) defer p.hashPool.Put(hasher) hasher.Reset() hasher.Write(msg) digest := pssDigest{} key := hasher.Sum(nil) copy(digest[:], key[:digestLength]) return digest } func validateAddress(addr PssAddress) error { if len(addr) > addressLength { return errors.New("address too long") } return nil }