les: fix fetcher syncing logic (#18072)
This commit is contained in:
parent
bb29d20828
commit
f0515800e6
@ -141,36 +141,39 @@ func (f *lightFetcher) syncLoop() {
|
|||||||
s := requesting
|
s := requesting
|
||||||
requesting = false
|
requesting = false
|
||||||
var (
|
var (
|
||||||
rq *distReq
|
rq *distReq
|
||||||
reqID uint64
|
reqID uint64
|
||||||
|
syncing bool
|
||||||
)
|
)
|
||||||
if !f.syncing && !(newAnnounce && s) {
|
if !f.syncing && !(newAnnounce && s) {
|
||||||
rq, reqID = f.nextRequest()
|
rq, reqID, syncing = f.nextRequest()
|
||||||
}
|
}
|
||||||
syncing := f.syncing
|
|
||||||
f.lock.Unlock()
|
f.lock.Unlock()
|
||||||
|
|
||||||
if rq != nil {
|
if rq != nil {
|
||||||
requesting = true
|
requesting = true
|
||||||
_, ok := <-f.pm.reqDist.queue(rq)
|
if _, ok := <-f.pm.reqDist.queue(rq); ok {
|
||||||
if !ok {
|
if syncing {
|
||||||
|
f.lock.Lock()
|
||||||
|
f.syncing = true
|
||||||
|
f.lock.Unlock()
|
||||||
|
} else {
|
||||||
|
go func() {
|
||||||
|
time.Sleep(softRequestTimeout)
|
||||||
|
f.reqMu.Lock()
|
||||||
|
req, ok := f.requested[reqID]
|
||||||
|
if ok {
|
||||||
|
req.timeout = true
|
||||||
|
f.requested[reqID] = req
|
||||||
|
}
|
||||||
|
f.reqMu.Unlock()
|
||||||
|
// keep starting new requests while possible
|
||||||
|
f.requestChn <- false
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
} else {
|
||||||
f.requestChn <- false
|
f.requestChn <- false
|
||||||
}
|
}
|
||||||
|
|
||||||
if !syncing {
|
|
||||||
go func() {
|
|
||||||
time.Sleep(softRequestTimeout)
|
|
||||||
f.reqMu.Lock()
|
|
||||||
req, ok := f.requested[reqID]
|
|
||||||
if ok {
|
|
||||||
req.timeout = true
|
|
||||||
f.requested[reqID] = req
|
|
||||||
}
|
|
||||||
f.reqMu.Unlock()
|
|
||||||
// keep starting new requests while possible
|
|
||||||
f.requestChn <- false
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
case reqID := <-f.timeoutChn:
|
case reqID := <-f.timeoutChn:
|
||||||
f.reqMu.Lock()
|
f.reqMu.Lock()
|
||||||
@ -209,6 +212,7 @@ func (f *lightFetcher) syncLoop() {
|
|||||||
f.checkSyncedHeaders(p)
|
f.checkSyncedHeaders(p)
|
||||||
f.syncing = false
|
f.syncing = false
|
||||||
f.lock.Unlock()
|
f.lock.Unlock()
|
||||||
|
f.requestChn <- false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -405,7 +409,7 @@ func (f *lightFetcher) requestedID(reqID uint64) bool {
|
|||||||
|
|
||||||
// nextRequest selects the peer and announced head to be requested next, amount
|
// nextRequest selects the peer and announced head to be requested next, amount
|
||||||
// to be downloaded starting from the head backwards is also returned
|
// to be downloaded starting from the head backwards is also returned
|
||||||
func (f *lightFetcher) nextRequest() (*distReq, uint64) {
|
func (f *lightFetcher) nextRequest() (*distReq, uint64, bool) {
|
||||||
var (
|
var (
|
||||||
bestHash common.Hash
|
bestHash common.Hash
|
||||||
bestAmount uint64
|
bestAmount uint64
|
||||||
@ -427,14 +431,12 @@ func (f *lightFetcher) nextRequest() (*distReq, uint64) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if bestTd == f.maxConfirmedTd {
|
if bestTd == f.maxConfirmedTd {
|
||||||
return nil, 0
|
return nil, 0, false
|
||||||
}
|
}
|
||||||
|
|
||||||
f.syncing = bestSyncing
|
|
||||||
|
|
||||||
var rq *distReq
|
var rq *distReq
|
||||||
reqID := genReqID()
|
reqID := genReqID()
|
||||||
if f.syncing {
|
if bestSyncing {
|
||||||
rq = &distReq{
|
rq = &distReq{
|
||||||
getCost: func(dp distPeer) uint64 {
|
getCost: func(dp distPeer) uint64 {
|
||||||
return 0
|
return 0
|
||||||
@ -500,7 +502,7 @@ func (f *lightFetcher) nextRequest() (*distReq, uint64) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return rq, reqID
|
return rq, reqID, bestSyncing
|
||||||
}
|
}
|
||||||
|
|
||||||
// deliverHeaders delivers header download request responses for processing
|
// deliverHeaders delivers header download request responses for processing
|
||||||
|
Loading…
Reference in New Issue
Block a user