1098 lines
		
	
	
		
			34 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			1098 lines
		
	
	
		
			34 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2018 The go-ethereum Authors
 | |
| // This file is part of the go-ethereum library.
 | |
| //
 | |
| // The go-ethereum library is free software: you can redistribute it and/or modify
 | |
| // it under the terms of the GNU Lesser General Public License as published by
 | |
| // the Free Software Foundation, either version 3 of the License, or
 | |
| // (at your option) any later version.
 | |
| //
 | |
| // The go-ethereum library is distributed in the hope that it will be useful,
 | |
| // but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 | |
| // GNU Lesser General Public License for more details.
 | |
| //
 | |
| // You should have received a copy of the GNU Lesser General Public License
 | |
| // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
 | |
| 
 | |
| package pss
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"context"
 | |
| 	"crypto/ecdsa"
 | |
| 	"crypto/rand"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"hash"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/ethereum/go-ethereum/common"
 | |
| 	"github.com/ethereum/go-ethereum/crypto"
 | |
| 	"github.com/ethereum/go-ethereum/metrics"
 | |
| 	"github.com/ethereum/go-ethereum/p2p"
 | |
| 	"github.com/ethereum/go-ethereum/p2p/enode"
 | |
| 	"github.com/ethereum/go-ethereum/p2p/protocols"
 | |
| 	"github.com/ethereum/go-ethereum/rpc"
 | |
| 	"github.com/ethereum/go-ethereum/swarm/log"
 | |
| 	"github.com/ethereum/go-ethereum/swarm/network"
 | |
| 	"github.com/ethereum/go-ethereum/swarm/pot"
 | |
| 	"github.com/ethereum/go-ethereum/swarm/storage"
 | |
| 	whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
 | |
| 	"golang.org/x/crypto/sha3"
 | |
| )
 | |
| 
 | |
// Defaults and protocol identifiers used throughout the pss package.
const (
	// DefaultMsgTTL is the message lifetime NewPssParams assigns to PssParams.MsgTTL.
	DefaultMsgTTL = 120 * time.Second

	// Cache, queue and housekeeping sizing.
	defaultDigestCacheTTL      = 10 * time.Second
	defaultCleanInterval       = 10 * time.Minute
	defaultSymKeyCacheCapacity = 512
	defaultOutboxCapacity      = 100000
	defaultMaxMsgSize          = 1024 * 1024
	defaultPaddingByteSize     = 16
	digestLength               = 32 // byte length of digest used for pss cache (currently same as swarm chunk hash)
	hasherCount                = 8

	// Parameters handed to the whisper envelope when encrypting.
	defaultWhisperWorkTime = 3
	defaultWhisperPoW      = 0.0000000001

	// Identity of the pss protocol on the p2p layer.
	pssProtocolName = "pss"
	pssVersion      = 2
)
 | |
| 
 | |
| var (
 | |
| 	addressLength = len(pot.Address{})
 | |
| )
 | |
| 
 | |
// pssCacheEntry is the value type of the forward cache (Pss.fwdCache).
//
// The cache is used for preventing backwards routing; it is also intended to
// be instrumental in the flood guard mechanism and mailbox implementation.
type pssCacheEntry struct {
	// expiresAt is the point in time after which the entry is stale.
	expiresAt time.Time
}
 | |
| 
 | |
| // abstraction to enable access to p2p.protocols.Peer.Send
 | |
| type senderPeer interface {
 | |
| 	Info() *p2p.PeerInfo
 | |
| 	ID() enode.ID
 | |
| 	Address() []byte
 | |
| 	Send(context.Context, interface{}) error
 | |
| }
 | |
| 
 | |
| // per-key peer related information
 | |
| // member `protected` prevents garbage collection of the instance
 | |
| type pssPeer struct {
 | |
| 	lastSeen  time.Time
 | |
| 	address   PssAddress
 | |
| 	protected bool
 | |
| }
 | |
| 
 | |
// PssParams carries the configuration parameters consumed by NewPss.
type PssParams struct {
	// MsgTTL is copied to Pss.msgTTL and used to compute message expiry.
	MsgTTL time.Duration
	// CacheTTL is copied to Pss.cacheTTL.
	CacheTTL time.Duration
	// privateKey is mandatory; set it through WithPrivateKey.
	privateKey *ecdsa.PrivateKey
	// SymKeyCacheCapacity sizes the symmetric key decryption cache.
	SymKeyCacheCapacity int
	// AllowRaw, if true, enables sending and receiving messages without builtin pss encryption.
	AllowRaw bool
}
 | |
| 
 | |
| // Sane defaults for Pss
 | |
