221 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			221 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package p2p
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	handlerTimeout = 1000
 | |
| )
 | |
| 
 | |
| type Handlers map[string](func(p *Peer) Protocol)
 | |
| 
 | |
| type Messenger struct {
 | |
| 	conn          *Connection
 | |
| 	peer          *Peer
 | |
| 	handlers      Handlers
 | |
| 	protocolLock  sync.RWMutex
 | |
| 	protocols     []Protocol
 | |
| 	offsets       []MsgCode // offsets for adaptive message idss
 | |
| 	protocolTable map[string]int
 | |
| 	quit          chan chan bool
 | |
| 	err           chan *PeerError
 | |
| 	pulse         chan bool
 | |
| }
 | |
| 
 | |
| func NewMessenger(peer *Peer, conn *Connection, errchan chan *PeerError, handlers Handlers) *Messenger {
 | |
| 	baseProtocol := NewBaseProtocol(peer)
 | |
| 	return &Messenger{
 | |
| 		conn:          conn,
 | |
| 		peer:          peer,
 | |
| 		offsets:       []MsgCode{baseProtocol.Offset()},
 | |
| 		handlers:      handlers,
 | |
| 		protocols:     []Protocol{baseProtocol},
 | |
| 		protocolTable: make(map[string]int),
 | |
| 		err:           errchan,
 | |
| 		pulse:         make(chan bool, 1),
 | |
| 		quit:          make(chan chan bool, 1),
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (self *Messenger) Start() {
 | |
| 	self.conn.Open()
 | |
| 	go self.messenger()
 | |
| 	self.protocolLock.RLock()
 | |
| 	defer self.protocolLock.RUnlock()
 | |
| 	self.protocols[0].Start()
 | |
| }
 | |
| 
 | |
| func (self *Messenger) Stop() {
 | |
| 	// close pulse to stop ping pong monitoring
 | |
| 	close(self.pulse)
 | |
| 	self.protocolLock.RLock()
 | |
| 	defer self.protocolLock.RUnlock()
 | |
| 	for _, protocol := range self.protocols {
 | |
| 		protocol.Stop() // could be parallel
 | |
| 	}
 | |
| 	q := make(chan bool)
 | |
| 	self.quit <- q
 | |
| 	<-q
 | |
| 	self.conn.Close()
 | |
| }
 | |
| 
 | |
| func (self *Messenger) messenger() {
 | |
| 	in := self.conn.Read()
 | |
| 	for {
 | |
| 		select {
 | |
| 		case payload, ok := <-in:
 | |
| 			//dispatches message to the protocol asynchronously
 | |
| 			if ok {
 | |
| 				go self.handle(payload)
 | |
| 			} else {
 | |
| 				return
 | |
| 			}
 | |
| 		case q := <-self.quit:
 | |
| 			q <- true
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // handles each message by dispatching to the appropriate protocol
 | |
| // using adaptive message codes
 | |
| // this function is started as a separate go routine for each message
 | |
| // it waits for the protocol response
 | |
| // then encodes and sends outgoing messages to the connection's write channel
 | |
| func (self *Messenger) handle(payload []byte) {
 | |
| 	// send ping to heartbeat channel signalling time of last message
 | |
| 	// select {
 | |
| 	// case self.pulse <- true:
 | |
| 	// default:
 | |
| 	// }
 | |
| 	self.pulse <- true
 | |
| 	// initialise message from payload
 | |
| 	msg, err := NewMsgFromBytes(payload)
 | |
| 	if err != nil {
 | |
| 		self.err <- NewPeerError(MiscError, " %v", err)
 | |
| 		return
 | |
| 	}
 | |
| 	// retrieves protocol based on message Code
 | |
| 	protocol, offset, peerErr := self.getProtocol(msg.Code())
 | |
| 	if err != nil {
 | |
| 		self.err <- peerErr
 | |
| 		return
 | |
| 	}
 | |
| 	// reset message code based on adaptive offset
 | |
| 	msg.Decode(offset)
 | |
| 	// dispatches
 | |
| 	response := make(chan *Msg)
 | |
| 	go protocol.HandleIn(msg, response)
 | |
| 	// protocol reponse timeout to prevent leaks
 | |
| 	timer := time.After(handlerTimeout * time.Millisecond)
 | |
| 	for {
 | |
| 		select {
 | |
| 		case outgoing, ok := <-response:
 | |
| 			// we check if response channel is not closed
 | |
| 			if ok {
 | |
| 				self.conn.Write() <- outgoing.Encode(offset)
 | |
| 			} else {
 | |
| 				return
 | |
| 			}
 | |
| 		case <-timer:
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // negotiated protocols
 | |
| // stores offsets needed for adaptive message id scheme
 | |
| 
 | |
| // based on offsets set at handshake
 | |
| // get the right protocol to handle the message
 | |
| func (self *Messenger) getProtocol(code MsgCode) (Protocol, MsgCode, *PeerError) {
 | |
| 	self.protocolLock.RLock()
 | |
| 	defer self.protocolLock.RUnlock()
 | |
| 	base := MsgCode(0)
 | |
| 	for index, offset := range self.offsets {
 | |
| 		if code < offset {
 | |
| 			return self.protocols[index], base, nil
 | |
| 		}
 | |
| 		base = offset
 | |
| 	}
 | |
| 	return nil, MsgCode(0), NewPeerError(InvalidMsgCode, " %v", code)
 | |
| }
 | |
| 
 | |
| func (self *Messenger) PingPong(timeout time.Duration, gracePeriod time.Duration, pingCallback func(), timeoutCallback func()) {
 | |
| 	fmt.Printf("pingpong keepalive started at %v", time.Now())
 | |
| 
 | |
| 	timer := time.After(timeout)
 | |
| 	pinged := false
 | |
| 	for {
 | |
| 		select {
 | |
| 		case _, ok := <-self.pulse:
 | |
| 			if ok {
 | |
| 				pinged = false
 | |
| 				timer = time.After(timeout)
 | |
| 			} else {
 | |
| 				// pulse is closed, stop monitoring
 | |
| 				return
 | |
| 			}
 | |
| 		case <-timer:
 | |
| 			if pinged {
 | |
| 				fmt.Printf("timeout at %v", time.Now())
 | |
| 				timeoutCallback()
 | |
| 				return
 | |
| 			} else {
 | |
| 				fmt.Printf("pinged at %v", time.Now())
 | |
| 				pingCallback()
 | |
| 				timer = time.After(gracePeriod)
 | |
| 				pinged = true
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (self *Messenger) AddProtocols(protocols []string) {
 | |
| 	self.protocolLock.Lock()
 | |
| 	defer self.protocolLock.Unlock()
 | |
| 	i := len(self.offsets)
 | |
| 	offset := self.offsets[i-1]
 | |
| 	for _, name := range protocols {
 | |
| 		protocolFunc, ok := self.handlers[name]
 | |
| 		if ok {
 | |
| 			protocol := protocolFunc(self.peer)
 | |
| 			self.protocolTable[name] = i
 | |
| 			i++
 | |
| 			offset += protocol.Offset()
 | |
| 			fmt.Println("offset ", name, offset)
 | |
| 
 | |
| 			self.offsets = append(self.offsets, offset)
 | |
| 			self.protocols = append(self.protocols, protocol)
 | |
| 			protocol.Start()
 | |
| 		} else {
 | |
| 			fmt.Println("no ", name)
 | |
| 			// protocol not handled
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (self *Messenger) Write(protocol string, msg *Msg) error {
 | |
| 	self.protocolLock.RLock()
 | |
| 	defer self.protocolLock.RUnlock()
 | |
| 	i := 0
 | |
| 	offset := MsgCode(0)
 | |
| 	if len(protocol) > 0 {
 | |
| 		var ok bool
 | |
| 		i, ok = self.protocolTable[protocol]
 | |
| 		if !ok {
 | |
| 			return fmt.Errorf("protocol %v not handled by peer", protocol)
 | |
| 		}
 | |
| 		offset = self.offsets[i-1]
 | |
| 	}
 | |
| 	handler := self.protocols[i]
 | |
| 	// checking if protocol status/caps allows the message to be sent out
 | |
| 	if handler.HandleOut(msg) {
 | |
| 		self.conn.Write() <- msg.Encode(offset)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 |