package chain import ( "context" "os" "sort" "strings" "sync" "github.com/filecoin-project/lotus/chain/types" peer "github.com/libp2p/go-libp2p-core/peer" ) const BootstrapPeerThreshold = 2 var coalesceForksParents = false func init() { if os.Getenv("LOTUS_SYNC_REL_PARENT") == "yes" { coalesceForksParents = true } } const ( BSStateInit = 0 BSStateSelected = 1 BSStateScheduled = 2 BSStateComplete = 3 ) type SyncFunc func(context.Context, *types.TipSet) error // SyncManager manages the chain synchronization process, both at bootstrap time // and during ongoing operation. // // It receives candidate chain heads in the form of tipsets from peers, // and schedules them onto sync workers, deduplicating processing for // already-active syncs. type SyncManager interface { // Start starts the SyncManager. Start() // Stop stops the SyncManager. Stop() // SetPeerHead informs the SyncManager that the supplied peer reported the // supplied tipset. SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) // State retrieves the state of the sync workers. State() []SyncerStateSnapshot } type syncManager struct { lk sync.Mutex peerHeads map[peer.ID]*types.TipSet bssLk sync.Mutex bootstrapState int bspThresh int incomingTipSets chan *types.TipSet syncTargets chan *types.TipSet syncResults chan *syncResult syncStates []*SyncerState // Normally this handler is set to `(*Syncer).Sync()`. doSync func(context.Context, *types.TipSet) error stop chan struct{} // Sync Scheduler fields activeSyncs map[types.TipSetKey]*types.TipSet syncQueue syncBucketSet activeSyncTips syncBucketSet nextSyncTarget *syncTargetBucket workerChan chan *types.TipSet } var _ SyncManager = (*syncManager)(nil) type syncResult struct { ts *types.TipSet success bool } const syncWorkerCount = 3 func NewSyncManager(sync SyncFunc) SyncManager { sm := &syncManager{ bspThresh: 1, peerHeads: make(map[peer.ID]*types.TipSet), syncTargets: make(chan *types.TipSet), syncResults: make(chan *syncResult), syncStates: make([]*SyncerState, syncWorkerCount), incomingTipSets: make(chan *types.TipSet), activeSyncs: make(map[types.TipSetKey]*types.TipSet), doSync: sync, stop: make(chan struct{}), } for i := range sm.syncStates { sm.syncStates[i] = new(SyncerState) } return sm } func (sm *syncManager) Start() { go sm.syncScheduler() for i := 0; i < syncWorkerCount; i++ { go sm.syncWorker(i) } } func (sm *syncManager) Stop() { close(sm.stop) } func (sm *syncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) { sm.lk.Lock() defer sm.lk.Unlock() sm.peerHeads[p] = ts if sm.getBootstrapState() == BSStateInit { spc := sm.syncedPeerCount() if spc >= sm.bspThresh { // Its go time! target, err := sm.selectSyncTarget() if err != nil { log.Error("failed to select sync target: ", err) return } sm.setBootstrapState(BSStateSelected) sm.incomingTipSets <- target } log.Infof("sync bootstrap has %d peers", spc) return } sm.incomingTipSets <- ts } func (sm *syncManager) State() []SyncerStateSnapshot { ret := make([]SyncerStateSnapshot, 0, len(sm.syncStates)) for _, s := range sm.syncStates { ret = append(ret, s.Snapshot()) } return ret } type syncBucketSet struct { buckets []*syncTargetBucket } func newSyncTargetBucket(tipsets ...*types.TipSet) *syncTargetBucket { var stb syncTargetBucket for _, ts := range tipsets { stb.add(ts) } return &stb } func (sbs *syncBucketSet) String() string { var bStrings []string for _, b := range sbs.buckets { var tsStrings []string for _, t := range b.tips { tsStrings = append(tsStrings, t.String()) } bStrings = append(bStrings, "["+strings.Join(tsStrings, ",")+"]") } return "{" + strings.Join(bStrings, ";") + "}" } func (sbs *syncBucketSet) RelatedToAny(ts *types.TipSet) bool { for _, b := range sbs.buckets { if b.sameChainAs(ts) { return true } } return false } func (sbs *syncBucketSet) Insert(ts *types.TipSet) { for _, b := range sbs.buckets { if b.sameChainAs(ts) { b.add(ts) return } } sbs.buckets = append(sbs.buckets, newSyncTargetBucket(ts)) } func (sbs *syncBucketSet) Pop() *syncTargetBucket { var bestBuck *syncTargetBucket var bestTs *types.TipSet for _, b := range sbs.buckets { hts := b.heaviestTipSet() if bestBuck == nil || bestTs.ParentWeight().LessThan(hts.ParentWeight()) { bestBuck = b bestTs = hts } } sbs.removeBucket(bestBuck) return bestBuck } func (sbs *syncBucketSet) removeBucket(toremove *syncTargetBucket) { nbuckets := make([]*syncTargetBucket, 0, len(sbs.buckets)-1) for _, b := range sbs.buckets { if b != toremove { nbuckets = append(nbuckets, b) } } sbs.buckets = nbuckets } func (sbs *syncBucketSet) PopRelated(ts *types.TipSet) *syncTargetBucket { var bOut *syncTargetBucket for _, b := range sbs.buckets { if b.sameChainAs(ts) { sbs.removeBucket(b) if bOut == nil { bOut = &syncTargetBucket{} } bOut.tips = append(bOut.tips, b.tips...) } } return bOut } func (sbs *syncBucketSet) Heaviest() *types.TipSet { // TODO: should also consider factoring in number of peers represented by each bucket here var bestTs *types.TipSet for _, b := range sbs.buckets { bhts := b.heaviestTipSet() if bestTs == nil || bhts.ParentWeight().GreaterThan(bestTs.ParentWeight()) { bestTs = bhts } } return bestTs } func (sbs *syncBucketSet) Empty() bool { return len(sbs.buckets) == 0 } type syncTargetBucket struct { tips []*types.TipSet } func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool { for _, t := range stb.tips { if ts.Equals(t) { return true } if ts.Key() == t.Parents() { return true } if ts.Parents() == t.Key() { return true } if coalesceForksParents && ts.Parents() == t.Parents() { return true } } return false } func (stb *syncTargetBucket) add(ts *types.TipSet) { for _, t := range stb.tips { if t.Equals(ts) { return } } stb.tips = append(stb.tips, ts) } func (stb *syncTargetBucket) heaviestTipSet() *types.TipSet { if stb == nil { return nil } var best *types.TipSet for _, ts := range stb.tips { if best == nil || ts.ParentWeight().GreaterThan(best.ParentWeight()) { best = ts } } return best } func (sm *syncManager) selectSyncTarget() (*types.TipSet, error) { var buckets syncBucketSet var peerHeads []*types.TipSet for _, ts := range sm.peerHeads { peerHeads = append(peerHeads, ts) } sort.Slice(peerHeads, func(i, j int) bool { return peerHeads[i].Height() < peerHeads[j].Height() }) for _, ts := range peerHeads { buckets.Insert(ts) } if len(buckets.buckets) > 1 { log.Warn("caution, multiple distinct chains seen during head selections") // TODO: we *could* refuse to sync here without user intervention. // For now, just select the best cluster } return buckets.Heaviest(), nil } func (sm *syncManager) syncScheduler() { for { select { case ts, ok := <-sm.incomingTipSets: if !ok { log.Info("shutting down sync scheduler") return } sm.scheduleIncoming(ts) case res := <-sm.syncResults: sm.scheduleProcessResult(res) case sm.workerChan <- sm.nextSyncTarget.heaviestTipSet(): sm.scheduleWorkSent() case <-sm.stop: log.Info("sync scheduler shutting down") return } } } func (sm *syncManager) scheduleIncoming(ts *types.TipSet) { log.Debug("scheduling incoming tipset sync: ", ts.Cids()) if sm.getBootstrapState() == BSStateSelected { sm.setBootstrapState(BSStateScheduled) sm.syncTargets <- ts return } var relatedToActiveSync bool for _, acts := range sm.activeSyncs { if ts.Equals(acts) { // ignore, we are already syncing it return } if ts.Parents() == acts.Key() { // sync this next, after that sync process finishes relatedToActiveSync = true } } if !relatedToActiveSync && sm.activeSyncTips.RelatedToAny(ts) { relatedToActiveSync = true } // if this is related to an active sync process, immediately bucket it // we don't want to start a parallel sync process that duplicates work if relatedToActiveSync { sm.activeSyncTips.Insert(ts) return } if sm.getBootstrapState() == BSStateScheduled { sm.syncQueue.Insert(ts) return } if sm.nextSyncTarget != nil && sm.nextSyncTarget.sameChainAs(ts) { sm.nextSyncTarget.add(ts) } else { sm.syncQueue.Insert(ts) if sm.nextSyncTarget == nil { sm.nextSyncTarget = sm.syncQueue.Pop() sm.workerChan = sm.syncTargets } } } func (sm *syncManager) scheduleProcessResult(res *syncResult) { if res.success && sm.getBootstrapState() != BSStateComplete { sm.setBootstrapState(BSStateComplete) } delete(sm.activeSyncs, res.ts.Key()) relbucket := sm.activeSyncTips.PopRelated(res.ts) if relbucket != nil { if res.success { if sm.nextSyncTarget == nil { sm.nextSyncTarget = relbucket sm.workerChan = sm.syncTargets } else { for _, t := range relbucket.tips { sm.syncQueue.Insert(t) } } return } // TODO: this is the case where we try to sync a chain, and // fail, and we have more blocks on top of that chain that // have come in since. The question is, should we try to // sync these? or just drop them? log.Error("failed to sync chain but have new unconnected blocks from chain") } if sm.nextSyncTarget == nil && !sm.syncQueue.Empty() { next := sm.syncQueue.Pop() if next != nil { sm.nextSyncTarget = next sm.workerChan = sm.syncTargets } } } func (sm *syncManager) scheduleWorkSent() { hts := sm.nextSyncTarget.heaviestTipSet() sm.activeSyncs[hts.Key()] = hts if !sm.syncQueue.Empty() { sm.nextSyncTarget = sm.syncQueue.Pop() } else { sm.nextSyncTarget = nil sm.workerChan = nil } } func (sm *syncManager) syncWorker(id int) { ss := sm.syncStates[id] for { select { case ts, ok := <-sm.syncTargets: if !ok { log.Info("sync manager worker shutting down") return } ctx := context.WithValue(context.TODO(), syncStateKey{}, ss) err := sm.doSync(ctx, ts) if err != nil { log.Errorf("sync error: %+v", err) } sm.syncResults <- &syncResult{ ts: ts, success: err == nil, } } } } func (sm *syncManager) syncedPeerCount() int { var count int for _, ts := range sm.peerHeads { if ts.Height() > 0 { count++ } } return count } func (sm *syncManager) getBootstrapState() int { sm.bssLk.Lock() defer sm.bssLk.Unlock() return sm.bootstrapState } func (sm *syncManager) setBootstrapState(v int) { sm.bssLk.Lock() defer sm.bssLk.Unlock() sm.bootstrapState = v } func (sm *syncManager) IsBootstrapped() bool { sm.bssLk.Lock() defer sm.bssLk.Unlock() return sm.bootstrapState == BSStateComplete }