129 lines
		
	
	
		
			2.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			129 lines
		
	
	
		
			2.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package whisper
 | 
						|
 | 
						|
import (
 | 
						|
	"fmt"
 | 
						|
	"io/ioutil"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/ethereum/go-ethereum/p2p"
 | 
						|
	"gopkg.in/fatih/set.v0"
 | 
						|
)
 | 
						|
 | 
						|
const (
 | 
						|
	protocolVersion = 0x02
 | 
						|
)
 | 
						|
 | 
						|
type peer struct {
 | 
						|
	host *Whisper
 | 
						|
	peer *p2p.Peer
 | 
						|
	ws   p2p.MsgReadWriter
 | 
						|
 | 
						|
	// XXX Eventually this is going to reach exceptional large space. We need an expiry here
 | 
						|
	known *set.Set
 | 
						|
 | 
						|
	quit chan struct{}
 | 
						|
}
 | 
						|
 | 
						|
func NewPeer(host *Whisper, p *p2p.Peer, ws p2p.MsgReadWriter) *peer {
 | 
						|
	return &peer{host, p, ws, set.New(), make(chan struct{})}
 | 
						|
}
 | 
						|
 | 
						|
func (self *peer) init() error {
 | 
						|
	if err := self.handleStatus(); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (self *peer) start() {
 | 
						|
	go self.update()
 | 
						|
	self.peer.Infoln("whisper started")
 | 
						|
}
 | 
						|
 | 
						|
func (self *peer) stop() {
 | 
						|
	self.peer.Infoln("whisper stopped")
 | 
						|
 | 
						|
	close(self.quit)
 | 
						|
}
 | 
						|
 | 
						|
func (self *peer) update() {
 | 
						|
	relay := time.NewTicker(300 * time.Millisecond)
 | 
						|
out:
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-relay.C:
 | 
						|
			err := self.broadcast(self.host.envelopes())
 | 
						|
			if err != nil {
 | 
						|
				self.peer.Infoln("broadcast err:", err)
 | 
						|
				break out
 | 
						|
			}
 | 
						|
 | 
						|
		case <-self.quit:
 | 
						|
			break out
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (self *peer) broadcast(envelopes []*Envelope) error {
 | 
						|
	envs := make([]interface{}, len(envelopes))
 | 
						|
	i := 0
 | 
						|
	for _, envelope := range envelopes {
 | 
						|
		if !self.known.Has(envelope.Hash()) {
 | 
						|
			envs[i] = envelope
 | 
						|
			self.known.Add(envelope.Hash())
 | 
						|
			i++
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if i > 0 {
 | 
						|
		msg := p2p.NewMsg(envelopesMsg, envs[:i]...)
 | 
						|
		if err := self.ws.WriteMsg(msg); err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		self.peer.Infoln("broadcasted", i, "message(s)")
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (self *peer) addKnown(envelope *Envelope) {
 | 
						|
	self.known.Add(envelope.Hash())
 | 
						|
}
 | 
						|
 | 
						|
func (self *peer) handleStatus() error {
 | 
						|
	ws := self.ws
 | 
						|
 | 
						|
	if err := ws.WriteMsg(self.statusMsg()); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	msg, err := ws.ReadMsg()
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	if msg.Code != statusMsg {
 | 
						|
		return fmt.Errorf("peer send %x before status msg", msg.Code)
 | 
						|
	}
 | 
						|
 | 
						|
	data, err := ioutil.ReadAll(msg.Payload)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	if len(data) == 0 {
 | 
						|
		return fmt.Errorf("malformed status. data len = 0")
 | 
						|
	}
 | 
						|
 | 
						|
	if pv := data[0]; pv != protocolVersion {
 | 
						|
		return fmt.Errorf("protocol version mismatch %d != %d", pv, protocolVersion)
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (self *peer) statusMsg() p2p.Msg {
 | 
						|
	return p2p.NewMsg(statusMsg, protocolVersion)
 | 
						|
}
 |