whisper: fix a small data race duirng peer connection
This commit is contained in:
parent
70ded4cbf0
commit
406e74e2af
@ -23,18 +23,14 @@ type peer struct {
|
|||||||
|
|
||||||
// newPeer creates and initializes a new whisper peer connection, returning either
|
// newPeer creates and initializes a new whisper peer connection, returning either
|
||||||
// the newly constructed link or a failure reason.
|
// the newly constructed link or a failure reason.
|
||||||
func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) (*peer, error) {
|
func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *peer {
|
||||||
p := &peer{
|
return &peer{
|
||||||
host: host,
|
host: host,
|
||||||
peer: remote,
|
peer: remote,
|
||||||
ws: rw,
|
ws: rw,
|
||||||
known: set.New(),
|
known: set.New(),
|
||||||
quit: make(chan struct{}),
|
quit: make(chan struct{}),
|
||||||
}
|
}
|
||||||
if err := p.handshake(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return p, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// start initiates the peer updater, periodically broadcasting the whisper packets
|
// start initiates the peer updater, periodically broadcasting the whisper packets
|
||||||
|
@ -168,15 +168,9 @@ func (self *Whisper) Messages(id int) []*Message {
|
|||||||
// handlePeer is called by the underlying P2P layer when the whisper sub-protocol
|
// handlePeer is called by the underlying P2P layer when the whisper sub-protocol
|
||||||
// connection is negotiated.
|
// connection is negotiated.
|
||||||
func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
|
func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
|
||||||
// Create, initialize and start the whisper peer
|
// Create the new peer and start tracking it
|
||||||
whisperPeer, err := newPeer(self, peer, rw)
|
whisperPeer := newPeer(self, peer, rw)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
whisperPeer.start()
|
|
||||||
defer whisperPeer.stop()
|
|
||||||
|
|
||||||
// Start tracking the active peer
|
|
||||||
self.peerMu.Lock()
|
self.peerMu.Lock()
|
||||||
self.peers[whisperPeer] = struct{}{}
|
self.peers[whisperPeer] = struct{}{}
|
||||||
self.peerMu.Unlock()
|
self.peerMu.Unlock()
|
||||||
@ -186,6 +180,14 @@ func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
|
|||||||
delete(self.peers, whisperPeer)
|
delete(self.peers, whisperPeer)
|
||||||
self.peerMu.Unlock()
|
self.peerMu.Unlock()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// Run the peer handshake and state updates
|
||||||
|
if err := whisperPeer.handshake(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
whisperPeer.start()
|
||||||
|
defer whisperPeer.stop()
|
||||||
|
|
||||||
// Read and process inbound messages directly to merge into client-global state
|
// Read and process inbound messages directly to merge into client-global state
|
||||||
for {
|
for {
|
||||||
// Fetch the next packet and decode the contained envelopes
|
// Fetch the next packet and decode the contained envelopes
|
||||||
|
Loading…
Reference in New Issue
Block a user