From 5d34b7d618a0edb3bedf2d98bbb467cb454eb2e7 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 26 Oct 2020 14:11:19 +0200 Subject: [PATCH] rewrite sync manager --- chain/sync_manager.go | 563 +++++++++++++++++++------------------ chain/sync_manager_test.go | 11 +- 2 files changed, 298 insertions(+), 276 deletions(-) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index c25068f60..8979c1e40 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -7,11 +7,13 @@ import ( "strings" "sync" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" + peer "github.com/libp2p/go-libp2p-core/peer" ) -const BootstrapPeerThreshold = 2 +var BootstrapPeerThreshold = 2 var coalesceForksParents = false @@ -21,13 +23,6 @@ func init() { } } -const ( - BSStateInit = 0 - BSStateSelected = 1 - BSStateScheduled = 2 - BSStateComplete = 3 -) - type SyncFunc func(context.Context, *types.TipSet) error // SyncManager manages the chain synchronization process, both at bootstrap time @@ -52,108 +47,327 @@ type SyncManager interface { } type syncManager struct { - lk sync.Mutex - peerHeads map[peer.ID]*types.TipSet + ctx context.Context + cancel func() - bssLk sync.Mutex - bootstrapState int + workq chan peerHead + statusq chan workerStatus - bspThresh int + nextWorker uint64 + pend syncBucketSet + heads map[peer.ID]*types.TipSet - incomingTipSets chan *types.TipSet - syncTargets chan *types.TipSet - syncResults chan *syncResult + mx sync.Mutex + state map[uint64]*workerState - syncStates []*SyncerState - - // Normally this handler is set to `(*Syncer).Sync()`. doSync func(context.Context, *types.TipSet) error - - stop chan struct{} - - // Sync Scheduler fields - activeSyncs map[types.TipSetKey]*types.TipSet - syncQueue syncBucketSet - activeSyncTips syncBucketSet - nextSyncTarget *syncTargetBucket - workerChan chan *types.TipSet } var _ SyncManager = (*syncManager)(nil) -type syncResult struct { - ts *types.TipSet - success bool +type peerHead struct { + p peer.ID + ts *types.TipSet } -const syncWorkerCount = 3 +type workerState struct { + id uint64 + ts *types.TipSet + ss *SyncerState +} +type workerStatus struct { + id uint64 + err error +} + +// sync manager interface func NewSyncManager(sync SyncFunc) SyncManager { - sm := &syncManager{ - bspThresh: 1, - peerHeads: make(map[peer.ID]*types.TipSet), - syncTargets: make(chan *types.TipSet), - syncResults: make(chan *syncResult), - syncStates: make([]*SyncerState, syncWorkerCount), - incomingTipSets: make(chan *types.TipSet), - activeSyncs: make(map[types.TipSetKey]*types.TipSet), - doSync: sync, - stop: make(chan struct{}), + ctx, cancel := context.WithCancel(context.Background()) + return &syncManager{ + ctx: ctx, + cancel: cancel, + + workq: make(chan peerHead), + statusq: make(chan workerStatus), + + heads: make(map[peer.ID]*types.TipSet), + state: make(map[uint64]*workerState), + + doSync: sync, } - for i := range sm.syncStates { - sm.syncStates[i] = new(SyncerState) - } - return sm } func (sm *syncManager) Start() { - go sm.syncScheduler() - for i := 0; i < syncWorkerCount; i++ { - go sm.syncWorker(i) - } + go sm.scheduler() } func (sm *syncManager) Stop() { - close(sm.stop) + select { + case <-sm.ctx.Done(): + default: + sm.cancel() + } } func (sm *syncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) { - sm.lk.Lock() - defer sm.lk.Unlock() - sm.peerHeads[p] = ts - - if sm.getBootstrapState() == BSStateInit { - spc := sm.syncedPeerCount() - if spc >= sm.bspThresh { - // Its go time! 
-			target, err := sm.selectSyncTarget()
-			if err != nil {
-				log.Error("failed to select sync target: ", err)
-				return
-			}
-			sm.setBootstrapState(BSStateSelected)
-
-			sm.incomingTipSets <- target
-		}
-		log.Infof("sync bootstrap has %d peers", spc)
-		return
+	select {
+	case sm.workq <- peerHead{p: p, ts: ts}:
+	case <-sm.ctx.Done():
+	case <-ctx.Done():
 	}
-
-	sm.incomingTipSets <- ts
 }
 
 func (sm *syncManager) State() []SyncerStateSnapshot {
-	ret := make([]SyncerStateSnapshot, 0, len(sm.syncStates))
-	for _, s := range sm.syncStates {
-		ret = append(ret, s.Snapshot())
+	sm.mx.Lock()
+	workerStates := make([]*workerState, 0, len(sm.state))
+	for _, ws := range sm.state {
+		workerStates = append(workerStates, ws)
 	}
-	return ret
+	sm.mx.Unlock()
+
+	sort.Slice(workerStates, func(i, j int) bool {
+		return workerStates[i].id < workerStates[j].id
+	})
+
+	result := make([]SyncerStateSnapshot, 0, len(workerStates))
+	for _, ws := range workerStates {
+		result = append(result, ws.ss.Snapshot())
+	}
+
+	return result
 }
 
+// sync manager internals
+func (sm *syncManager) scheduler() {
+	for {
+		select {
+		case head := <-sm.workq:
+			sm.handlePeerHead(head)
+		case status := <-sm.statusq:
+			sm.handleWorkerStatus(status)
+		case <-sm.ctx.Done():
+			return
+		}
+	}
+}
+
+func (sm *syncManager) handlePeerHead(head peerHead) {
+	log.Infof("new peer head: %s %s", head.p, head.ts)
+
+	// have we started syncing yet?
+	if sm.nextWorker == 0 {
+		// track the peer head until we start syncing
+		sm.heads[head.p] = head.ts
+
+		// not yet; do we have enough peers?
+		if len(sm.heads) < BootstrapPeerThreshold {
+			// not enough peers; track it and wait
+			return
+		}
+
+		// we are ready to start syncing; select the sync target and spawn a worker
+		target, err := sm.selectInitialSyncTarget()
+		if err != nil {
+			log.Errorf("failed to select initial sync target: %s", err)
+			return
+		}
+
+		log.Infof("selected initial sync target: %s", target)
+		sm.spawnWorker(target)
+		return
+	}
+
+	// we have started syncing; add the peer head to the queue if applicable and maybe spawn a worker
+	// if there is work to do (possibly in a fork)
+	target, work, err := sm.addSyncTarget(head.ts)
+	if err != nil {
+		log.Warnf("failed to add sync target: %s", err)
+		return
+	}
+
+	if work {
+		log.Infof("selected sync target: %s", target)
+		sm.spawnWorker(target)
+	}
+}
+
+func (sm *syncManager) handleWorkerStatus(status workerStatus) {
+	log.Debugf("worker %d done; status error: %s", status.id, status.err)
+
+	sm.mx.Lock()
+	ws := sm.state[status.id]
+	delete(sm.state, status.id)
+	sm.mx.Unlock()
+
+	if status.err != nil {
+		// we failed to sync this target -- log it and try to work on an extended chain;
+		// if there is nothing related to be worked on, we stop working on this chain.
+		log.Errorf("error during sync in %s: %s", ws.ts, status.err)
+	}
+
+	// we are done with this target; select the next sync target and spawn a worker if there is work
+	// to do because of an extension of this chain.
+	target, work, err := sm.selectSyncTarget(ws.ts)
+	if err != nil {
+		log.Warnf("failed to select sync target: %s", err)
+		return
+	}
+
+	if work {
+		log.Infof("selected sync target: %s", target)
+		sm.spawnWorker(target)
+	}
+}
+
+func (sm *syncManager) spawnWorker(target *types.TipSet) {
+	id := sm.nextWorker
+	sm.nextWorker++
+	ws := &workerState{
+		id: id,
+		ts: target,
+		ss: new(SyncerState),
+	}
+
+	sm.mx.Lock()
+	sm.state[id] = ws
+	sm.mx.Unlock()
+
+	go sm.worker(ws)
+}
+
+func (sm *syncManager) worker(ws *workerState) {
+	log.Infof("worker %d syncing in %s", ws.id, ws.ts)
+
+	start := build.Clock.Now()
+	defer func() {
+		log.Infof("worker %d done; took %s", ws.id, build.Clock.Since(start))
+	}()
+
+	ctx := context.WithValue(sm.ctx, syncStateKey{}, ws.ss)
+	err := sm.doSync(ctx, ws.ts)
+
+	select {
+	case sm.statusq <- workerStatus{id: ws.id, err: err}:
+	case <-sm.ctx.Done():
+	}
+}
+
+// selects the initial sync target by examining known peer heads; only called once for the initial
+// sync.
+func (sm *syncManager) selectInitialSyncTarget() (*types.TipSet, error) {
+	var buckets syncBucketSet
+
+	var peerHeads []*types.TipSet
+	for _, ts := range sm.heads {
+		peerHeads = append(peerHeads, ts)
+	}
+	// clear the map, we don't use it any longer
+	sm.heads = nil
+
+	sort.Slice(peerHeads, func(i, j int) bool {
+		return peerHeads[i].Height() < peerHeads[j].Height()
+	})
+
+	for _, ts := range peerHeads {
+		buckets.Insert(ts)
+	}
+
+	if len(buckets.buckets) > 1 {
+		log.Warn("caution, multiple distinct chains seen during head selections")
+		// TODO: we *could* refuse to sync here without user intervention.
+		// For now, just select the best cluster
+	}
+
+	return buckets.Heaviest(), nil
+}
+
+// adds a tipset to the potential sync targets; returns true if there is a tipset to work on.
+// this could either be a restart (e.g. because there is no currently scheduled sync work or a
+// worker failed) or a potential fork.
+func (sm *syncManager) addSyncTarget(ts *types.TipSet) (*types.TipSet, bool, error) {
+	// Note: we don't need the state lock here to access the active worker states, as the only
+	// competing threads that may access it do so through State() which is read only.
+
+	// if the worker set is empty, we have finished syncing and were waiting for the next tipset
+	// in this case, we just return the tipset as work to be done
+	if len(sm.state) == 0 {
+		return ts, true, nil
+	}
+
+	// check if it is related to any active sync; if so insert into the pending sync queue
+	for _, ws := range sm.state {
+		if ts.Equals(ws.ts) {
+			// ignore it, we are already syncing it
+			return nil, false, nil
+		}
+
+		if ts.Parents() == ws.ts.Key() {
+			// schedule for syncing next; it's an extension of an active sync
+			sm.pend.Insert(ts)
+			return nil, false, nil
+		}
+	}
+
+	// check to see if it is related to any pending sync; if so insert it into the pending sync queue
+	if sm.pend.RelatedToAny(ts) {
+		sm.pend.Insert(ts)
+		return nil, false, nil
+	}
+
+	// it's not related to any active or pending sync; this could be a fork in which case we
+	// start a new worker to sync it, if it is *heavier* than any active or pending set;
+	// if it is not, we ignore it.
+ activeHeavier := false + for _, ws := range sm.state { + if ws.ts.Height() > ts.Height() { + activeHeavier = true + break + } + } + + if activeHeavier { + return nil, false, nil + } + + pendHeaviest := sm.pend.Heaviest() + if pendHeaviest != nil && pendHeaviest.Height() > ts.Height() { + return nil, false, nil + } + + // start a new worker, seems heavy enough and unrelated to active or pending syncs + return ts, true, nil +} + +// selects the next sync target after a worker sync has finished; returns true and a target +// TipSet if this chain should continue to sync because there is a heavier related tipset. +func (sm *syncManager) selectSyncTarget(done *types.TipSet) (*types.TipSet, bool, error) { + // we pop the related bucket and if there is any related tipset, we work on the heaviest one next + // if we are not already working on a heavier tipset + related := sm.pend.PopRelated(done) + if related == nil { + return nil, false, nil + } + + heaviest := related.heaviestTipSet() + for _, ws := range sm.state { + if ws.ts.Height() > heaviest.Height() { + return nil, false, nil + } + } + + return heaviest, true, nil +} + +// sync buckets and related utilities type syncBucketSet struct { buckets []*syncTargetBucket } +type syncTargetBucket struct { + tips []*types.TipSet +} + func newSyncTargetBucket(tipsets ...*types.TipSet) *syncTargetBucket { var stb syncTargetBucket for _, ts := range tipsets { @@ -250,10 +464,6 @@ func (sbs *syncBucketSet) Empty() bool { return len(sbs.buckets) == 0 } -type syncTargetBucket struct { - tips []*types.TipSet -} - func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool { for _, t := range stb.tips { if ts.Equals(t) { @@ -296,196 +506,3 @@ func (stb *syncTargetBucket) heaviestTipSet() *types.TipSet { } return best } - -func (sm *syncManager) selectSyncTarget() (*types.TipSet, error) { - var buckets syncBucketSet - - var peerHeads []*types.TipSet - for _, ts := range sm.peerHeads { - peerHeads = append(peerHeads, ts) - } - sort.Slice(peerHeads, func(i, j int) bool { - return peerHeads[i].Height() < peerHeads[j].Height() - }) - - for _, ts := range peerHeads { - buckets.Insert(ts) - } - - if len(buckets.buckets) > 1 { - log.Warn("caution, multiple distinct chains seen during head selections") - // TODO: we *could* refuse to sync here without user intervention. 
- // For now, just select the best cluster - } - - return buckets.Heaviest(), nil -} - -func (sm *syncManager) syncScheduler() { - for { - select { - case ts, ok := <-sm.incomingTipSets: - if !ok { - log.Info("shutting down sync scheduler") - return - } - - sm.scheduleIncoming(ts) - case res := <-sm.syncResults: - sm.scheduleProcessResult(res) - case sm.workerChan <- sm.nextSyncTarget.heaviestTipSet(): - sm.scheduleWorkSent() - case <-sm.stop: - log.Info("sync scheduler shutting down") - return - } - } -} - -func (sm *syncManager) scheduleIncoming(ts *types.TipSet) { - log.Debug("scheduling incoming tipset sync: ", ts.Cids()) - if sm.getBootstrapState() == BSStateSelected { - sm.setBootstrapState(BSStateScheduled) - sm.syncTargets <- ts - return - } - - var relatedToActiveSync bool - for _, acts := range sm.activeSyncs { - if ts.Equals(acts) { - // ignore, we are already syncing it - return - } - - if ts.Parents() == acts.Key() { - // sync this next, after that sync process finishes - relatedToActiveSync = true - } - } - - if !relatedToActiveSync && sm.activeSyncTips.RelatedToAny(ts) { - relatedToActiveSync = true - } - - // if this is related to an active sync process, immediately bucket it - // we don't want to start a parallel sync process that duplicates work - if relatedToActiveSync { - sm.activeSyncTips.Insert(ts) - return - } - - if sm.getBootstrapState() == BSStateScheduled { - sm.syncQueue.Insert(ts) - return - } - - if sm.nextSyncTarget != nil && sm.nextSyncTarget.sameChainAs(ts) { - sm.nextSyncTarget.add(ts) - } else { - sm.syncQueue.Insert(ts) - - if sm.nextSyncTarget == nil { - sm.nextSyncTarget = sm.syncQueue.Pop() - sm.workerChan = sm.syncTargets - } - } -} - -func (sm *syncManager) scheduleProcessResult(res *syncResult) { - if res.success && sm.getBootstrapState() != BSStateComplete { - sm.setBootstrapState(BSStateComplete) - } - - delete(sm.activeSyncs, res.ts.Key()) - relbucket := sm.activeSyncTips.PopRelated(res.ts) - if relbucket != nil { - if res.success { - if sm.nextSyncTarget == nil { - sm.nextSyncTarget = relbucket - sm.workerChan = sm.syncTargets - } else { - for _, t := range relbucket.tips { - sm.syncQueue.Insert(t) - } - } - return - } - // TODO: this is the case where we try to sync a chain, and - // fail, and we have more blocks on top of that chain that - // have come in since. The question is, should we try to - // sync these? or just drop them? 
- log.Error("failed to sync chain but have new unconnected blocks from chain") - } - - if sm.nextSyncTarget == nil && !sm.syncQueue.Empty() { - next := sm.syncQueue.Pop() - if next != nil { - sm.nextSyncTarget = next - sm.workerChan = sm.syncTargets - } - } -} - -func (sm *syncManager) scheduleWorkSent() { - hts := sm.nextSyncTarget.heaviestTipSet() - sm.activeSyncs[hts.Key()] = hts - - if !sm.syncQueue.Empty() { - sm.nextSyncTarget = sm.syncQueue.Pop() - } else { - sm.nextSyncTarget = nil - sm.workerChan = nil - } -} - -func (sm *syncManager) syncWorker(id int) { - ss := sm.syncStates[id] - for { - select { - case ts, ok := <-sm.syncTargets: - if !ok { - log.Info("sync manager worker shutting down") - return - } - - ctx := context.WithValue(context.TODO(), syncStateKey{}, ss) - err := sm.doSync(ctx, ts) - if err != nil { - log.Errorf("sync error: %+v", err) - } - - sm.syncResults <- &syncResult{ - ts: ts, - success: err == nil, - } - } - } -} - -func (sm *syncManager) syncedPeerCount() int { - var count int - for _, ts := range sm.peerHeads { - if ts.Height() > 0 { - count++ - } - } - return count -} - -func (sm *syncManager) getBootstrapState() int { - sm.bssLk.Lock() - defer sm.bssLk.Unlock() - return sm.bootstrapState -} - -func (sm *syncManager) setBootstrapState(v int) { - sm.bssLk.Lock() - defer sm.bssLk.Unlock() - sm.bootstrapState = v -} - -func (sm *syncManager) IsBootstrapped() bool { - sm.bssLk.Lock() - defer sm.bssLk.Unlock() - return sm.bootstrapState == BSStateComplete -} diff --git a/chain/sync_manager_test.go b/chain/sync_manager_test.go index 709e03a41..74fb77dc3 100644 --- a/chain/sync_manager_test.go +++ b/chain/sync_manager_test.go @@ -28,7 +28,12 @@ func runSyncMgrTest(t *testing.T, tname string, thresh int, tf func(*testing.T, <-ch return nil }).(*syncManager) - sm.bspThresh = thresh + + oldBootstrapPeerThreshold := BootstrapPeerThreshold + BootstrapPeerThreshold = thresh + defer func() { + BootstrapPeerThreshold = oldBootstrapPeerThreshold + }() sm.Start() defer sm.Stop() @@ -112,8 +117,8 @@ func TestSyncManagerEdgeCase(t *testing.T) { waitUntilAllWorkersAreDone(stc) - if len(sm.activeSyncTips.buckets) != 0 { - t.Errorf("activeSyncTips expected empty but got: %s", sm.activeSyncTips.String()) + if len(sm.state) != 0 { + t.Errorf("active syncs expected empty but got: %d", len(sm.state)) } }) }
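
Note (not part of the patch): a minimal wiring sketch of the new surface, for reviewers. It assumes the exported SyncManager interface carries the methods implemented above (Start, Stop, SetPeerHead, State), that the caller already has a peer.ID and *types.TipSet from the hello protocol, and it stubs out the sync function where lotus would pass (*Syncer).Sync; the driveSync helper itself is hypothetical.

import (
	"context"
	"fmt"

	"github.com/filecoin-project/lotus/chain"
	"github.com/filecoin-project/lotus/chain/types"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

// driveSync sketches the call sequence against the rewritten sync manager.
func driveSync(ctx context.Context, peerID peer.ID, head *types.TipSet) {
	doSync := func(ctx context.Context, ts *types.TipSet) error {
		return nil // stand-in for the real sync function
	}

	mgr := chain.NewSyncManager(doSync)
	mgr.Start()
	defer mgr.Stop()

	// every reported head goes to the single scheduler goroutine; once
	// BootstrapPeerThreshold distinct peers have reported, the first worker
	// is spawned, and later heads become extensions (pend bucket) or forks.
	mgr.SetPeerHead(ctx, peerID, head)

	// one snapshot per in-flight worker, ordered by worker id
	for _, snap := range mgr.State() {
		fmt.Printf("%+v\n", snap)
	}
}

The single scheduler goroutine replaces the old bootstrap state machine and the three fixed sync workers, so back-pressure now comes from the unbuffered workq/statusq channels rather than from the syncTargets channel.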