diff --git a/p2p/message.go b/p2p/message.go index 5dc7d5460..ef3630a90 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -185,7 +185,10 @@ func (p *MsgPipeRW) WriteMsg(msg Msg) error { case p.w <- msg: if msg.Size > 0 { // wait for payload read or discard - <-consumed + select { + case <-consumed: + case <-p.closing: + } } return nil case <-p.closing: @@ -207,8 +210,8 @@ func (p *MsgPipeRW) ReadMsg() (Msg, error) { } // Close unblocks any pending ReadMsg and WriteMsg calls on both ends -// of the pipe. They will return ErrPipeClosed. Note that Close does -// not interrupt any reads from a message payload. +// of the pipe. They will return ErrPipeClosed. Close also +// interrupts any reads from a message payload. func (p *MsgPipeRW) Close() error { if atomic.AddInt32(p.closed, 1) != 1 { // someone else is already closing