whisper: use async handshakes to handle blocking peers

This commit is contained in:
Péter Szilágyi 2015-04-15 13:01:22 +03:00
parent 46ea193a49
commit 6ceb253f74
3 changed files with 12 additions and 47 deletions

View File

@ -1,40 +0,0 @@
// Contains some common utility functions for testing.
package whisper
import (
"bytes"
"io/ioutil"
"github.com/ethereum/go-ethereum/p2p"
)
// bufMsgPipe creates a buffered message pipe between two endpoints.
func bufMsgPipe() (*p2p.MsgPipeRW, *p2p.MsgPipeRW) {
A, midA := p2p.MsgPipe()
midB, B := p2p.MsgPipe()
go copyMsgPipe(midA, midB)
go copyMsgPipe(midB, midA)
return A, B
}
// copyMsgPipe copies messages from the src pipe to the dest.
func copyMsgPipe(dst, src *p2p.MsgPipeRW) {
defer dst.Close()
for {
msg, err := src.ReadMsg()
if err != nil {
return
}
data, err := ioutil.ReadAll(msg.Payload)
if err != nil {
return
}
msg.Payload = bytes.NewReader(data)
if err := dst.WriteMsg(msg); err != nil {
return
}
}
}

View File

@ -53,10 +53,12 @@ func (self *peer) stop() {
// handshake sends the protocol initiation status message to the remote peer and // handshake sends the protocol initiation status message to the remote peer and
// verifies the remote status too. // verifies the remote status too.
func (self *peer) handshake() error { func (self *peer) handshake() error {
// Send own status message, fetch remote one // Send the handshake status message asynchronously
if err := p2p.SendItems(self.ws, statusCode, protocolVersion); err != nil { errc := make(chan error, 1)
return err go func() {
} errc <- p2p.SendItems(self.ws, statusCode, protocolVersion)
}()
// Fetch the remote status packet and verify protocol match
packet, err := self.ws.ReadMsg() packet, err := self.ws.ReadMsg()
if err != nil { if err != nil {
return err return err
@ -64,7 +66,6 @@ func (self *peer) handshake() error {
if packet.Code != statusCode { if packet.Code != statusCode {
return fmt.Errorf("peer sent %x before status packet", packet.Code) return fmt.Errorf("peer sent %x before status packet", packet.Code)
} }
// Decode the rest of the status packet and verify protocol match
s := rlp.NewStream(packet.Payload) s := rlp.NewStream(packet.Payload)
if _, err := s.List(); err != nil { if _, err := s.List(); err != nil {
return fmt.Errorf("bad status message: %v", err) return fmt.Errorf("bad status message: %v", err)
@ -76,7 +77,11 @@ func (self *peer) handshake() error {
if peerVersion != protocolVersion { if peerVersion != protocolVersion {
return fmt.Errorf("protocol version mismatch %d != %d", peerVersion, protocolVersion) return fmt.Errorf("protocol version mismatch %d != %d", peerVersion, protocolVersion)
} }
return packet.Discard() // ignore anything after protocol version // Wait until out own status is consumed too
if err := <-errc; err != nil {
return fmt.Errorf("failed to send status packet: %v", err)
}
return nil
} }
// update executes periodic operations on the peer, including message transmission // update executes periodic operations on the peer, including message transmission

View File

@ -21,7 +21,7 @@ func startTestCluster(n int) []*Whisper {
} }
// Wire all the peers to the root one // Wire all the peers to the root one
for i := 1; i < n; i++ { for i := 1; i < n; i++ {
src, dst := bufMsgPipe() src, dst := p2p.MsgPipe()
go whispers[0].handlePeer(nodes[i], src) go whispers[0].handlePeer(nodes[i], src)
go whispers[i].handlePeer(nodes[0], dst) go whispers[i].handlePeer(nodes[0], dst)