les: drop the message if the entire p2p connection is stuck (#21033)
* les: drop the message if the entire p2p connection is stuck * les: fix lint
This commit is contained in:
parent
7ace5a3a8b
commit
53cac027d0
23
les/peer.go
23
les/peer.go
@ -65,9 +65,6 @@ const (
|
|||||||
|
|
||||||
// handshakeTimeout is the timeout LES handshake will be treated as failed.
|
// handshakeTimeout is the timeout LES handshake will be treated as failed.
|
||||||
handshakeTimeout = 5 * time.Second
|
handshakeTimeout = 5 * time.Second
|
||||||
|
|
||||||
// retrySendCachePeriod is the time interval a caching retry is performed.
|
|
||||||
retrySendCachePeriod = time.Millisecond * 100
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -164,24 +161,6 @@ func (p *peerCommons) queueSend(f func()) bool {
|
|||||||
return p.sendQueue.Queue(f)
|
return p.sendQueue.Queue(f)
|
||||||
}
|
}
|
||||||
|
|
||||||
// mustQueueSend starts a for loop and retry the caching if failed.
|
|
||||||
// If the stopCh is closed, then it returns.
|
|
||||||
func (p *peerCommons) mustQueueSend(f func()) {
|
|
||||||
for {
|
|
||||||
// Check whether the stopCh is closed.
|
|
||||||
select {
|
|
||||||
case <-p.closeCh:
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
// If the function is successfully cached, return.
|
|
||||||
if p.canQueue() && p.queueSend(f) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
time.Sleep(retrySendCachePeriod)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// String implements fmt.Stringer.
|
// String implements fmt.Stringer.
|
||||||
func (p *peerCommons) String() string {
|
func (p *peerCommons) String() string {
|
||||||
return fmt.Sprintf("Peer %s [%s]", p.id, fmt.Sprintf("les/%d", p.version))
|
return fmt.Sprintf("Peer %s [%s]", p.id, fmt.Sprintf("les/%d", p.version))
|
||||||
@ -899,7 +878,7 @@ func (p *clientPeer) updateCapacity(cap uint64) {
|
|||||||
var kvList keyValueList
|
var kvList keyValueList
|
||||||
kvList = kvList.add("flowControl/MRR", cap)
|
kvList = kvList.add("flowControl/MRR", cap)
|
||||||
kvList = kvList.add("flowControl/BL", cap*bufLimitRatio)
|
kvList = kvList.add("flowControl/BL", cap*bufLimitRatio)
|
||||||
p.mustQueueSend(func() { p.sendAnnounce(announceData{Update: kvList}) })
|
p.queueSend(func() { p.sendAnnounce(announceData{Update: kvList}) })
|
||||||
}
|
}
|
||||||
|
|
||||||
// freezeClient temporarily puts the client in a frozen state which means all
|
// freezeClient temporarily puts the client in a frozen state which means all
|
||||||
|
@ -262,7 +262,7 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error {
|
|||||||
h.server.clientPool.requestCost(p, realCost)
|
h.server.clientPool.requestCost(p, realCost)
|
||||||
}
|
}
|
||||||
if reply != nil {
|
if reply != nil {
|
||||||
p.mustQueueSend(func() {
|
p.queueSend(func() {
|
||||||
if err := reply.send(bv); err != nil {
|
if err := reply.send(bv); err != nil {
|
||||||
select {
|
select {
|
||||||
case p.errCh <- err:
|
case p.errCh <- err:
|
||||||
|
Loading…
Reference in New Issue
Block a user