diff --git a/chain/sync_manager.go b/chain/sync_manager.go index 0fdcf32de..4226902c5 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -11,12 +11,21 @@ import ( const BootstrapPeerThreshold = 2 +const ( + BSStateInit = 0 + BSStateSelected = 1 + BSStateScheduled = 2 + BSStateComplete = 3 +) + type SyncFunc func(context.Context, *types.TipSet) error type SyncManager struct { - lk sync.Mutex - peerHeads map[peer.ID]*types.TipSet - bootstrapped bool + lk sync.Mutex + peerHeads map[peer.ID]*types.TipSet + + bssLk sync.Mutex + bootstrapState int bspThresh int @@ -75,7 +84,7 @@ func (sm *SyncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.Tip defer sm.lk.Unlock() sm.peerHeads[p] = ts - if !sm.bootstrapped { + if sm.getBootstrapState() == BSStateInit { spc := sm.syncedPeerCount() if spc >= sm.bspThresh { // Its go time! @@ -84,10 +93,9 @@ func (sm *SyncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.Tip log.Error("failed to select sync target: ", err) return } + sm.setBootstrapState(BSStateSelected) sm.incomingTipSets <- target - // TODO: is this the right place to say we're bootstrapped? probably want to wait until the sync finishes - sm.bootstrapped = true } log.Infof("sync bootstrap has %d peers", spc) return @@ -175,6 +183,10 @@ func (sbs *syncBucketSet) Heaviest() *types.TipSet { return bestTs } +func (sbs *syncBucketSet) Empty() bool { + return len(sbs.buckets) == 0 +} + type syncTargetBucket struct { tips []*types.TipSet count int @@ -271,6 +283,12 @@ func (sm *SyncManager) syncScheduler() { } func (sm *SyncManager) scheduleIncoming(ts *types.TipSet) { + if sm.getBootstrapState() == BSStateSelected { + sm.setBootstrapState(BSStateScheduled) + sm.syncTargets <- ts + return + } + var relatedToActiveSync bool for _, acts := range sm.activeSyncs { if ts.Equals(acts) { @@ -294,6 +312,11 @@ func (sm *SyncManager) scheduleIncoming(ts *types.TipSet) { return } + if sm.getBootstrapState() == BSStateScheduled { + sm.syncQueue.Insert(ts) + return + } + if sm.nextSyncTarget != nil && sm.nextSyncTarget.sameChainAs(ts) { sm.nextSyncTarget.add(ts) } else { @@ -307,6 +330,9 @@ func (sm *SyncManager) scheduleIncoming(ts *types.TipSet) { } func (sm *SyncManager) scheduleProcessResult(res *syncResult) { + if res.success && sm.getBootstrapState() != BSStateComplete { + sm.setBootstrapState(BSStateComplete) + } delete(sm.activeSyncs, res.ts.Key()) relbucket := sm.activeSyncTips.PopRelated(res.ts) if relbucket != nil { @@ -317,6 +343,7 @@ func (sm *SyncManager) scheduleProcessResult(res *syncResult) { } else { sm.syncQueue.buckets = append(sm.syncQueue.buckets, relbucket) } + return } else { // TODO: this is the case where we try to sync a chain, and // fail, and we have more blocks on top of that chain that @@ -324,13 +351,21 @@ func (sm *SyncManager) scheduleProcessResult(res *syncResult) { // sync these? or just drop them? } } + + if sm.nextSyncTarget == nil && !sm.syncQueue.Empty() { + next := sm.syncQueue.Pop() + if next != nil { + sm.nextSyncTarget = next + sm.workerChan = sm.syncTargets + } + } } func (sm *SyncManager) scheduleWorkSent() { hts := sm.nextSyncTarget.heaviestTipSet() sm.activeSyncs[hts.Key()] = hts - if len(sm.syncQueue.buckets) > 0 { + if !sm.syncQueue.Empty() { sm.nextSyncTarget = sm.syncQueue.Pop() } else { sm.nextSyncTarget = nil @@ -373,8 +408,20 @@ func (sm *SyncManager) syncedPeerCount() int { return count } -func (sm *SyncManager) IsBootstrapped() bool { - sm.lk.Lock() - defer sm.lk.Unlock() - return sm.bootstrapped +func (sm *SyncManager) getBootstrapState() int { + sm.bssLk.Lock() + defer sm.bssLk.Unlock() + return sm.bootstrapState +} + +func (sm *SyncManager) setBootstrapState(v int) { + sm.bssLk.Lock() + defer sm.bssLk.Unlock() + sm.bootstrapState = v +} + +func (sm *SyncManager) IsBootstrapped() bool { + sm.bssLk.Lock() + defer sm.bssLk.Unlock() + return sm.bootstrapState == BSStateComplete }