les: avoid fetcher deadlock on requestChn (#19571)

* les: avoid fetcher deadlock on requestChn
This commit is contained in:
Felföldi Zsolt 2019-05-17 20:39:39 +02:00 committed by GitHub
parent e687d063c3
commit 97d3615612
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -55,7 +55,8 @@ type lightFetcher struct {
requested map[uint64]fetchRequest requested map[uint64]fetchRequest
deliverChn chan fetchResponse deliverChn chan fetchResponse
timeoutChn chan uint64 timeoutChn chan uint64
requestChn chan bool // true if initiated from outside requestTriggered bool
requestTrigger chan struct{}
lastTrustedHeader *types.Header lastTrustedHeader *types.Header
} }
@ -122,7 +123,7 @@ func newLightFetcher(pm *ProtocolManager) *lightFetcher {
deliverChn: make(chan fetchResponse, 100), deliverChn: make(chan fetchResponse, 100),
requested: make(map[uint64]fetchRequest), requested: make(map[uint64]fetchRequest),
timeoutChn: make(chan uint64), timeoutChn: make(chan uint64),
requestChn: make(chan bool, 100), requestTrigger: make(chan struct{}, 1),
syncDone: make(chan *peer), syncDone: make(chan *peer),
maxConfirmedTd: big.NewInt(0), maxConfirmedTd: big.NewInt(0),
} }
@ -135,31 +136,26 @@ func newLightFetcher(pm *ProtocolManager) *lightFetcher {
// syncLoop is the main event loop of the light fetcher // syncLoop is the main event loop of the light fetcher
func (f *lightFetcher) syncLoop() { func (f *lightFetcher) syncLoop() {
requesting := false
defer f.pm.wg.Done() defer f.pm.wg.Done()
for { for {
select { select {
case <-f.pm.quitSync: case <-f.pm.quitSync:
return return
// when a new announce is received, request loop keeps running until // request loop keeps running until no further requests are necessary or possible
// no further requests are necessary or possible case <-f.requestTrigger:
case newAnnounce := <-f.requestChn:
f.lock.Lock() f.lock.Lock()
s := requesting
requesting = false
var ( var (
rq *distReq rq *distReq
reqID uint64 reqID uint64
syncing bool syncing bool
) )
if !f.syncing {
if !f.syncing && !(newAnnounce && s) {
rq, reqID, syncing = f.nextRequest() rq, reqID, syncing = f.nextRequest()
} }
f.requestTriggered = rq != nil
f.lock.Unlock() f.lock.Unlock()
if rq != nil { if rq != nil {
requesting = true
if _, ok := <-f.pm.reqDist.queue(rq); ok { if _, ok := <-f.pm.reqDist.queue(rq); ok {
if syncing { if syncing {
f.lock.Lock() f.lock.Lock()
@ -176,11 +172,11 @@ func (f *lightFetcher) syncLoop() {
} }
f.reqMu.Unlock() f.reqMu.Unlock()
// keep starting new requests while possible // keep starting new requests while possible
f.requestChn <- false f.requestTrigger <- struct{}{}
}() }()
} }
} else { } else {
f.requestChn <- false f.requestTrigger <- struct{}{}
} }
} }
case reqID := <-f.timeoutChn: case reqID := <-f.timeoutChn:
@ -220,7 +216,7 @@ func (f *lightFetcher) syncLoop() {
f.checkSyncedHeaders(p) f.checkSyncedHeaders(p)
f.syncing = false f.syncing = false
f.lock.Unlock() f.lock.Unlock()
f.requestChn <- false f.requestTrigger <- struct{}{} // f.requestTriggered is always true here
} }
} }
} }
@ -354,7 +350,10 @@ func (f *lightFetcher) announce(p *peer, head *announceData) {
fp.lastAnnounced = n fp.lastAnnounced = n
p.lock.Unlock() p.lock.Unlock()
f.checkUpdateStats(p, nil) f.checkUpdateStats(p, nil)
f.requestChn <- true if !f.requestTriggered {
f.requestTriggered = true
f.requestTrigger <- struct{}{}
}
} }
// peerHasBlock returns true if we can assume the peer knows the given block // peerHasBlock returns true if we can assume the peer knows the given block