From 9e4b3ae88af3a14986a2ea8ff70a234733fab354 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 12 Nov 2019 21:51:36 -0800 Subject: [PATCH 01/11] WIP: implement chain sync manager and concurrent sync --- chain/sync.go | 14 ++----- chain/sync_manager.go | 87 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 11 deletions(-) create mode 100644 chain/sync_manager.go diff --git a/chain/sync.go b/chain/sync.go index 43c1030ff..794d32e6a 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -47,8 +47,6 @@ type Syncer struct { // The known Genesis tipset Genesis *types.TipSet - syncLock sync.Mutex - // TipSets known to be invalid bad *BadBlockCache @@ -57,12 +55,10 @@ type Syncer struct { self peer.ID + syncLock sync.Mutex syncState SyncerState - // peer heads - // Note: clear cache on disconnects - peerHeads map[peer.ID]*types.TipSet - peerHeadsLk sync.Mutex + syncmgr *SyncManager } func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID) (*Syncer, error) { @@ -87,8 +83,6 @@ func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID) }, nil } -const BootstrapPeerThreshold = 1 - // InformNewHead informs the syncer about a new potential tipset // This should be called when connecting to new peers, and additionally // when receiving new blocks from the network @@ -123,10 +117,8 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) { return } - syncer.peerHeadsLk.Lock() - syncer.peerHeads[from] = fts.TipSet() - syncer.peerHeadsLk.Unlock() syncer.Bsync.AddPeer(from) + syncer.syncmgr.SetPeerHead(from, fts.TipSet()) bestPweight := syncer.store.GetHeaviestTipSet().Blocks()[0].ParentWeight targetWeight := fts.TipSet().Blocks()[0].ParentWeight diff --git a/chain/sync_manager.go b/chain/sync_manager.go new file mode 100644 index 000000000..a1bb6f5cd --- /dev/null +++ b/chain/sync_manager.go @@ -0,0 +1,87 @@ +package chain + +import ( + "context" + "sync" + + "github.com/filecoin-project/lotus/chain/types" + peer "github.com/libp2p/go-libp2p-peer" +) + +const BootstrapPeerThreshold = 2 + +type SyncFunc func(context.Context, *types.TipSet) error + +type SyncManager struct { + lk sync.Mutex + peerHeads map[peer.ID]*types.TipSet + bootstrapped bool + + bspThresh int + + syncTargets chan *types.TipSet + + doSync func(context.Context, *types.TipSet) error +} + +func NewSyncManager(sync SyncFunc) *SyncManager { + return &SyncManager{ + peerHeads: make(map[peer.ID]*types.TipSet), + syncTargets: make(chan *types.TipSet), + doSync: sync, + } +} + +func (sm *SyncManager) Start() { + for i := 0; i < syncWorkerCount; i++ { + go sm.syncWorker(i) + } +} + +func (sm *SyncManager) SetPeerHead(p peer.ID, ts *types.TipSet) { + sm.lk.Lock() + defer sm.lk.Unlock() + sm.peerHeads[p] = ts + + if !sm.bootstrapped { + spc := sm.syncedPeerCount() + if spc >= sm.bspThresh { + // Its go time! 
+ } + log.Infof("sync bootstrap has %d peers", spc) + return + } + +} + +func (sm *SyncManager) syncWorker(id int) { + for { + select { + case ts, ok := sm.syncTargets: + if !ok { + log.Info("sync manager worker shutting down") + return + } + + if err := sm.doSync(context.TODO(), ts); err != nil { + log.Errorf("sync error: %+v", err) + } + } + } +} + +func (sm *SyncManager) syncedPeerCount() int { + var count int + for _, ts := range sm.peerHeads { + if ts.Height() > 0 { + count++ + } + } + return count +} + +func (sm *SyncManager) IsBootstrapped() bool { + sm.lk.Lock() + defer sm.lk.Unlock() + return sm.bootstrapped +} From 4bc523e6b4ba8487ef229aea1e1892f07edc3b04 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Wed, 13 Nov 2019 09:03:56 -0800 Subject: [PATCH 02/11] more wip, squashme --- chain/blocksync/blocksync_client.go | 8 ++++++++ chain/sync_manager.go | 4 ++++ 2 files changed, 12 insertions(+) diff --git a/chain/blocksync/blocksync_client.go b/chain/blocksync/blocksync_client.go index 52e17f7a3..819a24001 100644 --- a/chain/blocksync/blocksync_client.go +++ b/chain/blocksync/blocksync_client.go @@ -77,6 +77,14 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int) var oerr error for _, p := range peers { + // TODO: doing this synchronously isnt great, but fetching in parallel + // may not be a good idea either. think about this more + select { + case <-ctx.Done(): + return nil, xerrors.Errorf("blocksync getblocks failed: %w", ctx.Err()) + default: + } + res, err := bs.sendRequestToPeer(ctx, p, req) if err != nil { oerr = err diff --git a/chain/sync_manager.go b/chain/sync_manager.go index a1bb6f5cd..c83a63bcc 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -54,6 +54,10 @@ func (sm *SyncManager) SetPeerHead(p peer.ID, ts *types.TipSet) { } +func (sm *SyncManager) selectSyncTarget() (*types.TipSet, error) { + panic("NYI") +} + func (sm *SyncManager) syncWorker(id int) { for { select { From 1d81c53f8fa735e587e67ac1df73e847a0db8f4b Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 14 Nov 2019 18:27:43 -0800 Subject: [PATCH 03/11] getting closer, squashme --- chain/sync.go | 16 ++-- chain/sync_manager.go | 198 +++++++++++++++++++++++++++++++++++++++++- chain/types/tipset.go | 4 + 3 files changed, 208 insertions(+), 10 deletions(-) diff --git a/chain/sync.go b/chain/sync.go index 794d32e6a..db3380c24 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -55,8 +55,7 @@ type Syncer struct { self peer.ID - syncLock sync.Mutex - syncState SyncerState + syncLock sync.Mutex syncmgr *SyncManager } @@ -73,13 +72,12 @@ func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID) } return &Syncer{ - bad: NewBadBlockCache(), - Genesis: gent, - Bsync: bsync, - peerHeads: make(map[peer.ID]*types.TipSet), - store: sm.ChainStore(), - sm: sm, - self: self, + bad: NewBadBlockCache(), + Genesis: gent, + Bsync: bsync, + store: sm.ChainStore(), + sm: sm, + self: self, }, nil } diff --git a/chain/sync_manager.go b/chain/sync_manager.go index c83a63bcc..d2cb5ba9d 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -2,6 +2,7 @@ package chain import ( "context" + "sort" "sync" "github.com/filecoin-project/lotus/chain/types" @@ -21,14 +22,24 @@ type SyncManager struct { syncTargets chan *types.TipSet + asLk sync.Mutex + activeSyncs map[types.TipSetKey]*types.TipSet + queuedSyncs map[types.TipSetKey]*types.TipSet + + syncState SyncerState + doSync func(context.Context, *types.TipSet) error + + stop chan struct{} } func 
NewSyncManager(sync SyncFunc) *SyncManager { return &SyncManager{ peerHeads: make(map[peer.ID]*types.TipSet), syncTargets: make(chan *types.TipSet), + activeSyncs: make([]*types.TipSet, syncWorkerCount), doSync: sync, + stop: make(chan struct{}), } } @@ -47,6 +58,17 @@ func (sm *SyncManager) SetPeerHead(p peer.ID, ts *types.TipSet) { spc := sm.syncedPeerCount() if spc >= sm.bspThresh { // Its go time! + target, err := sm.selectSyncTarget() + if err != nil { + log.Error("failed to select sync target: ", err) + return + } + + sm.asLk.Lock() + sm.activeSyncs[target.Key()] = target + sm.asLk.Unlock() + sm.syncTargets <- target + sm.bootstrapped = true } log.Infof("sync bootstrap has %d peers", spc) return @@ -54,8 +76,178 @@ func (sm *SyncManager) SetPeerHead(p peer.ID, ts *types.TipSet) { } +type syncBucketSet struct { + buckets []*syncTargetBucket +} + +func (sbs *syncBucketSet) Insert(ts *types.TipSet) { + for _, b := range sbs.buckets { + if b.sameChainAs(ts) { + b.add(ts) + return + } + } + sbs.buckets = append(sbs.buckets, &syncTargetBucket{ + tips: []*types.TipSet{ts}, + count: 1, + }) +} + +func (sbs *syncBucketSet) Pop() *syncTargetBucket { + var bestBuck *syncTargetBucket + var bestTs *types.TipSet + for _, b := range sbs.buckets { + hts := b.heaviestTipSet() + if bestBuck == nil || bestTs.ParentWeight().LessThan(hts.ParentWeight()) { + bestBuck = b + bestTs = hts + } + } + nbuckets := make([]*syncTargetBucket, len(sbs.buckets)-1) + return bestBuck +} + +func (sbs *syncBucketSet) Heaviest() *types.TipSet { + // TODO: should also consider factoring in number of peers represented by each bucket here + var bestTs *types.TipSet + for _, b := range buckets { + bhts := b.heaviestTipSet() + if bestTs == nil || bhts.ParentWeight().GreaterThan(bestTs.ParentWeight()) { + bestTs = bhts + } + } + return bestTs +} + +type syncTargetBucket struct { + tips []*types.TipSet + count int +} + +func newSyncTargetBucket(tipsets ...*types.TipSet) *syncTargetBucket { + var stb syncTargetBucket + for _, ts := range tipsets { + stb.add(ts) + } + return &stb +} + +func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool { + for _, t := range stb.tips { + if ts.Equals(t) { + return true + } + if types.CidArrsEqual(ts.Cids(), t.Parents()) { + return true + } + if types.CidArrsEqual(ts.Parents(), t.Cids()) { + return true + } + } + return false +} + +func (stb *syncTargetBucket) add(ts *types.TipSet) { + stb.count++ + + for _, t := range stb.tips { + if t.Equals(ts) { + return + } + } + + stb.tips = append(stb.tips, ts) +} + +func (stb *syncTargetBucket) heaviestTipSet() *types.TipSet { + var best *types.TipSet + for _, ts := range stb.tips { + if best == nil || ts.ParentWeight().GreaterThan(best.ParentWeight()) { + best = ts + } + } + return best +} + func (sm *SyncManager) selectSyncTarget() (*types.TipSet, error) { - panic("NYI") + var buckets syncBucketSet + + var peerHeads []*types.TipSet + for _, ts := range sm.peerHeads { + peerHeads = append(peerHeads, ts) + } + sort.Slice(peerHeads, func(i, j int) bool { + return peerHeads[i].Height() < peerHeads[j].Height() + }) + + for _, ts := range peerHeads { + buckets.Insert(ts) + } + + if len(buckets.buckets) > 1 { + log.Warning("caution, multiple distinct chains seen during head selections") + // TODO: we *could* refuse to sync here without user intervention. 
+ // For now, just select the best cluster + } + + return buckets.Heaviest(), nil +} + +func (sm *SyncManager) syncScheduler() { + var syncQueue syncBucketSet + + var nextSyncTarget *syncTargetBucket + var workerChan chan *types.TipSet + + for { + select { + case ts, ok := <-sm.incomingTipSets: + if !ok { + log.Info("shutting down sync scheduler") + return + } + + var relatedToActiveSync bool + sm.asLk.Lock() + for _, acts := range sm.activeSyncs { + if ts.Equals(acts) { + break + } + + if types.CidArrsEqual(ts.Parents(), acts.Cids()) { + // sync this next, after that sync process finishes + relatedToActiveSync = true + } + } + sm.asLk.Unlock() + + // if this is related to an active sync process, immediately bucket it + // we don't want to start a parallel sync process that duplicates work + if relatedToActiveSync { + syncQueue.Insert(ts) + } + + if nextSyncTarget != nil && nextSyncTarget.sameChainAs(ts) { + nextSyncTarget.add(ts) + } else { + syncQueue.Insert(ts) + + if nextSyncTarget == nil { + nextSyncTarget = syncQueue.Pop() + workerChan = workerChanVal + } + } + case workerChan <- nextSyncTarget.heaviestTipSet(): + if len(syncQueue.buckets) > 0 { + nextSyncTarget = syncQueue.Pop() + } else { + workerChan = nil + } + case <-sm.stop: + log.Info("sync scheduler shutting down") + return + } + } } func (sm *SyncManager) syncWorker(id int) { @@ -70,6 +262,10 @@ func (sm *SyncManager) syncWorker(id int) { if err := sm.doSync(context.TODO(), ts); err != nil { log.Errorf("sync error: %+v", err) } + + sm.asLk.Lock() + delete(sm.activeSyncs, ts.Key()) + sm.asLk.Unlock() } } } diff --git a/chain/types/tipset.go b/chain/types/tipset.go index 13fb7e582..ca6941e5c 100644 --- a/chain/types/tipset.go +++ b/chain/types/tipset.go @@ -197,6 +197,10 @@ func (ts *TipSet) ParentState() cid.Cid { return ts.blks[0].ParentStateRoot } +func (ts *TipSet) ParentWeight() BigInt { + return ts.blks[0].ParentWeight +} + func (ts *TipSet) Contains(oc cid.Cid) bool { for _, c := range ts.cids { if c == oc { From 7aa76d21d14229eec2557841583e6372d290951e Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Fri, 15 Nov 2019 13:35:29 -0800 Subject: [PATCH 04/11] It works! 
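
Wire the sync manager in end to end: the Syncer gains Start/Stop (driven from an
fx lifecycle hook in node/modules), InformNewHead leaves scheduling to the manager
instead of spawning its own Sync goroutine, and per-sync progress is read from the
request context via extractSyncState, with the SyncerState methods tolerating a nil
receiver for now. The scheduler only offers work to the workers while it actually
has a next target, by pointing workerChan at syncTargets and back to nil. Below is
a minimal, self-contained sketch of that nil-channel gating pattern; the names
(incoming, targets, queue) and plain ints stand in for the manager's channels and
tipsets, it is an illustration of the trick, not the real scheduler:

    package main

    import "fmt"

    func main() {
        incoming := make(chan int) // stands in for incomingTipSets
        targets := make(chan int)  // stands in for syncTargets
        done := make(chan struct{})

        go func() { // a single "sync worker"
            for t := range targets {
                fmt.Println("syncing target", t)
            }
            close(done)
        }()

        go func() { // feed a few "tipsets", then stop
            for i := 1; i <= 3; i++ {
                incoming <- i
            }
            close(incoming)
        }()

        var queue []int
        var next int
        var workerChan chan int // nil: the send case below can never fire
        for {
            select {
            case ts, ok := <-incoming:
                if !ok {
                    if workerChan != nil { // flush the pending target
                        targets <- next
                    }
                    for _, t := range queue { // and anything still queued
                        targets <- t
                    }
                    close(targets)
                    <-done
                    return
                }
                queue = append(queue, ts)
                if workerChan == nil {
                    next, queue = queue[0], queue[1:]
                    workerChan = targets // have a target: enable the send
                }
            case workerChan <- next:
                if len(queue) > 0 {
                    next, queue = queue[0], queue[1:]
                } else {
                    workerChan = nil // queue drained: disable the send again
                }
            }
        }
    }
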
--- chain/sync.go | 71 +++++++++++++++++-------- chain/sync_manager.go | 118 ++++++++++++++++++++++++++++++++++-------- chain/syncstate.go | 12 +++++ node/builder.go | 2 +- node/modules/chain.go | 21 ++++++++ 5 files changed, 178 insertions(+), 46 deletions(-) diff --git a/chain/sync.go b/chain/sync.go index db3380c24..005ca4394 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -71,14 +71,25 @@ func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID) return nil, err } - return &Syncer{ + s := &Syncer{ bad: NewBadBlockCache(), Genesis: gent, Bsync: bsync, store: sm.ChainStore(), sm: sm, self: self, - }, nil + } + + s.syncmgr = NewSyncManager(s.Sync) + return s, nil +} + +func (syncer *Syncer) Start() { + syncer.syncmgr.Start() +} + +func (syncer *Syncer) Stop() { + syncer.syncmgr.Stop() } // InformNewHead informs the syncer about a new potential tipset @@ -118,18 +129,20 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) { syncer.Bsync.AddPeer(from) syncer.syncmgr.SetPeerHead(from, fts.TipSet()) - bestPweight := syncer.store.GetHeaviestTipSet().Blocks()[0].ParentWeight - targetWeight := fts.TipSet().Blocks()[0].ParentWeight - if targetWeight.LessThan(bestPweight) { - log.Warn("incoming tipset does not appear to be better than our best chain, ignoring for now") - return - } - - go func() { - if err := syncer.Sync(ctx, fts.TipSet()); err != nil { - log.Errorf("sync error (curW=%s, targetW=%s): %+v", bestPweight, targetWeight, err) + /* + bestPweight := syncer.store.GetHeaviestTipSet().Blocks()[0].ParentWeight + targetWeight := fts.TipSet().Blocks()[0].ParentWeight + if targetWeight.LessThan(bestPweight) { + log.Warn("incoming tipset does not appear to be better than our best chain, ignoring for now") + return } - }() + + go func() { + if err := syncer.Sync(ctx, fts.TipSet()); err != nil { + log.Errorf("sync error (curW=%s, targetW=%s): %+v", bestPweight, targetWeight, err) + } + }() + */ } func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error { @@ -748,9 +761,20 @@ func (syncer *Syncer) verifyBlsAggregate(ctx context.Context, sig types.Signatur return nil } +const syncStateKey = "syncStateKey" + +func extractSyncState(ctx context.Context) *SyncerState { + v := ctx.Value(syncStateKey) + if v != nil { + return v.(*SyncerState) + } + return nil +} + func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) { ctx, span := trace.StartSpan(ctx, "collectHeaders") defer span.End() + ss := extractSyncState(ctx) span.AddAttributes( trace.Int64Attribute("fromHeight", int64(from.Height())), @@ -773,7 +797,7 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to // we want to sync all the blocks until the height above the block we have untilHeight := to.Height() + 1 - syncer.syncState.SetHeight(blockSet[len(blockSet)-1].Height()) + ss.SetHeight(blockSet[len(blockSet)-1].Height()) var acceptedBlocks []cid.Cid @@ -841,7 +865,7 @@ loop: acceptedBlocks = append(acceptedBlocks, at...) 
- syncer.syncState.SetHeight(blks[len(blks)-1].Height()) + ss.SetHeight(blks[len(blks)-1].Height()) at = blks[len(blks)-1].Parents() } @@ -904,7 +928,8 @@ func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *type } func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*types.TipSet) error { - syncer.syncState.SetHeight(0) + ss := extractSyncState(ctx) + ss.SetHeight(0) return syncer.iterFullTipsets(ctx, headers, func(ctx context.Context, fts *store.FullTipSet) error { log.Debugw("validating tipset", "height", fts.TipSet().Height(), "size", len(fts.TipSet().Cids())) @@ -913,7 +938,7 @@ func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []* return xerrors.Errorf("message processing failed: %w", err) } - syncer.syncState.SetHeight(fts.TipSet().Height()) + ss.SetHeight(fts.TipSet().Height()) return nil }) @@ -1008,8 +1033,9 @@ func persistMessages(bs bstore.Blockstore, bst *blocksync.BSTipSet) error { func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error { ctx, span := trace.StartSpan(ctx, "collectChain") defer span.End() + ss := extractSyncState(ctx) - syncer.syncState.Init(syncer.store.GetHeaviestTipSet(), ts) + ss.Init(syncer.store.GetHeaviestTipSet(), ts) headers, err := syncer.collectHeaders(ctx, ts, syncer.store.GetHeaviestTipSet()) if err != nil { @@ -1022,7 +1048,7 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error log.Errorf("collectChain headers[0] should be equal to sync target. Its not: %s != %s", headers[0].Cids(), ts.Cids()) } - syncer.syncState.SetStage(api.StagePersistHeaders) + ss.SetStage(api.StagePersistHeaders) toPersist := make([]*types.BlockHeader, 0, len(headers)*build.BlocksPerEpoch) for _, ts := range headers { @@ -1033,13 +1059,13 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error } toPersist = nil - syncer.syncState.SetStage(api.StageMessages) + ss.SetStage(api.StageMessages) if err := syncer.syncMessagesAndCheckState(ctx, headers); err != nil { return xerrors.Errorf("collectChain syncMessages: %w", err) } - syncer.syncState.SetStage(api.StageSyncComplete) + ss.SetStage(api.StageSyncComplete) log.Debugw("new tipset", "height", ts.Height(), "tipset", types.LogCids(ts.Cids())) return nil @@ -1059,5 +1085,6 @@ func VerifyElectionProof(ctx context.Context, eproof []byte, rand []byte, worker } func (syncer *Syncer) State() SyncerState { - return syncer.syncState.Snapshot() + panic("NYI") + //return syncer.syncState.Snapshot() } diff --git a/chain/sync_manager.go b/chain/sync_manager.go index d2cb5ba9d..64e0c82ca 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -20,11 +20,11 @@ type SyncManager struct { bspThresh int - syncTargets chan *types.TipSet + incomingTipSets chan *types.TipSet + syncTargets chan *types.TipSet + syncResults chan *syncResult - asLk sync.Mutex activeSyncs map[types.TipSetKey]*types.TipSet - queuedSyncs map[types.TipSetKey]*types.TipSet syncState SyncerState @@ -33,30 +33,48 @@ type SyncManager struct { stop chan struct{} } +type syncResult struct { + ts *types.TipSet + success bool +} + +const syncWorkerCount = 3 + func NewSyncManager(sync SyncFunc) *SyncManager { return &SyncManager{ - peerHeads: make(map[peer.ID]*types.TipSet), - syncTargets: make(chan *types.TipSet), - activeSyncs: make([]*types.TipSet, syncWorkerCount), - doSync: sync, - stop: make(chan struct{}), + bspThresh: 1, + peerHeads: make(map[peer.ID]*types.TipSet), + syncTargets: make(chan *types.TipSet), + 
syncResults: make(chan *syncResult), + incomingTipSets: make(chan *types.TipSet), + activeSyncs: make(map[types.TipSetKey]*types.TipSet), + doSync: sync, + stop: make(chan struct{}), } } func (sm *SyncManager) Start() { + go sm.syncScheduler() for i := 0; i < syncWorkerCount; i++ { go sm.syncWorker(i) } } +func (sm *SyncManager) Stop() { + close(sm.stop) +} + func (sm *SyncManager) SetPeerHead(p peer.ID, ts *types.TipSet) { + log.Info("set peer head!") sm.lk.Lock() defer sm.lk.Unlock() sm.peerHeads[p] = ts if !sm.bootstrapped { + log.Info("not bootstrapped") spc := sm.syncedPeerCount() if spc >= sm.bspThresh { + log.Info("go time!") // Its go time! target, err := sm.selectSyncTarget() if err != nil { @@ -64,16 +82,15 @@ func (sm *SyncManager) SetPeerHead(p peer.ID, ts *types.TipSet) { return } - sm.asLk.Lock() - sm.activeSyncs[target.Key()] = target - sm.asLk.Unlock() - sm.syncTargets <- target + sm.incomingTipSets <- target + // TODO: is this the right place to say we're bootstrapped? probably want to wait until the sync finishes sm.bootstrapped = true } log.Infof("sync bootstrap has %d peers", spc) return } + sm.incomingTipSets <- ts } type syncBucketSet struct { @@ -103,14 +120,36 @@ func (sbs *syncBucketSet) Pop() *syncTargetBucket { bestTs = hts } } - nbuckets := make([]*syncTargetBucket, len(sbs.buckets)-1) + + sbs.removeBucket(bestBuck) + return bestBuck } +func (sbs *syncBucketSet) removeBucket(toremove *syncTargetBucket) { + nbuckets := make([]*syncTargetBucket, 0, len(sbs.buckets)-1) + for _, b := range sbs.buckets { + if b != toremove { + nbuckets = append(nbuckets, b) + } + } + sbs.buckets = nbuckets +} + +func (sbs *syncBucketSet) PopRelated(ts *types.TipSet) *syncTargetBucket { + for _, b := range sbs.buckets { + if b.sameChainAs(ts) { + sbs.removeBucket(b) + return b + } + } + return nil +} + func (sbs *syncBucketSet) Heaviest() *types.TipSet { // TODO: should also consider factoring in number of peers represented by each bucket here var bestTs *types.TipSet - for _, b := range buckets { + for _, b := range sbs.buckets { bhts := b.heaviestTipSet() if bestTs == nil || bhts.ParentWeight().GreaterThan(bestTs.ParentWeight()) { bestTs = bhts @@ -160,6 +199,10 @@ func (stb *syncTargetBucket) add(ts *types.TipSet) { } func (stb *syncTargetBucket) heaviestTipSet() *types.TipSet { + if stb == nil { + return nil + } + var best *types.TipSet for _, ts := range stb.tips { if best == nil || ts.ParentWeight().GreaterThan(best.ParentWeight()) { @@ -195,6 +238,7 @@ func (sm *SyncManager) selectSyncTarget() (*types.TipSet, error) { func (sm *SyncManager) syncScheduler() { var syncQueue syncBucketSet + var activeSyncTips syncBucketSet var nextSyncTarget *syncTargetBucket var workerChan chan *types.TipSet @@ -208,7 +252,6 @@ func (sm *SyncManager) syncScheduler() { } var relatedToActiveSync bool - sm.asLk.Lock() for _, acts := range sm.activeSyncs { if ts.Equals(acts) { break @@ -219,28 +262,54 @@ func (sm *SyncManager) syncScheduler() { relatedToActiveSync = true } } - sm.asLk.Unlock() // if this is related to an active sync process, immediately bucket it // we don't want to start a parallel sync process that duplicates work if relatedToActiveSync { - syncQueue.Insert(ts) + log.Info("related to active sync") + activeSyncTips.Insert(ts) + continue } if nextSyncTarget != nil && nextSyncTarget.sameChainAs(ts) { + log.Info("new tipset is part of our next sync target") nextSyncTarget.add(ts) } else { + log.Info("insert into that queue!") syncQueue.Insert(ts) if nextSyncTarget == nil { 
nextSyncTarget = syncQueue.Pop() - workerChan = workerChanVal + workerChan = sm.syncTargets + log.Info("setting next sync target") + } + } + case res := <-sm.syncResults: + delete(sm.activeSyncs, res.ts.Key()) + relbucket := activeSyncTips.PopRelated(res.ts) + if relbucket != nil { + if res.success { + if nextSyncTarget == nil { + nextSyncTarget = relbucket + workerChan = sm.syncTargets + } else { + syncQueue.buckets = append(syncQueue.buckets, relbucket) + } + } else { + // TODO: this is the case where we try to sync a chain, and + // fail, and we have more blocks on top of that chain that + // have come in since. The question is, should we try to + // sync these? or just drop them? } } case workerChan <- nextSyncTarget.heaviestTipSet(): + hts := nextSyncTarget.heaviestTipSet() + sm.activeSyncs[hts.Key()] = hts + if len(syncQueue.buckets) > 0 { nextSyncTarget = syncQueue.Pop() } else { + nextSyncTarget = nil workerChan = nil } case <-sm.stop: @@ -253,19 +322,22 @@ func (sm *SyncManager) syncScheduler() { func (sm *SyncManager) syncWorker(id int) { for { select { - case ts, ok := sm.syncTargets: + case ts, ok := <-sm.syncTargets: if !ok { log.Info("sync manager worker shutting down") return } + log.Info("sync worker go time!", ts.Cids()) - if err := sm.doSync(context.TODO(), ts); err != nil { + err := sm.doSync(context.TODO(), ts) + if err != nil { log.Errorf("sync error: %+v", err) } - sm.asLk.Lock() - delete(sm.activeSyncs, ts.Key()) - sm.asLk.Unlock() + sm.syncResults <- &syncResult{ + ts: ts, + success: err == nil, + } } } } diff --git a/chain/syncstate.go b/chain/syncstate.go index 2a5ea6899..45aeba90c 100644 --- a/chain/syncstate.go +++ b/chain/syncstate.go @@ -32,12 +32,20 @@ type SyncerState struct { } func (ss *SyncerState) SetStage(v api.SyncStateStage) { + if ss == nil { + return + } + ss.lk.Lock() defer ss.lk.Unlock() ss.Stage = v } func (ss *SyncerState) Init(base, target *types.TipSet) { + if ss == nil { + return + } + ss.lk.Lock() defer ss.lk.Unlock() ss.Target = target @@ -47,6 +55,10 @@ func (ss *SyncerState) Init(base, target *types.TipSet) { } func (ss *SyncerState) SetHeight(h uint64) { + if ss == nil { + return + } + ss.lk.Lock() defer ss.lk.Unlock() ss.Height = h diff --git a/node/builder.go b/node/builder.go index ce647e822..02082a3e5 100644 --- a/node/builder.go +++ b/node/builder.go @@ -200,7 +200,7 @@ func Online() Option { Override(new(dtypes.ClientDAG), testing.MemoryClientDag), // Filecoin services - Override(new(*chain.Syncer), chain.NewSyncer), + Override(new(*chain.Syncer), modules.NewSyncer), Override(new(*blocksync.BlockSync), blocksync.NewBlockSyncClient), Override(new(*chain.MessagePool), modules.MessagePool), diff --git a/node/modules/chain.go b/node/modules/chain.go index fa77a40b8..25f71c84a 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -12,11 +12,13 @@ import ( blockstore "github.com/ipfs/go-ipfs-blockstore" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/routing" + peer "github.com/libp2p/go-libp2p-peer" pubsub "github.com/libp2p/go-libp2p-pubsub" "go.uber.org/fx" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/chain" + "github.com/filecoin-project/lotus/chain/blocksync" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" @@ -119,3 +121,22 @@ func SetGenesis(cs *store.ChainStore, g Genesis) error { return cs.SetGenesis(genesis) } + +func NewSyncer(lc fx.Lifecycle, sm *stmgr.StateManager, bsync 
*blocksync.BlockSync, self peer.ID) (*chain.Syncer, error) { + syncer, err := chain.NewSyncer(sm, bsync, self) + if err != nil { + return nil, err + } + + lc.Append(fx.Hook{ + OnStart: func(_ context.Context) error { + syncer.Start() + return nil + }, + OnStop: func(_ context.Context) error { + syncer.Stop() + return nil + }, + }) + return syncer, nil +} From 251ff41134b9654b8fda0dc4b42ec6530521c2d9 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Fri, 15 Nov 2019 17:05:16 -0800 Subject: [PATCH 05/11] plumb through new sync status logic --- api/api_full.go | 6 ++++- chain/sync.go | 15 +++++------ chain/sync_manager.go | 18 +++++++------ cli/sync.go | 57 +++++++++++++++++++++++++++++++----------- node/impl/full/sync.go | 20 +++++++++------ 5 files changed, 78 insertions(+), 38 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index f1dc51efc..dba59ad31 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -253,7 +253,7 @@ type ReplayResults struct { Error string } -type SyncState struct { +type ActiveSync struct { Base *types.TipSet Target *types.TipSet @@ -261,6 +261,10 @@ type SyncState struct { Height uint64 } +type SyncState struct { + ActiveSyncs []ActiveSync +} + type SyncStateStage int const ( diff --git a/chain/sync.go b/chain/sync.go index 005ca4394..a01c2be9e 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -127,7 +127,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) { } syncer.Bsync.AddPeer(from) - syncer.syncmgr.SetPeerHead(from, fts.TipSet()) + syncer.syncmgr.SetPeerHead(ctx, from, fts.TipSet()) /* bestPweight := syncer.store.GetHeaviestTipSet().Blocks()[0].ParentWeight @@ -391,6 +391,7 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error { ctx, span := trace.StartSpan(ctx, "chain.Sync") defer span.End() + if span.IsRecordingEvents() { span.AddAttributes( trace.StringAttribute("tipset", fmt.Sprint(maybeHead.Cids())), @@ -398,9 +399,6 @@ func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error { ) } - syncer.syncLock.Lock() - defer syncer.syncLock.Unlock() - if syncer.Genesis.Equals(maybeHead) || syncer.store.GetHeaviestTipSet().Equals(maybeHead) { return nil } @@ -1084,7 +1082,10 @@ func VerifyElectionProof(ctx context.Context, eproof []byte, rand []byte, worker return nil } -func (syncer *Syncer) State() SyncerState { - panic("NYI") - //return syncer.syncState.Snapshot() +func (syncer *Syncer) State() []SyncerState { + var out []SyncerState + for _, ss := range syncer.syncmgr.syncStates { + out = append(out, ss.Snapshot()) + } + return out } diff --git a/chain/sync_manager.go b/chain/sync_manager.go index 64e0c82ca..4f044700e 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -24,9 +24,9 @@ type SyncManager struct { syncTargets chan *types.TipSet syncResults chan *syncResult - activeSyncs map[types.TipSetKey]*types.TipSet + syncStates []*SyncerState - syncState SyncerState + activeSyncs map[types.TipSetKey]*types.TipSet doSync func(context.Context, *types.TipSet) error @@ -46,6 +46,7 @@ func NewSyncManager(sync SyncFunc) *SyncManager { peerHeads: make(map[peer.ID]*types.TipSet), syncTargets: make(chan *types.TipSet), syncResults: make(chan *syncResult), + syncStates: make([]*SyncerState, syncWorkerCount), incomingTipSets: make(chan *types.TipSet), activeSyncs: make(map[types.TipSetKey]*types.TipSet), doSync: sync, @@ -64,17 +65,15 @@ func (sm *SyncManager) Stop() { close(sm.stop) } -func (sm 
*SyncManager) SetPeerHead(p peer.ID, ts *types.TipSet) { - log.Info("set peer head!") +func (sm *SyncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) { + log.Info("set peer head!", ts.Height(), ts.Cids()) sm.lk.Lock() defer sm.lk.Unlock() sm.peerHeads[p] = ts if !sm.bootstrapped { - log.Info("not bootstrapped") spc := sm.syncedPeerCount() if spc >= sm.bspThresh { - log.Info("go time!") // Its go time! target, err := sm.selectSyncTarget() if err != nil { @@ -320,6 +319,8 @@ func (sm *SyncManager) syncScheduler() { } func (sm *SyncManager) syncWorker(id int) { + ss := &SyncerState{} + sm.syncStates[id] = ss for { select { case ts, ok := <-sm.syncTargets: @@ -327,9 +328,10 @@ func (sm *SyncManager) syncWorker(id int) { log.Info("sync manager worker shutting down") return } - log.Info("sync worker go time!", ts.Cids()) + log.Info("sync worker go time!", ts.Height(), ts.Cids()) - err := sm.doSync(context.TODO(), ts) + ctx := context.WithValue(context.TODO(), syncStateKey, ss) + err := sm.doSync(ctx, ts) if err != nil { log.Errorf("sync error: %+v", err) } diff --git a/cli/sync.go b/cli/sync.go index ab50889c1..7c5bccb30 100644 --- a/cli/sync.go +++ b/cli/sync.go @@ -31,24 +31,26 @@ var syncStatusCmd = &cli.Command{ defer closer() ctx := ReqContext(cctx) - ss, err := api.SyncState(ctx) + state, err := api.SyncState(ctx) if err != nil { return err } - var base, target []cid.Cid - if ss.Base != nil { - base = ss.Base.Cids() - } - if ss.Target != nil { - target = ss.Target.Cids() - } - fmt.Println("sync status:") - fmt.Printf("Base:\t%s\n", base) - fmt.Printf("Target:\t%s\n", target) - fmt.Printf("Stage: %s\n", chain.SyncStageString(ss.Stage)) - fmt.Printf("Height: %d\n", ss.Height) + for i, ss := range state.ActiveSyncs { + fmt.Printf("worker %d:\n", i) + var base, target []cid.Cid + if ss.Base != nil { + base = ss.Base.Cids() + } + if ss.Target != nil { + target = ss.Target.Cids() + } + fmt.Printf("\tBase:\t%s\n", base) + fmt.Printf("\tTarget:\t%s\n", target) + fmt.Printf("\tStage: %s\n", chain.SyncStageString(ss.Stage)) + fmt.Printf("\tHeight: %d\n", ss.Height) + } return nil }, } @@ -65,17 +67,42 @@ var syncWaitCmd = &cli.Command{ ctx := ReqContext(cctx) for { - ss, err := napi.SyncState(ctx) + state, err := napi.SyncState(ctx) if err != nil { return err } + var complete bool + working := -1 + for i, ss := range state.ActiveSyncs { + switch ss.Stage { + case api.StageSyncComplete: + complete = true + default: + working = i + case api.StageIdle: + // not complete, not actively working + } + } + + if complete && working != -1 { + fmt.Println("\nDone") + return nil + } + + if working == -1 { + fmt.Println("Idle...") + continue + } + + ss := state.ActiveSyncs[working] + var target []cid.Cid if ss.Target != nil { target = ss.Target.Cids() } - fmt.Printf("\r\x1b[2KTarget: %s\tState: %s\tHeight: %d", target, chain.SyncStageString(ss.Stage), ss.Height) + fmt.Printf("\r\x1b[2KWorker %d: Target: %s\tState: %s\tHeight: %d", working, target, chain.SyncStageString(ss.Stage), ss.Height) if ss.Stage == api.StageSyncComplete { fmt.Println("\nDone") return nil diff --git a/node/impl/full/sync.go b/node/impl/full/sync.go index a9d2d3aa5..b06d51da4 100644 --- a/node/impl/full/sync.go +++ b/node/impl/full/sync.go @@ -20,13 +20,19 @@ type SyncAPI struct { } func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) { - ss := a.Syncer.State() - return &api.SyncState{ - Base: ss.Base, - Target: ss.Target, - Stage: ss.Stage, - Height: ss.Height, - }, nil + states := a.Syncer.State() 
+ + out := &api.SyncState{} + + for _, ss := range states { + out.ActiveSyncs = append(out.ActiveSyncs, api.ActiveSync{ + Base: ss.Base, + Target: ss.Target, + Stage: ss.Stage, + Height: ss.Height, + }) + } + return out, nil } func (a *SyncAPI) SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) error { From 89fd13de9db3fc28a5825697983ba7ee3927f786 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Fri, 15 Nov 2019 22:48:42 -0800 Subject: [PATCH 06/11] clean up chain sync a little --- chain/sync.go | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/chain/sync.go b/chain/sync.go index a01c2be9e..67a0bdb5d 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -127,22 +127,15 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) { } syncer.Bsync.AddPeer(from) + + bestPweight := syncer.store.GetHeaviestTipSet().Blocks()[0].ParentWeight + targetWeight := fts.TipSet().Blocks()[0].ParentWeight + if targetWeight.LessThan(bestPweight) { + log.Warn("incoming tipset does not appear to be better than our best chain, ignoring for now") + return + } + syncer.syncmgr.SetPeerHead(ctx, from, fts.TipSet()) - - /* - bestPweight := syncer.store.GetHeaviestTipSet().Blocks()[0].ParentWeight - targetWeight := fts.TipSet().Blocks()[0].ParentWeight - if targetWeight.LessThan(bestPweight) { - log.Warn("incoming tipset does not appear to be better than our best chain, ignoring for now") - return - } - - go func() { - if err := syncer.Sync(ctx, fts.TipSet()); err != nil { - log.Errorf("sync error (curW=%s, targetW=%s): %+v", bestPweight, targetWeight, err) - } - }() - */ } func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error { From 43447ca1bf53961c4f8b43bdb04b06f205e7ac24 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Sat, 16 Nov 2019 13:36:21 -0800 Subject: [PATCH 07/11] cleanup and add some tests --- chain/sync_manager.go | 142 ++++++++++++++++++++----------------- chain/sync_manager_test.go | 123 ++++++++++++++++++++++++++++++++ chain/types/mock/chain.go | 61 ++++++++++++++++ 3 files changed, 259 insertions(+), 67 deletions(-) create mode 100644 chain/sync_manager_test.go create mode 100644 chain/types/mock/chain.go diff --git a/chain/sync_manager.go b/chain/sync_manager.go index 4f044700e..ea707ccd1 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -26,11 +26,16 @@ type SyncManager struct { syncStates []*SyncerState - activeSyncs map[types.TipSetKey]*types.TipSet - doSync func(context.Context, *types.TipSet) error stop chan struct{} + + // Sync Scheduler fields + activeSyncs map[types.TipSetKey]*types.TipSet + syncQueue syncBucketSet + activeSyncTips syncBucketSet + nextSyncTarget *syncTargetBucket + workerChan chan *types.TipSet } type syncResult struct { @@ -236,11 +241,6 @@ func (sm *SyncManager) selectSyncTarget() (*types.TipSet, error) { } func (sm *SyncManager) syncScheduler() { - var syncQueue syncBucketSet - var activeSyncTips syncBucketSet - - var nextSyncTarget *syncTargetBucket - var workerChan chan *types.TipSet for { select { @@ -250,67 +250,11 @@ func (sm *SyncManager) syncScheduler() { return } - var relatedToActiveSync bool - for _, acts := range sm.activeSyncs { - if ts.Equals(acts) { - break - } - - if types.CidArrsEqual(ts.Parents(), acts.Cids()) { - // sync this next, after that sync process finishes - relatedToActiveSync = true - } - } - - // if this is related to an active sync process, immediately bucket it - // we don't want to start a parallel sync process that duplicates work - if 
relatedToActiveSync { - log.Info("related to active sync") - activeSyncTips.Insert(ts) - continue - } - - if nextSyncTarget != nil && nextSyncTarget.sameChainAs(ts) { - log.Info("new tipset is part of our next sync target") - nextSyncTarget.add(ts) - } else { - log.Info("insert into that queue!") - syncQueue.Insert(ts) - - if nextSyncTarget == nil { - nextSyncTarget = syncQueue.Pop() - workerChan = sm.syncTargets - log.Info("setting next sync target") - } - } + sm.scheduleIncoming(ts) case res := <-sm.syncResults: - delete(sm.activeSyncs, res.ts.Key()) - relbucket := activeSyncTips.PopRelated(res.ts) - if relbucket != nil { - if res.success { - if nextSyncTarget == nil { - nextSyncTarget = relbucket - workerChan = sm.syncTargets - } else { - syncQueue.buckets = append(syncQueue.buckets, relbucket) - } - } else { - // TODO: this is the case where we try to sync a chain, and - // fail, and we have more blocks on top of that chain that - // have come in since. The question is, should we try to - // sync these? or just drop them? - } - } - case workerChan <- nextSyncTarget.heaviestTipSet(): - hts := nextSyncTarget.heaviestTipSet() - sm.activeSyncs[hts.Key()] = hts - - if len(syncQueue.buckets) > 0 { - nextSyncTarget = syncQueue.Pop() - } else { - nextSyncTarget = nil - workerChan = nil - } + sm.scheduleProcessResult(res) + case sm.workerChan <- sm.nextSyncTarget.heaviestTipSet(): + sm.scheduleWorkSent() case <-sm.stop: log.Info("sync scheduler shutting down") return @@ -318,6 +262,70 @@ func (sm *SyncManager) syncScheduler() { } } +func (sm *SyncManager) scheduleIncoming(ts *types.TipSet) { + var relatedToActiveSync bool + for _, acts := range sm.activeSyncs { + if ts.Equals(acts) { + break + } + + if types.CidArrsEqual(ts.Parents(), acts.Cids()) { + // sync this next, after that sync process finishes + relatedToActiveSync = true + } + } + + // if this is related to an active sync process, immediately bucket it + // we don't want to start a parallel sync process that duplicates work + if relatedToActiveSync { + sm.activeSyncTips.Insert(ts) + return + } + + if sm.nextSyncTarget != nil && sm.nextSyncTarget.sameChainAs(ts) { + sm.nextSyncTarget.add(ts) + } else { + sm.syncQueue.Insert(ts) + + if sm.nextSyncTarget == nil { + sm.nextSyncTarget = sm.syncQueue.Pop() + sm.workerChan = sm.syncTargets + } + } +} + +func (sm *SyncManager) scheduleProcessResult(res *syncResult) { + delete(sm.activeSyncs, res.ts.Key()) + relbucket := sm.activeSyncTips.PopRelated(res.ts) + if relbucket != nil { + if res.success { + if sm.nextSyncTarget == nil { + sm.nextSyncTarget = relbucket + sm.workerChan = sm.syncTargets + } else { + sm.syncQueue.buckets = append(sm.syncQueue.buckets, relbucket) + } + } else { + // TODO: this is the case where we try to sync a chain, and + // fail, and we have more blocks on top of that chain that + // have come in since. The question is, should we try to + // sync these? or just drop them? 
+ } + } +} + +func (sm *SyncManager) scheduleWorkSent() { + hts := sm.nextSyncTarget.heaviestTipSet() + sm.activeSyncs[hts.Key()] = hts + + if len(sm.syncQueue.buckets) > 0 { + sm.nextSyncTarget = sm.syncQueue.Pop() + } else { + sm.nextSyncTarget = nil + sm.workerChan = nil + } +} + func (sm *SyncManager) syncWorker(id int) { ss := &SyncerState{} sm.syncStates[id] = ss diff --git a/chain/sync_manager_test.go b/chain/sync_manager_test.go new file mode 100644 index 000000000..1eb85ec2c --- /dev/null +++ b/chain/sync_manager_test.go @@ -0,0 +1,123 @@ +package chain + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/types/mock" +) + +var genTs = mock.TipSet(mock.MkBlock(nil, 0, 0)) + +type syncOp struct { + ts *types.TipSet + done func() +} + +func runSyncMgrTest(t *testing.T, tname string, thresh int, tf func(*testing.T, *SyncManager, chan *syncOp)) { + syncTargets := make(chan *syncOp) + sm := NewSyncManager(func(ctx context.Context, ts *types.TipSet) error { + ch := make(chan struct{}) + syncTargets <- &syncOp{ + ts: ts, + done: func() { close(ch) }, + } + <-ch + return nil + }) + sm.bspThresh = thresh + + sm.Start() + defer sm.Stop() + t.Run(tname+fmt.Sprintf("-%d", thresh), func(t *testing.T) { + tf(t, sm, syncTargets) + }) +} + +func assertTsEqual(t *testing.T, actual, expected *types.TipSet) { + t.Helper() + if !actual.Equals(expected) { + t.Fatalf("got unexpected tipset %s (expected: %s)", actual.Cids(), expected.Cids()) + } +} + +func assertNoOp(t *testing.T, c chan *syncOp) { + t.Helper() + select { + case <-time.After(time.Millisecond * 20): + case <-c: + t.Fatal("shouldnt have gotten any sync operations yet") + } +} + +func assertGetSyncOp(t *testing.T, c chan *syncOp, ts *types.TipSet) { + t.Helper() + + select { + case <-time.After(time.Millisecond * 100): + t.Fatal("expected sync manager to try and sync to our target") + case op := <-c: + op.done() + if !op.ts.Equals(ts) { + t.Fatalf("somehow got wrong tipset from syncer (got %s, expected %s)", op.ts.Cids(), ts.Cids()) + } + } +} + +func TestSyncManager(t *testing.T) { + ctx := context.Background() + + a := mock.TipSet(mock.MkBlock(genTs, 1, 1)) + b := mock.TipSet(mock.MkBlock(a, 1, 2)) + c1 := mock.TipSet(mock.MkBlock(b, 1, 3)) + c2 := mock.TipSet(mock.MkBlock(b, 2, 4)) + d := mock.TipSet(mock.MkBlock(c1, 4, 5)) + + runSyncMgrTest(t, "testBootstrap", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) { + sm.SetPeerHead(ctx, "peer1", c1) + assertGetSyncOp(t, stc, c1) + }) + + runSyncMgrTest(t, "testBootstrap", 2, func(t *testing.T, sm *SyncManager, stc chan *syncOp) { + sm.SetPeerHead(ctx, "peer1", c1) + assertNoOp(t, stc) + + sm.SetPeerHead(ctx, "peer2", c1) + assertGetSyncOp(t, stc, c1) + }) + + runSyncMgrTest(t, "testSyncAfterBootstrap", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) { + sm.SetPeerHead(ctx, "peer1", b) + assertGetSyncOp(t, stc, b) + + sm.SetPeerHead(ctx, "peer2", c1) + assertGetSyncOp(t, stc, c1) + + sm.SetPeerHead(ctx, "peer2", c2) + assertGetSyncOp(t, stc, c2) + }) + + runSyncMgrTest(t, "testCoalescing", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) { + sm.SetPeerHead(ctx, "peer1", a) + assertGetSyncOp(t, stc, a) + + sm.SetPeerHead(ctx, "peer2", b) + op := <-stc + + sm.SetPeerHead(ctx, "peer2", c1) + sm.SetPeerHead(ctx, "peer2", c2) + sm.SetPeerHead(ctx, "peer2", d) + + assertTsEqual(t, op.ts, b) + + // need a better way to 'wait until syncmgr is idle' + time.Sleep(time.Millisecond * 
20) + + op.done() + + assertGetSyncOp(t, stc, d) + }) +} diff --git a/chain/types/mock/chain.go b/chain/types/mock/chain.go new file mode 100644 index 000000000..09b37370a --- /dev/null +++ b/chain/types/mock/chain.go @@ -0,0 +1,61 @@ +package mock + +import ( + "fmt" + + "github.com/filecoin-project/lotus/chain/address" + "github.com/filecoin-project/lotus/chain/types" + "github.com/ipfs/go-cid" +) + +func Address(i uint64) address.Address { + a, err := address.NewIDAddress(i) + if err != nil { + panic(err) + } + return a +} + +func MkBlock(parents *types.TipSet, weightInc uint64, ticketNonce uint64) *types.BlockHeader { + addr := Address(123561) + + c, err := cid.Decode("bafyreicmaj5hhoy5mgqvamfhgexxyergw7hdeshizghodwkjg6qmpoco7i") + if err != nil { + panic(err) + } + + var pcids []cid.Cid + var height uint64 + weight := types.NewInt(weightInc) + if parents != nil { + pcids = parents.Cids() + height = parents.Height() + 1 + weight = types.BigAdd(parents.Blocks()[0].ParentWeight, weight) + } + + return &types.BlockHeader{ + Miner: addr, + ElectionProof: []byte("cats won the election"), + Tickets: []*types.Ticket{ + { + VRFProof: []byte(fmt.Sprintf("====%d=====", ticketNonce)), + }, + }, + Parents: pcids, + ParentMessageReceipts: c, + BLSAggregate: types.Signature{Type: types.KTBLS, Data: []byte("boo! im a signature")}, + ParentWeight: weight, + Messages: c, + Height: height, + ParentStateRoot: c, + BlockSig: types.Signature{Type: types.KTBLS, Data: []byte("boo! im a signature")}, + } +} + +func TipSet(blks ...*types.BlockHeader) *types.TipSet { + ts, err := types.NewTipSet(blks) + if err != nil { + panic(err) + } + return ts +} From 86221f3569613d7a0b9a5b67caa93dd85b241259 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Sat, 16 Nov 2019 14:34:05 -0800 Subject: [PATCH 08/11] sync wait waits until head is recent enough --- cli/sync.go | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/cli/sync.go b/cli/sync.go index 7c5bccb30..38cb9df8f 100644 --- a/cli/sync.go +++ b/cli/sync.go @@ -8,6 +8,7 @@ import ( "gopkg.in/urfave/cli.v2" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain" ) @@ -72,12 +73,15 @@ var syncWaitCmd = &cli.Command{ return err } - var complete bool - working := -1 + head, err := napi.ChainHead(ctx) + if err != nil { + return err + } + + working := 0 for i, ss := range state.ActiveSyncs { switch ss.Stage { case api.StageSyncComplete: - complete = true default: working = i case api.StageIdle: @@ -85,16 +89,6 @@ var syncWaitCmd = &cli.Command{ } } - if complete && working != -1 { - fmt.Println("\nDone") - return nil - } - - if working == -1 { - fmt.Println("Idle...") - continue - } - ss := state.ActiveSyncs[working] var target []cid.Cid @@ -103,8 +97,9 @@ var syncWaitCmd = &cli.Command{ } fmt.Printf("\r\x1b[2KWorker %d: Target: %s\tState: %s\tHeight: %d", working, target, chain.SyncStageString(ss.Stage), ss.Height) - if ss.Stage == api.StageSyncComplete { - fmt.Println("\nDone") + + if time.Now().Unix()-int64(head.MinTimestamp()) < build.BlockDelay { + fmt.Println("Done!") return nil } From 0ccaa7154599049a30eb68e809a261b4aa455823 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Sun, 17 Nov 2019 15:29:56 -0600 Subject: [PATCH 09/11] fix stats tool --- tools/stats/rpc.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tools/stats/rpc.go b/tools/stats/rpc.go index 32943c16f..c2131b480 100644 --- a/tools/stats/rpc.go +++ 
b/tools/stats/rpc.go @@ -6,13 +6,13 @@ import ( "net/http" "time" - "github.com/multiformats/go-multiaddr-net" + manet "github.com/multiformats/go-multiaddr-net" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api/client" - "github.com/filecoin-project/lotus/chain" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/jsonrpc" @@ -50,15 +50,15 @@ func WaitForSyncComplete(ctx context.Context, napi api.FullNode) error { select { case <-ctx.Done(): return ctx.Err() - case <-time.After(30 * time.Second): - state, err := napi.SyncState(ctx) + case <-time.After(3 * time.Second): + head, err := napi.ChainHead(ctx) if err != nil { return err } - log.Printf("Stage %s, Height %d", chain.SyncStageString(state.Stage), state.Height) + log.Printf("Height %d", head.Height()) - if state.Stage == api.StageSyncComplete { + if time.Now().Unix()-int64(head.MinTimestamp()) < build.BlockDelay { return nil } } From 1cd584d81b63e0cf62b010cca506b590f73b6932 Mon Sep 17 00:00:00 2001 From: Whyrusleeping Date: Tue, 19 Nov 2019 09:24:17 -0600 Subject: [PATCH 10/11] Update cli/sync.go --- cli/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/sync.go b/cli/sync.go index 38cb9df8f..e3e4dde43 100644 --- a/cli/sync.go +++ b/cli/sync.go @@ -99,7 +99,7 @@ var syncWaitCmd = &cli.Command{ fmt.Printf("\r\x1b[2KWorker %d: Target: %s\tState: %s\tHeight: %d", working, target, chain.SyncStageString(ss.Stage), ss.Height) if time.Now().Unix()-int64(head.MinTimestamp()) < build.BlockDelay { - fmt.Println("Done!") + fmt.Println("\nDone!") return nil } From 6dafca7aac1206c990519e58bc5b44a7796141da Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 19 Nov 2019 14:36:27 -0600 Subject: [PATCH 11/11] Use local type as ctx key License: MIT Signed-off-by: Jakub Sztandera --- chain/sync.go | 4 ++-- chain/sync_manager.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/chain/sync.go b/chain/sync.go index 67a0bdb5d..18627e715 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -752,10 +752,10 @@ func (syncer *Syncer) verifyBlsAggregate(ctx context.Context, sig types.Signatur return nil } -const syncStateKey = "syncStateKey" +type syncStateKey struct{} func extractSyncState(ctx context.Context) *SyncerState { - v := ctx.Value(syncStateKey) + v := ctx.Value(syncStateKey{}) if v != nil { return v.(*SyncerState) } diff --git a/chain/sync_manager.go b/chain/sync_manager.go index ea707ccd1..0e296d617 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -338,7 +338,7 @@ func (sm *SyncManager) syncWorker(id int) { } log.Info("sync worker go time!", ts.Height(), ts.Cids()) - ctx := context.WithValue(context.TODO(), syncStateKey, ss) + ctx := context.WithValue(context.TODO(), syncStateKey{}, ss) err := sm.doSync(ctx, ts) if err != nil { log.Errorf("sync error: %+v", err)
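
The final patch swaps the string-valued context key for an unexported struct type,
so no other package can construct a colliding key while the per-worker SyncerState
travels through the context. A small standalone sketch of that pattern follows;
SyncerState is reduced to one field, and withState/extractState are illustrative
helper names rather than functions from the patches:

    package main

    import (
        "context"
        "fmt"
    )

    // An unexported struct{} key can only be constructed inside this package,
    // so values stored under it cannot collide with string keys used elsewhere.
    type syncStateKey struct{}

    // SyncerState is reduced to a single field for the sake of the example.
    type SyncerState struct{ Height uint64 }

    func withState(ctx context.Context, ss *SyncerState) context.Context {
        return context.WithValue(ctx, syncStateKey{}, ss)
    }

    func extractState(ctx context.Context) *SyncerState {
        if v := ctx.Value(syncStateKey{}); v != nil {
            return v.(*SyncerState)
        }
        return nil
    }

    func main() {
        ctx := withState(context.Background(), &SyncerState{Height: 42})
        fmt.Println(extractState(ctx).Height)           // 42
        fmt.Println(extractState(context.Background())) // <nil>, nothing attached
    }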