limit max active sync workers

This commit is contained in:
vyzo 2020-10-28 10:33:22 +02:00
parent fc1ac3e752
commit 469666de82

View File

@ -14,11 +14,14 @@ import (
peer "github.com/libp2p/go-libp2p-core/peer"
)
var BootstrapPeerThreshold = 1
var (
BootstrapPeerThreshold = 1
var RecentSyncBufferSize = 10
RecentSyncBufferSize = 10
MaxSyncWorkers = 5
var coalesceTipsets = false
coalesceTipsets = false
)
func init() {
coalesceTipsets = os.Getenv("LOTUS_SYNC_FORMTS_PEND") == "yes"
@ -56,6 +59,7 @@ type syncManager struct {
nextWorker uint64
pend syncBucketSet
deferred syncBucketSet
heads map[peer.ID]*types.TipSet
recent *syncBuffer
@ -339,6 +343,13 @@ func (sm *syncManager) addSyncTarget(ts *types.TipSet) (*types.TipSet, bool, err
return nil, false, nil
}
// if we have too many workers, add it to the deferred queue; it will be processed once a worker
// is freed from syncing a chain
if len(sm.state) >= MaxSyncWorkers {
log.Infof("too many sync workers; deferring sync on %s", ts)
sm.deferred.Insert(ts)
}
// start a new worker, seems heavy enough and unrelated to active or pending syncs
return ts, true, nil
}
@ -350,23 +361,58 @@ func (sm *syncManager) selectSyncTarget(done *types.TipSet) (*types.TipSet, bool
// if we are not already working on a heavier tipset
related := sm.pend.PopRelated(done)
if related == nil {
return nil, false, nil
return sm.selectDeferredSyncTarget()
}
heaviest := related.heaviestTipSet()
if isHeavier(done, heaviest) {
return nil, false, nil
return sm.selectDeferredSyncTarget()
}
for _, ws := range sm.state {
if isHeavier(ws.ts, heaviest) {
return nil, false, nil
return sm.selectDeferredSyncTarget()
}
}
return heaviest, true, nil
}
// selects a deferred sync target if there is any; these are sync targets that were not related to
// active syncs and were deferred because there were too many workers running
func (sm *syncManager) selectDeferredSyncTarget() (*types.TipSet, bool, error) {
deferredLoop:
for !sm.deferred.Empty() {
bucket := sm.deferred.Pop()
heaviest := bucket.heaviestTipSet()
if sm.pend.RelatedToAny(heaviest) {
// this has converged to a pending sync, insert it to the pending queue
sm.pend.Insert(heaviest)
continue deferredLoop
}
for _, ws := range sm.state {
if ws.ts.Equals(heaviest) || isHeavier(ws.ts, heaviest) {
// we have converged and are already syncing it or we are syncing on something heavier
// ignore it and pop the next deferred bucket
continue deferredLoop
}
if heaviest.Parents() == ws.ts.Key() {
// we have converged and we are syncing its parent; insert it to the pending queue
sm.pend.Insert(heaviest)
continue deferredLoop
}
// it's not related to any active or pending sync and this worker is free, so sync it!
return heaviest, true, nil
}
}
return nil, false, nil
}
func isHeavier(a, b *types.TipSet) bool {
return a.ParentWeight().GreaterThan(b.ParentWeight())
}