| func NewPssParams() *PssParams {
 | |
| 	return &PssParams{
 | |
| 		MsgTTL:              DefaultMsgTTL,
 | |
| 		CacheTTL:            defaultDigestCacheTTL,
 | |
| 		SymKeyCacheCapacity: defaultSymKeyCacheCapacity,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (params *PssParams) WithPrivateKey(privatekey *ecdsa.PrivateKey) *PssParams {
 | |
| 	params.privateKey = privatekey
 | |
| 	return params
 | |
| }
 | |
| 
 | |
| // Toplevel pss object, takes care of message sending, receiving, decryption and encryption, message handler dispatchers and message forwarding.
 | |
| //
 | |
| // Implements node.Service
 | |
| type Pss struct {
 | |
| 	*network.Kademlia                   // we can get the Kademlia address from this
 | |
| 	privateKey        *ecdsa.PrivateKey // pss can have it's own independent key
 | |
| 	w                 *whisper.Whisper  // key and encryption backend
 | |
| 	auxAPIs           []rpc.API         // builtins (handshake, test) can add APIs
 | |
| 
 | |
| 	// sending and forwarding
 | |
| 	fwdPool         map[string]*protocols.Peer // keep track of all peers sitting on the pssmsg routing layer
 | |
| 	fwdPoolMu       sync.RWMutex
 | |
| 	fwdCache        map[pssDigest]pssCacheEntry // checksum of unique fields from pssmsg mapped to expiry, cache to determine whether to drop msg
 | |
| 	fwdCacheMu      sync.RWMutex
 | |
| 	cacheTTL        time.Duration // how long to keep messages in fwdCache (not implemented)
 | |
| 	msgTTL          time.Duration
 | |
| 	paddingByteSize int
 | |
| 	capstring       string
 | |
| 	outbox          chan *PssMsg
 | |
| 
 | |
| 	// keys and peers
 | |
| 	pubKeyPool                 map[string]map[Topic]*pssPeer // mapping of hex public keys to peer address by topic.
 | |
| 	pubKeyPoolMu               sync.RWMutex
 | |
| 	symKeyPool                 map[string]map[Topic]*pssPeer // mapping of symkeyids to peer address by topic.
 | |
| 	symKeyPoolMu               sync.RWMutex
 | |
| 	symKeyDecryptCache         []*string // fast lookup of symkeys recently used for decryption; last used is on top of stack
 | |
| 	symKeyDecryptCacheCursor   int       // modular cursor pointing to last used, wraps on symKeyDecryptCache array
 | |
| 	symKeyDecryptCacheCapacity int       // max amount of symkeys to keep.
 | |
| 
 | |
| 	// message handling
 | |
| 	handlers           map[Topic]map[*handler]bool // topic and version based pss payload handlers. See pss.Handle()
 | |
| 	handlersMu         sync.RWMutex
 | |
| 	hashPool           sync.Pool
 | |
| 	topicHandlerCaps   map[Topic]*handlerCaps // caches capabilities of each topic's handlers
 | |
| 	topicHandlerCapsMu sync.RWMutex
 | |
| 
 | |
| 	// process
 | |
| 	quitC chan struct{}
 | |
| }
 | |
| 
 | |
| func (p *Pss) String() string {
 | |
| 	return fmt.Sprintf("pss: addr %x, pubkey %v", p.BaseAddr(), common.ToHex(crypto.FromECDSAPub(&p.privateKey.PublicKey)))
 | |
| }
 | |
| 
 | |
| // Creates a new Pss instance.
 | |
| //
 | |
| // In addition to params, it takes a swarm network Kademlia
 | |
| // and a FileStore storage for message cache storage.
 | |
| func NewPss(k *network.Kademlia, params *PssParams) (*Pss, error) {
 | |
| 	if params.privateKey == nil {
 | |
| 		return nil, errors.New("missing private key for pss")
 | |
| 	}
 | |
| 	cap := p2p.Cap{
 | |
| 		Name:    pssProtocolName,
 | |
| 		Version: pssVersion,
 | |
| 	}
 | |
| 	ps := &Pss{
 | |
| 		Kademlia:   k,
 | |
| 		privateKey: params.privateKey,
 | |
| 		w:          whisper.New(&whisper.DefaultConfig),
 | |
| 		quitC:      make(chan struct{}),
 | |
| 
 | |
| 		fwdPool:         make(map[string]*protocols.Peer),
 | |
| 		fwdCache:        make(map[pssDigest]pssCacheEntry),
 | |
| 		cacheTTL:        params.CacheTTL,
 | |
| 		msgTTL:          params.MsgTTL,
 | |
| 		paddingByteSize: defaultPaddingByteSize,
 | |
| 		capstring:       cap.String(),
 | |
| 		outbox:          make(chan *PssMsg, defaultOutboxCapacity),
 | |
| 
 | |
| 		pubKeyPool:                 make(map[string]map[Topic]*pssPeer),
 | |
| 		symKeyPool:                 make(map[string]map[Topic]*pssPeer),
 | |
| 		symKeyDecryptCache:         make([]*string, params.SymKeyCacheCapacity),
 | |
| 		symKeyDecryptCacheCapacity: params.SymKeyCacheCapacity,
 | |
| 
 | |
| 		handlers:         make(map[Topic]map[*handler]bool),
 | |
| 		topicHandlerCaps: make(map[Topic]*handlerCaps),
 | |
| 
 | |
| 		hashPool: sync.Pool{
 | |
| 			New: func() interface{} {
 | |
| 				return sha3.NewLegacyKeccak256()
 | |
| 			},
 | |
| 		},
 | |
| 	}
 | |
| 
 | |
| 	for i := 0; i < hasherCount; i++ {
 | |
| 		hashfunc := storage.MakeHashFunc(storage.DefaultHash)()
 | |
| 		ps.hashPool.Put(hashfunc)
 | |
| 	}
 | |
| 
 | |
| 	return ps, nil
 | |
| }
 | |
| 
 | |
| /////////////////////////////////////////////////////////////////////
 | |
| // SECTION: node.Service interface
 | |
| /////////////////////////////////////////////////////////////////////
 | |
| 
 | |
| func (p *Pss) Start(srv *p2p.Server) error {
 | |
| 	go func() {
 | |
| 		ticker := time.NewTicker(defaultCleanInterval)
 | |
| 		cacheTicker := time.NewTicker(p.cacheTTL)
 | |
| 		defer ticker.Stop()
 | |
| 		defer cacheTicker.Stop()
 | |
| 		for {
 | |
| 			select {
 | |
| 			case <-cacheTicker.C:
 | |
| 				p.cleanFwdCache()
 | |
| 			case <-ticker.C:
 | |
| 				p.cleanKeys()
 | |
| 			case <-p.quitC:
 | |
| 				return
 | |
| 			}
 | |
| 		}
 | |
| 	}()
 | |
| 	go func() {
 | |
| 		for {
 | |
| 			select {
 | |
| 			case msg := <-p.outbox:
 | |
| 				err := p.forward(msg)
 | |
| 				if err != nil {
 | |
| 					log.Error(err.Error())
 | |
| 					metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1)
 | |
| 				}
 | |
| 			case <-p.quitC:
 | |
| 				return
 | |
| 			}
 | |
| 		}
 | |
| 	}()
 | |
| 	log.Info("Started Pss")
 | |
| 	log.Info("Loaded EC keys", "pubkey", common.ToHex(crypto.FromECDSAPub(p.PublicKey())), "secp256", common.ToHex(crypto.CompressPubkey(p.PublicKey())))
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (p *Pss) Stop() error {
 | |
| 	log.Info("Pss shutting down")
 | |
| 	close(p.quitC)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| var pssSpec = &protocols.Spec{
 | |
| 	Name:       pssProtocolName,
 | |
| 	Version:    pssVersion,
 | |
| 	MaxMsgSize: defaultMaxMsgSize,
 | |
| 	Messages: []interface{}{
 | |
| 		PssMsg{},
 | |
| 	},
 | |
| }
 | |
| 
 | |
| func (p *Pss) Protocols() []p2p.Protocol {
 | |
| 	return []p2p.Protocol{
 | |
| 		{
 | |
| 			Name:    pssSpec.Name,
 | |
| 			Version: pssSpec.Version,
 | |
| 			Length:  pssSpec.Length(),
 | |
| 			Run:     p.Run,
 | |
| 		},
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (p *Pss) Run(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
 | |
| 	pp := protocols.NewPeer(peer, rw, pssSpec)
 | |
| 	p.fwdPoolMu.Lock()
 | |
| 	p.fwdPool[peer.Info().ID] = pp
 | |
| 	p.fwdPoolMu.Unlock()
 | |
| 	return pp.Run(p.handlePssMsg)
 | |
| }
 | |
| 
 | |
| func (p *Pss) APIs() []rpc.API {
 | |
| 	apis := []rpc.API{
 | |
| 		{
 | |
| 			Namespace: "pss",
 | |
| 			Version:   "1.0",
 | |
| 			Service:   NewAPI(p),
 | |
| 			Public:    true,
 | |
| 		},
 | |
| 	}
 | |
| 	apis = append(apis, p.auxAPIs...)
 | |
| 	return apis
 | |
| }
 | |
| 
 | |
| // add API methods to the pss API
 | |
| // must be run before node is started
 | |
| func (p *Pss) addAPI(api rpc.API) {
 | |
| 	p.auxAPIs = append(p.auxAPIs, api)
 | |
| }
 | |
| 
 | |
| // Returns the swarm Kademlia address of the pss node
 | |
| func (p *Pss) BaseAddr() []byte {
 | |
| 	return p.Kademlia.BaseAddr()
 | |
| }
 | |
| 
 | |
| // Returns the pss node's public key
 | |
| func (p *Pss) PublicKey() *ecdsa.PublicKey {
 | |
| 	return &p.privateKey.PublicKey
 | |
| }
 | |
| 
 | |
| /////////////////////////////////////////////////////////////////////
 | |
| // SECTION: Message handling
 | |
| /////////////////////////////////////////////////////////////////////
 | |
| 
 | |
| func (p *Pss) getTopicHandlerCaps(topic Topic) (hc *handlerCaps, found bool) {
 | |
| 	p.topicHandlerCapsMu.RLock()
 | |
| 	defer p.topicHandlerCapsMu.RUnlock()
 | |
| 	hc, found = p.topicHandlerCaps[topic]
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (p *Pss) setTopicHandlerCaps(topic Topic, hc *handlerCaps) {
 | |
| 	p.topicHandlerCapsMu.Lock()
 | |
| 	defer p.topicHandlerCapsMu.Unlock()
 | |
| 	p.topicHandlerCaps[topic] = hc
 | |
| }
 | |
| 
 | |
| // Links a handler function to a Topic
 | |
| //
 | |
| // All incoming messages with an envelope Topic matching the
 | |
| // topic specified will be passed to the given Handler function.
 | |
| //
 | |
| // There may be an arbitrary number of handler functions per topic.
 | |
| //
 | |
| // Returns a deregister function which needs to be called to
 | |
| // deregister the handler,
 | |
| func (p *Pss) Register(topic *Topic, hndlr *handler) func() {
 | |
| 	p.handlersMu.Lock()
 | |
| 	defer p.handlersMu.Unlock()
 | |
| 	handlers := p.handlers[*topic]
 | |
| 	if handlers == nil {
 | |
| 		handlers = make(map[*handler]bool)
 | |
| 		p.handlers[*topic] = handlers
 | |
| 		log.Debug("registered handler", "capabilities", hndlr.caps)
 | |
| 	}
 | |
| 	if hndlr.caps == nil {
 | |
| 		hndlr.caps = &handlerCaps{}
 | |
| 	}
 | |
| 	handlers[hndlr] = true
 | |
| 
 | |
| 	capabilities, ok := p.getTopicHandlerCaps(*topic)
 | |
| 	if !ok {
 | |
| 		capabilities = &handlerCaps{}
 | |
| 		p.setTopicHandlerCaps(*topic, capabilities)
 | |
| 	}
 | |
| 
 | |
| 	if hndlr.caps.raw {
 | |
| 		capabilities.raw = true
 | |
| 	}
 | |
| 	if hndlr.caps.prox {
 | |
| 		capabilities.prox = true
 | |
| 	}
 | |
| 	return func() { p.deregister(topic, hndlr) }
 | |
| }
 | |
| 
 | |
| func (p *Pss) deregister(topic *Topic, hndlr *handler) {
 | |
| 	p.handlersMu.Lock()
 | |
| 	defer p.handlersMu.Unlock()
 | |
| 	handlers := p.handlers[*topic]
 | |
| 	if len(handlers) > 1 {
 | |
| 		delete(p.handlers, *topic)
 | |
| 		// topic caps might have changed now that a handler is gone
 | |
| 		caps := &handlerCaps{}
 | |
| 		for h := range handlers {
 | |
| 			if h.caps.raw {
 | |
| 				caps.raw = true
 | |
| 			}
 | |
| 			if h.caps.prox {
 | |
| 				caps.prox = true
 | |
| 			}
 | |
| 		}
 | |
| 		p.setTopicHandlerCaps(*topic, caps)
 | |
| 		return
 | |
| 	}
 | |
| 	delete(handlers, hndlr)
 | |
| }
 | |
| 
 | |
| // Filters incoming messages for processing or forwarding.
 | |
| // Check if address partially matches
 | |
| // If yes, it CAN be for us, and we process it
 | |
| // Only passes error to pss protocol handler if payload is not valid pssmsg
 | |
| func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error {
 | |
| 	metrics.GetOrRegisterCounter("pss.handlepssmsg", nil).Inc(1)
 | |
| 	pssmsg, ok := msg.(*PssMsg)
 | |
| 	if !ok {
 | |
| 		return fmt.Errorf("invalid message type. Expected *PssMsg, got %T ", msg)
 | |
| 	}
 | |
| 	log.Trace("handler", "self", label(p.Kademlia.BaseAddr()), "topic", label(pssmsg.Payload.Topic[:]))
 | |
| 	if int64(pssmsg.Expire) < time.Now().Unix() {
 | |
| 		metrics.GetOrRegisterCounter("pss.expire", nil).Inc(1)
 | |
| 		log.Warn("pss filtered expired message", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", common.ToHex(pssmsg.To))
 | |
| 		return nil
 | |
| 	}
 | |
| 	if p.checkFwdCache(pssmsg) {
 | |
| 		log.Trace("pss relay block-cache match (process)", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", (common.ToHex(pssmsg.To)))
 | |
| 		return nil
 | |
| 	}
 | |
| 	p.addFwdCache(pssmsg)
 | |
| 
 | |
| 	psstopic := Topic(pssmsg.Payload.Topic)
 | |
| 
 | |
| 	// raw is simplest handler contingency to check, so check that first
 | |
| 	var isRaw bool
 | |
| 	if pssmsg.isRaw() {
 | |
| 		if capabilities, ok := p.getTopicHandlerCaps(psstopic); ok {
 | |
| 			if !capabilities.raw {
 | |
| 				log.Debug("No handler for raw message", "topic", psstopic)
 | |
| 				return nil
 | |
| 			}
 | |
| 		}
 | |
| 		isRaw = true
 | |
| 	}
 | |
| 
 | |
| 	// check if we can be recipient:
 | |
| 	// - no prox handler on message and partial address matches
 | |
| 	// - prox handler on message and we are in prox regardless of partial address match
 | |
| 	// store this result so we don't calculate again on every handler
 | |
| 	var isProx bool
 | |
| 	if capabilities, ok := p.getTopicHandlerCaps(psstopic); ok {
 | |
| 		isProx = capabilities.prox
 | |
| 	}
 | |
| 	isRecipient := p.isSelfPossibleRecipient(pssmsg, isProx)
 | |
| 	if !isRecipient {
 | |
| 		log.Trace("pss was for someone else :'( ... forwarding", "pss", common.ToHex(p.BaseAddr()), "prox", isProx)
 | |
| 		return p.enqueue(pssmsg)
 | |
| 	}
 | |
| 
 | |
| 	log.Trace("pss for us, yay! ... let's process!", "pss", common.ToHex(p.BaseAddr()), "prox", isProx, "raw", isRaw, "topic", label(pssmsg.Payload.Topic[:]))
 | |
| 	if err := p.process(pssmsg, isRaw, isProx); err != nil {
 | |
| 		qerr := p.enqueue(pssmsg)
 | |
| 		if qerr != nil {
 | |
| 			return fmt.Errorf("process fail: processerr %v, queueerr: %v", err, qerr)
 | |
| 		}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Entry point to processing a message for which the current node can be the intended recipient.
 | |
| // Attempts symmetric and asymmetric decryption with stored keys.
 | |
| // Dispatches message to all handlers matching the message topic
 | |
| func (p *Pss) process(pssmsg *PssMsg, raw bool, prox bool) error {
 | |
| 	metrics.GetOrRegisterCounter("pss.process", nil).Inc(1)
 | |
| 
 | |
| 	var err error
 | |
| 	var recvmsg *whisper.ReceivedMessage
 | |
| 	var payload []byte
 | |
| 	var from PssAddress
 | |
| 	var asymmetric bool
 | |
| 	var keyid string
 | |
| 	var keyFunc func(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error)
 | |
| 
 | |
| 	envelope := pssmsg.Payload
 | |
| 	psstopic := Topic(envelope.Topic)
 | |
| 
 | |
| 	if raw {
 | |
| 		payload = pssmsg.Payload.Data
 | |
| 	} else {
 | |
| 		if pssmsg.isSym() {
 | |
| 			keyFunc = p.processSym
 | |
| 		} else {
 | |
| 			asymmetric = true
 | |
| 			keyFunc = p.processAsym
 | |
| 		}
 | |
| 
 | |
| 		recvmsg, keyid, from, err = keyFunc(envelope)
 | |
| 		if err != nil {
 | |
| 			return errors.New("Decryption failed")
 | |
| 		}
 | |
| 		payload = recvmsg.Payload
 | |
| 	}
 | |
| 
 | |
| 	if len(pssmsg.To) < addressLength {
 | |
| 		if err := p.enqueue(pssmsg); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 	p.executeHandlers(psstopic, payload, from, raw, prox, asymmetric, keyid)
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // copy all registered handlers for respective topic in order to avoid data race or deadlock
 | |
| func (p *Pss) getHandlers(topic Topic) (ret []*handler) {
 | |
| 	p.handlersMu.RLock()
 | |
| 	defer p.handlersMu.RUnlock()
 | |
| 	for k := range p.handlers[topic] {
 | |
| 		ret = append(ret, k)
 | |
| 	}
 | |
| 	return ret
 | |
| }
 | |
| 
 | |
| func (p *Pss) executeHandlers(topic Topic, payload []byte, from PssAddress, raw bool, prox bool, asymmetric bool, keyid string) {
 | |
| 	handlers := p.getHandlers(topic)
 | |
| 	peer := p2p.NewPeer(enode.ID{}, fmt.Sprintf("%x", from), []p2p.Cap{})
 | |
| 	for _, h := range handlers {
 | |
| 		if !h.caps.raw && raw {
 | |
| 			log.Warn("norawhandler")
 | |
| 			continue
 | |
| 		}
 | |
| 		if !h.caps.prox && prox {
 | |
| 			log.Warn("noproxhandler")
 | |
| 			continue
 | |
| 		}
 | |
| 		err := (h.f)(payload, peer, asymmetric, keyid)
 | |
| 		if err != nil {
 | |
| 			log.Warn("Pss handler failed", "err", err)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // will return false if using partial address
 | |
| func (p *Pss) isSelfRecipient(msg *PssMsg) bool {
 | |
| 	return bytes.Equal(msg.To, p.Kademlia.BaseAddr())
 | |
| }
 | |
| 
 | |
| // test match of leftmost bytes in given message to node's Kademlia address
 | |
| func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool {
 | |
| 	local := p.Kademlia.BaseAddr()
 | |
| 
 | |
| 	// if a partial address matches we are possible recipient regardless of prox
 | |
| 	// if not and prox is not set, we are surely not
 | |
| 	if bytes.Equal(msg.To, local[:len(msg.To)]) {
 | |
| 
 | |
| 		return true
 | |
| 	} else if !prox {
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	depth := p.Kademlia.NeighbourhoodDepth()
 | |
| 	po, _ := network.Pof(p.Kademlia.BaseAddr(), msg.To, 0)
 | |
| 	log.Trace("selfpossible", "po", po, "depth", depth)
 | |
| 
 | |
| 	return depth <= po
 | |
| }
 | |
| 
 | |
| /////////////////////////////////////////////////////////////////////
 | |
| // SECTION: Encryption
 | |
| /////////////////////////////////////////////////////////////////////
 | |
| 
 | |
| // Links a peer ECDSA public key to a topic
 | |
| //
 | |
| // This is required for asymmetric message exchange
 | |
| // on the given topic
 | |
| //
 | |
| // The value in `address` will be used as a routing hint for the
 | |
| // public key / topic association
 | |
| func (p *Pss) SetPeerPublicKey(pubkey *ecdsa.PublicKey, topic Topic, address PssAddress) error {
 | |
| 	if err := validateAddress(address); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	pubkeybytes := crypto.FromECDSAPub(pubkey)
 | |
| 	if len(pubkeybytes) == 0 {
 | |
| 		return fmt.Errorf("invalid public key: %v", pubkey)
 | |
| 	}
 | |
| 	pubkeyid := common.ToHex(pubkeybytes)
 | |
| 	psp := &pssPeer{
 | |
| 		address: address,
 | |
| 	}
 | |
| 	p.pubKeyPoolMu.Lock()
 | |
| 	if _, ok := p.pubKeyPool[pubkeyid]; !ok {
 | |
| 		p.pubKeyPool[pubkeyid] = make(map[Topic]*pssPeer)
 | |
| 	}
 | |
| 	p.pubKeyPool[pubkeyid][topic] = psp
 | |
| 	p.pubKeyPoolMu.Unlock()
 | |
| 	log.Trace("added pubkey", "pubkeyid", pubkeyid, "topic", topic, "address", address)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Automatically generate a new symkey for a topic and address hint
 | |
| func (p *Pss) GenerateSymmetricKey(topic Topic, address PssAddress, addToCache bool) (string, error) {
 | |
| 	keyid, err := p.w.GenerateSymKey()
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 	p.addSymmetricKeyToPool(keyid, topic, address, addToCache, false)
 | |
| 	return keyid, nil
 | |
| }
 | |
| 
 | |
| // Links a peer symmetric key (arbitrary byte sequence) to a topic
 | |
| //
 | |
| // This is required for symmetrically encrypted message exchange
 | |
| // on the given topic
 | |
| //
 | |
| // The key is stored in the whisper backend.
 | |
| //
 | |
| // If addtocache is set to true, the key will be added to the cache of keys
 | |
| // used to attempt symmetric decryption of incoming messages.
 | |
| //
 | |
| // Returns a string id that can be used to retrieve the key bytes
 | |
| // from the whisper backend (see pss.GetSymmetricKey())
 | |
| func (p *Pss) SetSymmetricKey(key []byte, topic Topic, address PssAddress, addtocache bool) (string, error) {
 | |
| 	if err := validateAddress(address); err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 	return p.setSymmetricKey(key, topic, address, addtocache, true)
 | |
| }
 | |
| 
 | |
| func (p *Pss) setSymmetricKey(key []byte, topic Topic, address PssAddress, addtocache bool, protected bool) (string, error) {
 | |
| 	keyid, err := p.w.AddSymKeyDirect(key)
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 	p.addSymmetricKeyToPool(keyid, topic, address, addtocache, protected)
 | |
| 	return keyid, nil
 | |
| }
 | |
| 
 | |
| // adds a symmetric key to the pss key pool, and optionally adds the key
 | |
| // to the collection of keys used to attempt symmetric decryption of
 | |
| // incoming messages
 | |
| func (p *Pss) addSymmetricKeyToPool(keyid string, topic Topic, address PssAddress, addtocache bool, protected bool) {
 | |
| 	psp := &pssPeer{
 | |
| 		address:   address,
 | |
| 		protected: protected,
 | |
| 	}
 | |
| 	p.symKeyPoolMu.Lock()
 | |
| 	if _, ok := p.symKeyPool[keyid]; !ok {
 | |
| 		p.symKeyPool[keyid] = make(map[Topic]*pssPeer)
 | |
| 	}
 | |
| 	p.symKeyPool[keyid][topic] = psp
 | |
| 	p.symKeyPoolMu.Unlock()
 | |
| 	if addtocache {
 | |
| 		p.symKeyDecryptCacheCursor++
 | |
| 		p.symKeyDecryptCache[p.symKeyDecryptCacheCursor%cap(p.symKeyDecryptCache)] = &keyid
 | |
| 	}
 | |
| 	key, _ := p.GetSymmetricKey(keyid)
 | |
| 	log.Trace("added symkey", "symkeyid", keyid, "symkey", common.ToHex(key), "topic", topic, "address", address, "cache", addtocache)
 | |
| }
 | |
| 
 | |
| // Returns a symmetric key byte seqyence stored in the whisper backend
 | |
| // by its unique id
 | |
| //
 | |
| // Passes on the error value from the whisper backend
 | |
| func (p *Pss) GetSymmetricKey(symkeyid string) ([]byte, error) {
 | |
| 	symkey, err := p.w.GetSymKey(symkeyid)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return symkey, nil
 | |
| }
 | |
| 
 | |
| // Returns all recorded topic and address combination for a specific public key
 | |
| func (p *Pss) GetPublickeyPeers(keyid string) (topic []Topic, address []PssAddress, err error) {
 | |
| 	p.pubKeyPoolMu.RLock()
 | |
| 	defer p.pubKeyPoolMu.RUnlock()
 | |
| 	for t, peer := range p.pubKeyPool[keyid] {
 | |
| 		topic = append(topic, t)
 | |
| 		address = append(address, peer.address)
 | |
| 	}
 | |
| 
 | |
| 	return topic, address, nil
 | |
| }
 | |
| 
 | |
| func (p *Pss) getPeerAddress(keyid string, topic Topic) (PssAddress, error) {
 | |
| 	p.pubKeyPoolMu.RLock()
 | |
| 	defer p.pubKeyPoolMu.RUnlock()
 | |
| 	if peers, ok := p.pubKeyPool[keyid]; ok {
 | |
| 		if t, ok := peers[topic]; ok {
 | |
| 			return t.address, nil
 | |
| 		}
 | |
| 	}
 | |
| 	return nil, fmt.Errorf("peer with pubkey %s, topic %x not found", keyid, topic)
 | |
| }
 | |
| 
 | |
| // Attempt to decrypt, validate and unpack a
 | |
| // symmetrically encrypted message
 | |
| // If successful, returns the unpacked whisper ReceivedMessage struct
 | |
| // encapsulating the decrypted message, and the whisper backend id
 | |
| // of the symmetric key used to decrypt the message.
 | |
| // It fails if decryption of the message fails or if the message is corrupted
 | |
| func (p *Pss) processSym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error) {
 | |
| 	metrics.GetOrRegisterCounter("pss.process.sym", nil).Inc(1)
 | |
| 
 | |
| 	for i := p.symKeyDecryptCacheCursor; i > p.symKeyDecryptCacheCursor-cap(p.symKeyDecryptCache) && i > 0; i-- {
 | |
| 		symkeyid := p.symKeyDecryptCache[i%cap(p.symKeyDecryptCache)]
 | |
| 		symkey, err := p.w.GetSymKey(*symkeyid)
 | |
| 		if err != nil {
 | |
| 			continue
 | |
| 		}
 | |
| 		recvmsg, err := envelope.OpenSymmetric(symkey)
 | |
| 		if err != nil {
 | |
| 			continue
 | |
| 		}
 | |
| 		if !recvmsg.ValidateAndParse() {
 | |
| 			return nil, "", nil, fmt.Errorf("symmetrically encrypted message has invalid signature or is corrupt")
 | |
| 		}
 | |
| 		p.symKeyPoolMu.Lock()
 | |
| 		from := p.symKeyPool[*symkeyid][Topic(envelope.Topic)].address
 | |
| 		p.symKeyPoolMu.Unlock()
 | |
| 		p.symKeyDecryptCacheCursor++
 | |
| 		p.symKeyDecryptCache[p.symKeyDecryptCacheCursor%cap(p.symKeyDecryptCache)] = symkeyid
 | |
| 		return recvmsg, *symkeyid, from, nil
 | |
| 	}
 | |
| 	return nil, "", nil, fmt.Errorf("could not decrypt message")
 | |
| }
 | |
| 
 | |
| // Attempt to decrypt, validate and unpack an
 | |
| // asymmetrically encrypted message
 | |
| // If successful, returns the unpacked whisper ReceivedMessage struct
 | |
| // encapsulating the decrypted message, and the byte representation of
 | |
| // the public key used to decrypt the message.
 | |
| // It fails if decryption of message fails, or if the message is corrupted
 | |
| func (p *Pss) processAsym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error) {
 | |
| 	metrics.GetOrRegisterCounter("pss.process.asym", nil).Inc(1)
 | |
| 
 | |
| 	recvmsg, err := envelope.OpenAsymmetric(p.privateKey)
 | |
| 	if err != nil {
 | |
| 		return nil, "", nil, fmt.Errorf("could not decrypt message: %s", err)
 | |
| 	}
 | |
| 	// check signature (if signed), strip padding
 | |
| 	if !recvmsg.ValidateAndParse() {
 | |
| 		return nil, "", nil, fmt.Errorf("invalid message")
 | |
| 	}
 | |
| 	pubkeyid := common.ToHex(crypto.FromECDSAPub(recvmsg.Src))
 | |
| 	var from PssAddress
 | |
| 	p.pubKeyPoolMu.Lock()
 | |
| 	if p.pubKeyPool[pubkeyid][Topic(envelope.Topic)] != nil {
 | |
| 		from = p.pubKeyPool[pubkeyid][Topic(envelope.Topic)].address
 | |
| 	}
 | |
| 	p.pubKeyPoolMu.Unlock()
 | |
| 	return recvmsg, pubkeyid, from, nil
 | |
| }
 | |
| 
 | |
| // Symkey garbage collection
 | |
| // a key is removed if:
 | |
| // - it is not marked as protected
 | |
| // - it is not in the incoming decryption cache
 | |
| func (p *Pss) cleanKeys() (count int) {
 | |
| 	for keyid, peertopics := range p.symKeyPool {
 | |
| 		var expiredtopics []Topic
 | |
| 		for topic, psp := range peertopics {
 | |
| 			if psp.protected {
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			var match bool
 | |
| 			for i := p.symKeyDecryptCacheCursor; i > p.symKeyDecryptCacheCursor-cap(p.symKeyDecryptCache) && i > 0; i-- {
 | |
| 				cacheid := p.symKeyDecryptCache[i%cap(p.symKeyDecryptCache)]
 | |
| 				if *cacheid == keyid {
 | |
| 					match = true
 | |
| 				}
 | |
| 			}
 | |
| 			if !match {
 | |
| 				expiredtopics = append(expiredtopics, topic)
 | |
| 			}
 | |
| 		}
 | |
| 		for _, topic := range expiredtopics {
 | |
| 			p.symKeyPoolMu.Lock()
 | |
| 			delete(p.symKeyPool[keyid], topic)
 | |
| 			log.Trace("symkey cleanup deletion", "symkeyid", keyid, "topic", topic, "val", p.symKeyPool[keyid])
 | |
| 			p.symKeyPoolMu.Unlock()
 | |
| 			count++
 | |
| 		}
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| /////////////////////////////////////////////////////////////////////
 | |
| // SECTION: Message sending
 | |
| /////////////////////////////////////////////////////////////////////
 | |
| 
 | |
| func (p *Pss) enqueue(msg *PssMsg) error {
 | |
| 	select {
 | |
| 	case p.outbox <- msg:
 | |
| 		return nil
 | |
| 	default:
 | |
| 	}
 | |
| 
 | |
| 	metrics.GetOrRegisterCounter("pss.enqueue.outbox.full", nil).Inc(1)
 | |
| 	return errors.New("outbox full")
 | |
| }
 | |
| 
 | |
| // Send a raw message (any encryption is responsibility of calling client)
 | |
| //
 | |
| // Will fail if raw messages are disallowed
 | |
| func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error {
 | |
| 	if err := validateAddress(address); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	pssMsgParams := &msgParams{
 | |
| 		raw: true,
 | |
| 	}
 | |
| 	payload := &whisper.Envelope{
 | |
| 		Data:  msg,
 | |
| 		Topic: whisper.TopicType(topic),
 | |
| 	}
 | |
| 	pssMsg := newPssMsg(pssMsgParams)
 | |
| 	pssMsg.To = address
 | |
| 	pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix())
 | |
| 	pssMsg.Payload = payload
 | |
| 	p.addFwdCache(pssMsg)
 | |
| 	err := p.enqueue(pssMsg)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// if we have a proxhandler on this topic
 | |
| 	// also deliver message to ourselves
 | |
| 	if capabilities, ok := p.getTopicHandlerCaps(topic); ok {
 | |
| 		if p.isSelfPossibleRecipient(pssMsg, true) && capabilities.prox {
 | |
| 			return p.process(pssMsg, true, true)
 | |
| 		}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Send a message using symmetric encryption
 | |
| //
 | |
| // Fails if the key id does not match any of the stored symmetric keys
 | |
| func (p *Pss) SendSym(symkeyid string, topic Topic, msg []byte) error {
 | |
| 	symkey, err := p.GetSymmetricKey(symkeyid)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("missing valid send symkey %s: %v", symkeyid, err)
 | |
| 	}
 | |
| 	p.symKeyPoolMu.Lock()
 | |
| 	psp, ok := p.symKeyPool[symkeyid][topic]
 | |
| 	p.symKeyPoolMu.Unlock()
 | |
| 	if !ok {
 | |
| 		return fmt.Errorf("invalid topic '%s' for symkey '%s'", topic.String(), symkeyid)
 | |
| 	}
 | |
| 	return p.send(psp.address, topic, msg, false, symkey)
 | |
| }
 | |
| 
 | |
| // Send a message using asymmetric encryption
 | |
| //
 | |
| // Fails if the key id does not match any in of the stored public keys
 | |
| func (p *Pss) SendAsym(pubkeyid string, topic Topic, msg []byte) error {
 | |
| 	if _, err := crypto.UnmarshalPubkey(common.FromHex(pubkeyid)); err != nil {
 | |
| 		return fmt.Errorf("Cannot unmarshal pubkey: %x", pubkeyid)
 | |
| 	}
 | |
| 	p.pubKeyPoolMu.Lock()
 | |
| 	psp, ok := p.pubKeyPool[pubkeyid][topic]
 | |
| 	p.pubKeyPoolMu.Unlock()
 | |
| 	if !ok {
 | |
| 		return fmt.Errorf("invalid topic '%s' for pubkey '%s'", topic.String(), pubkeyid)
 | |
| 	}
 | |
| 	return p.send(psp.address, topic, msg, true, common.FromHex(pubkeyid))
 | |
| }
 | |
| 
 | |
| // Send is payload agnostic, and will accept any byte slice as payload
 | |
| // It generates an whisper envelope for the specified recipient and topic,
 | |
| // and wraps the message payload in it.
 | |
| // TODO: Implement proper message padding
 | |
| func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []byte) error {
 | |
| 	metrics.GetOrRegisterCounter("pss.send", nil).Inc(1)
 | |
| 
 | |
| 	if key == nil || bytes.Equal(key, []byte{}) {
 | |
| 		return fmt.Errorf("Zero length key passed to pss send")
 | |
| 	}
 | |
| 	padding := make([]byte, p.paddingByteSize)
 | |
| 	c, err := rand.Read(padding)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	} else if c < p.paddingByteSize {
 | |
| 		return fmt.Errorf("invalid padding length: %d", c)
 | |
| 	}
 | |
| 	wparams := &whisper.MessageParams{
 | |
| 		TTL:      defaultWhisperTTL,
 | |
| 		Src:      p.privateKey,
 | |
| 		Topic:    whisper.TopicType(topic),
 | |
| 		WorkTime: defaultWhisperWorkTime,
 | |
| 		PoW:      defaultWhisperPoW,
 | |
| 		Payload:  msg,
 | |
| 		Padding:  padding,
 | |
| 	}
 | |
| 	if asymmetric {
 | |
| 		pk, err := crypto.UnmarshalPubkey(key)
 | |
| 		if err != nil {
 | |
| 			return fmt.Errorf("Cannot unmarshal pubkey: %x", key)
 | |
| 		}
 | |
| 		wparams.Dst = pk
 | |
| 	} else {
 | |
| 		wparams.KeySym = key
 | |
| 	}
 | |
| 	// set up outgoing message container, which does encryption and envelope wrapping
 | |
| 	woutmsg, err := whisper.NewSentMessage(wparams)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to generate whisper message encapsulation: %v", err)
 | |
| 	}
 | |
| 	// performs encryption.
 | |
| 	// Does NOT perform / performs negligible PoW due to very low difficulty setting
 | |
| 	// after this the message is ready for sending
 | |
| 	envelope, err := woutmsg.Wrap(wparams)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to perform whisper encryption: %v", err)
 | |
| 	}
 | |
| 	log.Trace("pssmsg whisper done", "env", envelope, "wparams payload", common.ToHex(wparams.Payload), "to", common.ToHex(to), "asym", asymmetric, "key", common.ToHex(key))
 | |
| 
 | |
| 	// prepare for devp2p transport
 | |
| 	pssMsgParams := &msgParams{
 | |
| 		sym: !asymmetric,
 | |
| 	}
 | |
| 	pssMsg := newPssMsg(pssMsgParams)
 | |
| 	pssMsg.To = to
 | |
| 	pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix())
 | |
| 	pssMsg.Payload = envelope
 | |
| 	err = p.enqueue(pssMsg)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if capabilities, ok := p.getTopicHandlerCaps(topic); ok {
 | |
| 		if p.isSelfPossibleRecipient(pssMsg, true) && capabilities.prox {
 | |
| 			return p.process(pssMsg, true, true)
 | |
| 		}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // sendFunc is a helper function that tries to send a message and returns true on success.
 | |
| // It is set here for usage in production, and optionally overridden in tests.
 | |
| var sendFunc func(p *Pss, sp *network.Peer, msg *PssMsg) bool = sendMsg
 | |
| 
 | |
| // tries to send a message, returns true if successful
 | |
| func sendMsg(p *Pss, sp *network.Peer, msg *PssMsg) bool {
 | |
| 	var isPssEnabled bool
 | |
| 	info := sp.Info()
 | |
| 	for _, capability := range info.Caps {
 | |
| 		if capability == p.capstring {
 | |
| 			isPssEnabled = true
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| 	if !isPssEnabled {
 | |
| 		log.Error("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	// get the protocol peer from the forwarding peer cache
 | |
| 	p.fwdPoolMu.RLock()
 | |
| 	pp := p.fwdPool[sp.Info().ID]
 | |
| 	p.fwdPoolMu.RUnlock()
 | |
| 
 | |
| 	err := pp.Send(context.TODO(), msg)
 | |
| 	if err != nil {
 | |
| 		metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
 | |
| 		log.Error(err.Error())
 | |
| 	}
 | |
| 
 | |
| 	return err == nil
 | |
| }
 | |
| 
 | |
| // Forwards a pss message to the peer(s) based on recipient address according to the algorithm
 | |
| // described below. The recipient address can be of any length, and the byte slice will be matched
 | |
| // to the MSB slice of the peer address of the equivalent length.
 | |
| //
 | |
| // If the recipient address (or partial address) is within the neighbourhood depth of the forwarding
 | |
| // node, then it will be forwarded to all the nearest neighbours of the forwarding node. In case of
 | |
| // partial address, it should be forwarded to all the peers matching the partial address, if there
 | |
| // are any; otherwise only to one peer, closest to the recipient address. In any case, if the message
 | |
| // forwarding fails, the node should try to forward it to the next best peer, until the message is
 | |
| // successfully forwarded to at least one peer.
 | |
| func (p *Pss) forward(msg *PssMsg) error {
 | |
| 	metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1)
 | |
| 	sent := 0 // number of successful sends
 | |
| 	to := make([]byte, addressLength)
 | |
| 	copy(to[:len(msg.To)], msg.To)
 | |
| 	neighbourhoodDepth := p.Kademlia.NeighbourhoodDepth()
 | |
| 
 | |
| 	// luminosity is the opposite of darkness. the more bytes are removed from the address, the higher is darkness,
 | |
| 	// but the luminosity is less. here luminosity equals the number of bits given in the destination address.
 | |
| 	luminosityRadius := len(msg.To) * 8
 | |
| 
 | |
| 	// proximity order function matching up to neighbourhoodDepth bits (po <= neighbourhoodDepth)
 | |
| 	pof := pot.DefaultPof(neighbourhoodDepth)
 | |
| 
 | |
| 	// soft threshold for msg broadcast
 | |
| 	broadcastThreshold, _ := pof(to, p.BaseAddr(), 0)
 | |
| 	if broadcastThreshold > luminosityRadius {
 | |
| 		broadcastThreshold = luminosityRadius
 | |
| 	}
 | |
| 
 | |
| 	var onlySendOnce bool // indicates if the message should only be sent to one peer with closest address
 | |
| 
 | |
| 	// if measured from the recipient address as opposed to the base address (see Kademlia.EachConn
 | |
| 	// call below), then peers that fall in the same proximity bin as recipient address will appear
 | |
| 	// [at least] one bit closer, but only if these additional bits are given in the recipient address.
 | |
| 	if broadcastThreshold < luminosityRadius && broadcastThreshold < neighbourhoodDepth {
 | |
| 		broadcastThreshold++
 | |
| 		onlySendOnce = true
 | |
| 	}
 | |
| 
 | |
| 	p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int) bool {
 | |
| 		if po < broadcastThreshold && sent > 0 {
 | |
| 			return false // stop iterating
 | |
| 		}
 | |
| 		if sendFunc(p, sp, msg) {
 | |
| 			sent++
 | |
| 			if onlySendOnce {
 | |
| 				return false
 | |
| 			}
 | |
| 			if po == addressLength*8 {
 | |
| 				// stop iterating if successfully sent to the exact recipient (perfect match of full address)
 | |
| 				return false
 | |
| 			}
 | |
| 		}
 | |
| 		return true
 | |
| 	})
 | |
| 
 | |
| 	// if we failed to send to anyone, re-insert message in the send-queue
 | |
| 	if sent == 0 {
 | |
| 		log.Debug("unable to forward to any peers")
 | |
| 		if err := p.enqueue(msg); err != nil {
 | |
| 			metrics.GetOrRegisterCounter("pss.forward.enqueue.error", nil).Inc(1)
 | |
| 			log.Error(err.Error())
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// cache the message
 | |
| 	p.addFwdCache(msg)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| /////////////////////////////////////////////////////////////////////
 | |
| // SECTION: Caching
 | |
| /////////////////////////////////////////////////////////////////////
 | |
| 
 | |
| // cleanFwdCache is used to periodically remove expired entries from the forward cache
 | |
| func (p *Pss) cleanFwdCache() {
 | |
| 	metrics.GetOrRegisterCounter("pss.cleanfwdcache", nil).Inc(1)
 | |
| 	p.fwdCacheMu.Lock()
 | |
| 	defer p.fwdCacheMu.Unlock()
 | |
| 	for k, v := range p.fwdCache {
 | |
| 		if v.expiresAt.Before(time.Now()) {
 | |
| 			delete(p.fwdCache, k)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
// label returns the hex encoding of the first two bytes of b.
//
// Slices shorter than two bytes are encoded in full instead of panicking, so
// the result has fewer than four characters in that case (empty for an empty
// or nil slice).
func label(b []byte) string {
	if len(b) > 2 {
		b = b[:2]
	}
	// %x on a byte slice always emits two hex digits per byte, so a two-byte
	// input still yields exactly four characters
	return fmt.Sprintf("%x", b)
}
 | |
| 
 | |
| // add a message to the cache
 | |
| func (p *Pss) addFwdCache(msg *PssMsg) error {
 | |
| 	metrics.GetOrRegisterCounter("pss.addfwdcache", nil).Inc(1)
 | |
| 
 | |
| 	var entry pssCacheEntry
 | |
| 	var ok bool
 | |
| 
 | |
| 	p.fwdCacheMu.Lock()
 | |
| 	defer p.fwdCacheMu.Unlock()
 | |
| 
 | |
| 	digest := p.digest(msg)
 | |
| 	if entry, ok = p.fwdCache[digest]; !ok {
 | |
| 		entry = pssCacheEntry{}
 | |
| 	}
 | |
| 	entry.expiresAt = time.Now().Add(p.cacheTTL)
 | |
| 	p.fwdCache[digest] = entry
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // check if message is in the cache
 | |
| func (p *Pss) checkFwdCache(msg *PssMsg) bool {
 | |
| 	p.fwdCacheMu.Lock()
 | |
| 	defer p.fwdCacheMu.Unlock()
 | |
| 
 | |
| 	digest := p.digest(msg)
 | |
| 	entry, ok := p.fwdCache[digest]
 | |
| 	if ok {
 | |
| 		if entry.expiresAt.After(time.Now()) {
 | |
| 			log.Trace("unexpired cache", "digest", fmt.Sprintf("%x", digest))
 | |
| 			metrics.GetOrRegisterCounter("pss.checkfwdcache.unexpired", nil).Inc(1)
 | |
| 			return true
 | |
| 		}
 | |
| 		metrics.GetOrRegisterCounter("pss.checkfwdcache.expired", nil).Inc(1)
 | |
| 	}
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| // Digest of message
 | |
| func (p *Pss) digest(msg *PssMsg) pssDigest {
 | |
| 	return p.digestBytes(msg.serialize())
 | |
| }
 | |
| 
 | |
| func (p *Pss) digestBytes(msg []byte) pssDigest {
 | |
| 	hasher := p.hashPool.Get().(hash.Hash)
 | |
| 	defer p.hashPool.Put(hasher)
 | |
| 	hasher.Reset()
 | |
| 	hasher.Write(msg)
 | |
| 	digest := pssDigest{}
 | |
| 	key := hasher.Sum(nil)
 | |
| 	copy(digest[:], key[:digestLength])
 | |
| 	return digest
 | |
| }
 | |
| 
 | |
| func validateAddress(addr PssAddress) error {
 | |
| 	if len(addr) > addressLength {
 | |
| 		return errors.New("address too long")
 | |
| 	}
 | |
| 	return nil
 | |
| }
 |