better handling of initial sync
This commit is contained in:
parent
469666de82
commit
9ddf7bbd15
@ -6,6 +6,7 @@ import (
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
@ -63,6 +64,8 @@ type syncManager struct {
|
||||
heads map[peer.ID]*types.TipSet
|
||||
recent *syncBuffer
|
||||
|
||||
initialSync bool
|
||||
|
||||
mx sync.Mutex
|
||||
state map[uint64]*workerState
|
||||
|
||||
@ -147,12 +150,19 @@ func (sm *syncManager) State() []SyncerStateSnapshot {
|
||||
|
||||
// sync manager internals
|
||||
func (sm *syncManager) scheduler() {
|
||||
ticker := time.NewTicker(time.Minute)
|
||||
tickerC := ticker.C
|
||||
for {
|
||||
select {
|
||||
case head := <-sm.workq:
|
||||
sm.handlePeerHead(head)
|
||||
case status := <-sm.statusq:
|
||||
sm.handleWorkerStatus(status)
|
||||
case <-tickerC:
|
||||
if sm.initialSync {
|
||||
tickerC = nil
|
||||
sm.handleInitialSync()
|
||||
}
|
||||
case <-sm.ctx.Done():
|
||||
return
|
||||
}
|
||||
@ -214,6 +224,8 @@ func (sm *syncManager) handleWorkerStatus(status workerStatus) {
|
||||
} else {
|
||||
// add to the recently synced buffer
|
||||
sm.recent.Push(ws.ts)
|
||||
// mark the end of the initial sync
|
||||
sm.initialSync = true
|
||||
}
|
||||
|
||||
// we are done with this target, select the next sync target and spawn a worker if there is work
|
||||
@ -230,6 +242,24 @@ func (sm *syncManager) handleWorkerStatus(status workerStatus) {
|
||||
}
|
||||
}
|
||||
|
||||
func (sm *syncManager) handleInitialSync() {
|
||||
// we have just finished the initial sync; spawn some additional workers in deferred syncs
|
||||
// as needed (and up to MaxSyncWorkers) to ramp up chain sync
|
||||
for len(sm.state) < MaxSyncWorkers {
|
||||
target, work, err := sm.selectDeferredSyncTarget()
|
||||
if err != nil {
|
||||
log.Errorf("error selecting deferred sync target: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
if !work {
|
||||
return
|
||||
}
|
||||
|
||||
sm.spawnWorker(target)
|
||||
}
|
||||
}
|
||||
|
||||
func (sm *syncManager) spawnWorker(target *types.TipSet) {
|
||||
id := sm.nextWorker
|
||||
sm.nextWorker++
|
||||
@ -343,10 +373,10 @@ func (sm *syncManager) addSyncTarget(ts *types.TipSet) (*types.TipSet, bool, err
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
// if we have too many workers, add it to the deferred queue; it will be processed once a worker
|
||||
// is freed from syncing a chain
|
||||
if len(sm.state) >= MaxSyncWorkers {
|
||||
log.Infof("too many sync workers; deferring sync on %s", ts)
|
||||
// if we have not finished the initial sync or have too many workers, add it to the deferred queue;
|
||||
// it will be processed once a worker is freed from syncing a chain (or the initial sync finishes)
|
||||
if !sm.initialSync || len(sm.state) >= MaxSyncWorkers {
|
||||
log.Infof("deferring sync on %s", ts)
|
||||
sm.deferred.Insert(ts)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user