p2p: move ping handling into pingLoop goroutine (#27887)
Moving the response sending there allows tracking all peer goroutines in the peer WaitGroup.
This commit is contained in:
parent
e13fa32cea
commit
7ec60d5f0c
15
p2p/peer.go
15
p2p/peer.go
@ -112,6 +112,7 @@ type Peer struct {
|
|||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
protoErr chan error
|
protoErr chan error
|
||||||
closed chan struct{}
|
closed chan struct{}
|
||||||
|
pingRecv chan struct{}
|
||||||
disc chan DiscReason
|
disc chan DiscReason
|
||||||
|
|
||||||
// events receives message send / receive events if set
|
// events receives message send / receive events if set
|
||||||
@ -233,6 +234,7 @@ func newPeer(log log.Logger, conn *conn, protocols []Protocol) *Peer {
|
|||||||
disc: make(chan DiscReason),
|
disc: make(chan DiscReason),
|
||||||
protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop
|
protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop
|
||||||
closed: make(chan struct{}),
|
closed: make(chan struct{}),
|
||||||
|
pingRecv: make(chan struct{}, 16),
|
||||||
log: log.New("id", conn.node.ID(), "conn", conn.flags),
|
log: log.New("id", conn.node.ID(), "conn", conn.flags),
|
||||||
}
|
}
|
||||||
return p
|
return p
|
||||||
@ -293,9 +295,11 @@ loop:
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *Peer) pingLoop() {
|
func (p *Peer) pingLoop() {
|
||||||
ping := time.NewTimer(pingInterval)
|
|
||||||
defer p.wg.Done()
|
defer p.wg.Done()
|
||||||
|
|
||||||
|
ping := time.NewTimer(pingInterval)
|
||||||
defer ping.Stop()
|
defer ping.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ping.C:
|
case <-ping.C:
|
||||||
@ -304,6 +308,10 @@ func (p *Peer) pingLoop() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
ping.Reset(pingInterval)
|
ping.Reset(pingInterval)
|
||||||
|
|
||||||
|
case <-p.pingRecv:
|
||||||
|
SendItems(p.rw, pongMsg)
|
||||||
|
|
||||||
case <-p.closed:
|
case <-p.closed:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -330,7 +338,10 @@ func (p *Peer) handle(msg Msg) error {
|
|||||||
switch {
|
switch {
|
||||||
case msg.Code == pingMsg:
|
case msg.Code == pingMsg:
|
||||||
msg.Discard()
|
msg.Discard()
|
||||||
go SendItems(p.rw, pongMsg)
|
select {
|
||||||
|
case p.pingRecv <- struct{}{}:
|
||||||
|
case <-p.closed:
|
||||||
|
}
|
||||||
case msg.Code == discMsg:
|
case msg.Code == discMsg:
|
||||||
// This is the last message. We don't need to discard or
|
// This is the last message. We don't need to discard or
|
||||||
// check errors because, the connection will be closed after it.
|
// check errors because, the connection will be closed after it.
|
||||||
|
Loading…
Reference in New Issue
Block a user