From 5d34b7d618a0edb3bedf2d98bbb467cb454eb2e7 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 26 Oct 2020 14:11:19 +0200 Subject: [PATCH 01/26] rewrite sync manager --- chain/sync_manager.go | 563 +++++++++++++++++++------------------ chain/sync_manager_test.go | 11 +- 2 files changed, 298 insertions(+), 276 deletions(-) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index c25068f60..8979c1e40 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -7,11 +7,13 @@ import ( "strings" "sync" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" + peer "github.com/libp2p/go-libp2p-core/peer" ) -const BootstrapPeerThreshold = 2 +var BootstrapPeerThreshold = 2 var coalesceForksParents = false @@ -21,13 +23,6 @@ func init() { } } -const ( - BSStateInit = 0 - BSStateSelected = 1 - BSStateScheduled = 2 - BSStateComplete = 3 -) - type SyncFunc func(context.Context, *types.TipSet) error // SyncManager manages the chain synchronization process, both at bootstrap time @@ -52,108 +47,327 @@ type SyncManager interface { } type syncManager struct { - lk sync.Mutex - peerHeads map[peer.ID]*types.TipSet + ctx context.Context + cancel func() - bssLk sync.Mutex - bootstrapState int + workq chan peerHead + statusq chan workerStatus - bspThresh int + nextWorker uint64 + pend syncBucketSet + heads map[peer.ID]*types.TipSet - incomingTipSets chan *types.TipSet - syncTargets chan *types.TipSet - syncResults chan *syncResult + mx sync.Mutex + state map[uint64]*workerState - syncStates []*SyncerState - - // Normally this handler is set to `(*Syncer).Sync()`. doSync func(context.Context, *types.TipSet) error - - stop chan struct{} - - // Sync Scheduler fields - activeSyncs map[types.TipSetKey]*types.TipSet - syncQueue syncBucketSet - activeSyncTips syncBucketSet - nextSyncTarget *syncTargetBucket - workerChan chan *types.TipSet } var _ SyncManager = (*syncManager)(nil) -type syncResult struct { - ts *types.TipSet - success bool +type peerHead struct { + p peer.ID + ts *types.TipSet } -const syncWorkerCount = 3 +type workerState struct { + id uint64 + ts *types.TipSet + ss *SyncerState +} +type workerStatus struct { + id uint64 + err error +} + +// sync manager interface func NewSyncManager(sync SyncFunc) SyncManager { - sm := &syncManager{ - bspThresh: 1, - peerHeads: make(map[peer.ID]*types.TipSet), - syncTargets: make(chan *types.TipSet), - syncResults: make(chan *syncResult), - syncStates: make([]*SyncerState, syncWorkerCount), - incomingTipSets: make(chan *types.TipSet), - activeSyncs: make(map[types.TipSetKey]*types.TipSet), - doSync: sync, - stop: make(chan struct{}), + ctx, cancel := context.WithCancel(context.Background()) + return &syncManager{ + ctx: ctx, + cancel: cancel, + + workq: make(chan peerHead), + statusq: make(chan workerStatus), + + heads: make(map[peer.ID]*types.TipSet), + state: make(map[uint64]*workerState), + + doSync: sync, } - for i := range sm.syncStates { - sm.syncStates[i] = new(SyncerState) - } - return sm } func (sm *syncManager) Start() { - go sm.syncScheduler() - for i := 0; i < syncWorkerCount; i++ { - go sm.syncWorker(i) - } + go sm.scheduler() } func (sm *syncManager) Stop() { - close(sm.stop) + select { + case <-sm.ctx.Done(): + default: + sm.cancel() + } } func (sm *syncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) { - sm.lk.Lock() - defer sm.lk.Unlock() - sm.peerHeads[p] = ts - - if sm.getBootstrapState() == BSStateInit { - spc := sm.syncedPeerCount() - if spc >= sm.bspThresh { - // Its go time! 
- target, err := sm.selectSyncTarget() - if err != nil { - log.Error("failed to select sync target: ", err) - return - } - sm.setBootstrapState(BSStateSelected) - - sm.incomingTipSets <- target - } - log.Infof("sync bootstrap has %d peers", spc) - return + select { + case sm.workq <- peerHead{p: p, ts: ts}: + case <-sm.ctx.Done(): + case <-ctx.Done(): } - - sm.incomingTipSets <- ts } func (sm *syncManager) State() []SyncerStateSnapshot { - ret := make([]SyncerStateSnapshot, 0, len(sm.syncStates)) - for _, s := range sm.syncStates { - ret = append(ret, s.Snapshot()) + sm.mx.Lock() + workerStates := make([]*workerState, 0, len(sm.state)) + for _, ws := range sm.state { + workerStates = append(workerStates, ws) } - return ret + sm.mx.Unlock() + + sort.Slice(workerStates, func(i, j int) bool { + return workerStates[i].id < workerStates[j].id + }) + + result := make([]SyncerStateSnapshot, 0, len(workerStates)) + for _, ws := range workerStates { + result = append(result, ws.ss.Snapshot()) + } + + return result } +// sync manager internals +func (sm *syncManager) scheduler() { + for { + select { + case head := <-sm.workq: + sm.handlePeerHead(head) + case status := <-sm.statusq: + sm.handleWorkerStatus(status) + case <-sm.ctx.Done(): + return + } + } +} + +func (sm *syncManager) handlePeerHead(head peerHead) { + log.Infof("new peer head: %s %s", head.p, head.ts) + + // have we started syncing yet? + if sm.nextWorker == 0 { + // track the peer head until we start syncing + sm.heads[head.p] = head.ts + + // not yet; do we have enough peers? + if len(sm.heads) < BootstrapPeerThreshold { + // not enough peers; track it and wait + return + } + + // we are ready to start syncing; select the sync target and spawn a worker + target, err := sm.selectInitialSyncTarget() + if err != nil { + log.Errorf("failed to select initial sync target: %s", err) + return + } + + log.Infof("selected initial sync target: %s", target) + sm.spawnWorker(target) + return + } + + // we have started syncing, add peer head to the queue if applicable and maybe spawn a worker + // if there is work to do (possibly in a fork) + target, work, err := sm.addSyncTarget(head.ts) + if err != nil { + log.Warnf("failed to add sync target: %s", err) + return + } + + if work { + log.Infof("selected sync target: %s", target) + sm.spawnWorker(target) + } +} + +func (sm *syncManager) handleWorkerStatus(status workerStatus) { + log.Debugf("worker %d done; status error: %s", status.err) + + sm.mx.Lock() + ws := sm.state[status.id] + delete(sm.state, status.id) + sm.mx.Unlock() + + if status.err != nil { + // we failed to sync this target -- log it and try to work on an extended chain + // if there is nothing related to be worked on, we stop working on this chain. + log.Errorf("error during sync in %s: %s", ws.ts, status.err) + } + + // we are done with this target, select the next sync target and spawn a worker if there is work + // to do, because of an extension of this chain. 
+ target, work, err := sm.selectSyncTarget(ws.ts) + if err != nil { + log.Warnf("failed to select sync target: %s", err) + return + } + + if work { + log.Infof("selected sync target: %s", target) + sm.spawnWorker(target) + } +} + +func (sm *syncManager) spawnWorker(target *types.TipSet) { + id := sm.nextWorker + sm.nextWorker++ + ws := &workerState{ + id: id, + ts: target, + ss: new(SyncerState), + } + + sm.mx.Lock() + sm.state[id] = ws + sm.mx.Unlock() + + go sm.worker(ws) +} + +func (sm *syncManager) worker(ws *workerState) { + log.Infof("worker %d syncing in %s", ws.id, ws.ss) + + start := build.Clock.Now() + defer func() { + log.Infof("worker %d done; took %s", ws.id, build.Clock.Since(start)) + }() + + ctx := context.WithValue(sm.ctx, syncStateKey{}, ws.ss) + err := sm.doSync(ctx, ws.ts) + + select { + case sm.statusq <- workerStatus{id: ws.id, err: err}: + case <-sm.ctx.Done(): + } +} + +// selects the initial sync target by examining known peer heads; only called once for the initial +// sync. +func (sm *syncManager) selectInitialSyncTarget() (*types.TipSet, error) { + var buckets syncBucketSet + + var peerHeads []*types.TipSet + for _, ts := range sm.heads { + peerHeads = append(peerHeads, ts) + } + // clear the map, we don't use it any longer + sm.heads = nil + + sort.Slice(peerHeads, func(i, j int) bool { + return peerHeads[i].Height() < peerHeads[j].Height() + }) + + for _, ts := range peerHeads { + buckets.Insert(ts) + } + + if len(buckets.buckets) > 1 { + log.Warn("caution, multiple distinct chains seen during head selections") + // TODO: we *could* refuse to sync here without user intervention. + // For now, just select the best cluster + } + + return buckets.Heaviest(), nil +} + +// adds a tipset to the potential sync targets; returns true if there is a a tipset to work on. +// this could be either a restart, eg because there is no currently scheduled sync work or a worker +// failed or a potential fork. +func (sm *syncManager) addSyncTarget(ts *types.TipSet) (*types.TipSet, bool, error) { + // Note: we don't need the state lock here to access the active worker states, as the only + // competing threads that may access it do so through State() which is read only. + + // if the worker set is empty, we have finished syncing and were waiting for the next tipset + // in this case, we just return the tipset as work to be done + if len(sm.state) == 0 { + return ts, true, nil + } + + // check if it is related to any active sync; if so insert into the pending sync queue + for _, ws := range sm.state { + if ts.Equals(ws.ts) { + // ignore it, we are already syncing it + return nil, false, nil + } + + if ts.Parents() == ws.ts.Key() { + // schedule for syncing next; it's an extension of an active sync + sm.pend.Insert(ts) + return nil, false, nil + } + } + + // check to see if it is related to any pending sync; if so insert it into the pending sync queue + if sm.pend.RelatedToAny(ts) { + sm.pend.Insert(ts) + return nil, false, nil + } + + // it's not related to any active or pending sync; this could be a fork in which case we + // start a new worker to sync it, if it is *heavier* than any active or pending set; + // if it is not, we ignore it. 
+ activeHeavier := false + for _, ws := range sm.state { + if ws.ts.Height() > ts.Height() { + activeHeavier = true + break + } + } + + if activeHeavier { + return nil, false, nil + } + + pendHeaviest := sm.pend.Heaviest() + if pendHeaviest != nil && pendHeaviest.Height() > ts.Height() { + return nil, false, nil + } + + // start a new worker, seems heavy enough and unrelated to active or pending syncs + return ts, true, nil +} + +// selects the next sync target after a worker sync has finished; returns true and a target +// TipSet if this chain should continue to sync because there is a heavier related tipset. +func (sm *syncManager) selectSyncTarget(done *types.TipSet) (*types.TipSet, bool, error) { + // we pop the related bucket and if there is any related tipset, we work on the heaviest one next + // if we are not already working on a heavier tipset + related := sm.pend.PopRelated(done) + if related == nil { + return nil, false, nil + } + + heaviest := related.heaviestTipSet() + for _, ws := range sm.state { + if ws.ts.Height() > heaviest.Height() { + return nil, false, nil + } + } + + return heaviest, true, nil +} + +// sync buckets and related utilities type syncBucketSet struct { buckets []*syncTargetBucket } +type syncTargetBucket struct { + tips []*types.TipSet +} + func newSyncTargetBucket(tipsets ...*types.TipSet) *syncTargetBucket { var stb syncTargetBucket for _, ts := range tipsets { @@ -250,10 +464,6 @@ func (sbs *syncBucketSet) Empty() bool { return len(sbs.buckets) == 0 } -type syncTargetBucket struct { - tips []*types.TipSet -} - func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool { for _, t := range stb.tips { if ts.Equals(t) { @@ -296,196 +506,3 @@ func (stb *syncTargetBucket) heaviestTipSet() *types.TipSet { } return best } - -func (sm *syncManager) selectSyncTarget() (*types.TipSet, error) { - var buckets syncBucketSet - - var peerHeads []*types.TipSet - for _, ts := range sm.peerHeads { - peerHeads = append(peerHeads, ts) - } - sort.Slice(peerHeads, func(i, j int) bool { - return peerHeads[i].Height() < peerHeads[j].Height() - }) - - for _, ts := range peerHeads { - buckets.Insert(ts) - } - - if len(buckets.buckets) > 1 { - log.Warn("caution, multiple distinct chains seen during head selections") - // TODO: we *could* refuse to sync here without user intervention. 
- // For now, just select the best cluster - } - - return buckets.Heaviest(), nil -} - -func (sm *syncManager) syncScheduler() { - for { - select { - case ts, ok := <-sm.incomingTipSets: - if !ok { - log.Info("shutting down sync scheduler") - return - } - - sm.scheduleIncoming(ts) - case res := <-sm.syncResults: - sm.scheduleProcessResult(res) - case sm.workerChan <- sm.nextSyncTarget.heaviestTipSet(): - sm.scheduleWorkSent() - case <-sm.stop: - log.Info("sync scheduler shutting down") - return - } - } -} - -func (sm *syncManager) scheduleIncoming(ts *types.TipSet) { - log.Debug("scheduling incoming tipset sync: ", ts.Cids()) - if sm.getBootstrapState() == BSStateSelected { - sm.setBootstrapState(BSStateScheduled) - sm.syncTargets <- ts - return - } - - var relatedToActiveSync bool - for _, acts := range sm.activeSyncs { - if ts.Equals(acts) { - // ignore, we are already syncing it - return - } - - if ts.Parents() == acts.Key() { - // sync this next, after that sync process finishes - relatedToActiveSync = true - } - } - - if !relatedToActiveSync && sm.activeSyncTips.RelatedToAny(ts) { - relatedToActiveSync = true - } - - // if this is related to an active sync process, immediately bucket it - // we don't want to start a parallel sync process that duplicates work - if relatedToActiveSync { - sm.activeSyncTips.Insert(ts) - return - } - - if sm.getBootstrapState() == BSStateScheduled { - sm.syncQueue.Insert(ts) - return - } - - if sm.nextSyncTarget != nil && sm.nextSyncTarget.sameChainAs(ts) { - sm.nextSyncTarget.add(ts) - } else { - sm.syncQueue.Insert(ts) - - if sm.nextSyncTarget == nil { - sm.nextSyncTarget = sm.syncQueue.Pop() - sm.workerChan = sm.syncTargets - } - } -} - -func (sm *syncManager) scheduleProcessResult(res *syncResult) { - if res.success && sm.getBootstrapState() != BSStateComplete { - sm.setBootstrapState(BSStateComplete) - } - - delete(sm.activeSyncs, res.ts.Key()) - relbucket := sm.activeSyncTips.PopRelated(res.ts) - if relbucket != nil { - if res.success { - if sm.nextSyncTarget == nil { - sm.nextSyncTarget = relbucket - sm.workerChan = sm.syncTargets - } else { - for _, t := range relbucket.tips { - sm.syncQueue.Insert(t) - } - } - return - } - // TODO: this is the case where we try to sync a chain, and - // fail, and we have more blocks on top of that chain that - // have come in since. The question is, should we try to - // sync these? or just drop them? 
- log.Error("failed to sync chain but have new unconnected blocks from chain") - } - - if sm.nextSyncTarget == nil && !sm.syncQueue.Empty() { - next := sm.syncQueue.Pop() - if next != nil { - sm.nextSyncTarget = next - sm.workerChan = sm.syncTargets - } - } -} - -func (sm *syncManager) scheduleWorkSent() { - hts := sm.nextSyncTarget.heaviestTipSet() - sm.activeSyncs[hts.Key()] = hts - - if !sm.syncQueue.Empty() { - sm.nextSyncTarget = sm.syncQueue.Pop() - } else { - sm.nextSyncTarget = nil - sm.workerChan = nil - } -} - -func (sm *syncManager) syncWorker(id int) { - ss := sm.syncStates[id] - for { - select { - case ts, ok := <-sm.syncTargets: - if !ok { - log.Info("sync manager worker shutting down") - return - } - - ctx := context.WithValue(context.TODO(), syncStateKey{}, ss) - err := sm.doSync(ctx, ts) - if err != nil { - log.Errorf("sync error: %+v", err) - } - - sm.syncResults <- &syncResult{ - ts: ts, - success: err == nil, - } - } - } -} - -func (sm *syncManager) syncedPeerCount() int { - var count int - for _, ts := range sm.peerHeads { - if ts.Height() > 0 { - count++ - } - } - return count -} - -func (sm *syncManager) getBootstrapState() int { - sm.bssLk.Lock() - defer sm.bssLk.Unlock() - return sm.bootstrapState -} - -func (sm *syncManager) setBootstrapState(v int) { - sm.bssLk.Lock() - defer sm.bssLk.Unlock() - sm.bootstrapState = v -} - -func (sm *syncManager) IsBootstrapped() bool { - sm.bssLk.Lock() - defer sm.bssLk.Unlock() - return sm.bootstrapState == BSStateComplete -} diff --git a/chain/sync_manager_test.go b/chain/sync_manager_test.go index 709e03a41..74fb77dc3 100644 --- a/chain/sync_manager_test.go +++ b/chain/sync_manager_test.go @@ -28,7 +28,12 @@ func runSyncMgrTest(t *testing.T, tname string, thresh int, tf func(*testing.T, <-ch return nil }).(*syncManager) - sm.bspThresh = thresh + + oldBootstrapPeerThreshold := BootstrapPeerThreshold + BootstrapPeerThreshold = thresh + defer func() { + BootstrapPeerThreshold = oldBootstrapPeerThreshold + }() sm.Start() defer sm.Stop() @@ -112,8 +117,8 @@ func TestSyncManagerEdgeCase(t *testing.T) { waitUntilAllWorkersAreDone(stc) - if len(sm.activeSyncTips.buckets) != 0 { - t.Errorf("activeSyncTips expected empty but got: %s", sm.activeSyncTips.String()) + if len(sm.state) != 0 { + t.Errorf("active syncs expected empty but got: %d", len(sm.state)) } }) } From af53b72eb8b86aaff72a54e5b8c14ff736e90b44 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 26 Oct 2020 14:36:34 +0200 Subject: [PATCH 02/26] fix typo --- chain/sync_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index 8979c1e40..682f16bcd 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -237,7 +237,7 @@ func (sm *syncManager) spawnWorker(target *types.TipSet) { } func (sm *syncManager) worker(ws *workerState) { - log.Infof("worker %d syncing in %s", ws.id, ws.ss) + log.Infof("worker %d syncing in %s", ws.id, ws.ts) start := build.Clock.Now() defer func() { From f26385d0c67deaae4996d2d6e7093464a77ec00a Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 26 Oct 2020 14:51:26 +0200 Subject: [PATCH 03/26] use weight as the sync target selector --- chain/sync_manager.go | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index 682f16bcd..b33bb4356 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -318,20 +318,14 @@ func (sm *syncManager) addSyncTarget(ts *types.TipSet) (*types.TipSet, bool, err // 
it's not related to any active or pending sync; this could be a fork in which case we // start a new worker to sync it, if it is *heavier* than any active or pending set; // if it is not, we ignore it. - activeHeavier := false for _, ws := range sm.state { - if ws.ts.Height() > ts.Height() { - activeHeavier = true - break + if isHeavier(ws.ts, ts) { + return nil, false, nil } } - if activeHeavier { - return nil, false, nil - } - pendHeaviest := sm.pend.Heaviest() - if pendHeaviest != nil && pendHeaviest.Height() > ts.Height() { + if pendHeaviest != nil && isHeavier(pendHeaviest, ts) { return nil, false, nil } @@ -351,7 +345,7 @@ func (sm *syncManager) selectSyncTarget(done *types.TipSet) (*types.TipSet, bool heaviest := related.heaviestTipSet() for _, ws := range sm.state { - if ws.ts.Height() > heaviest.Height() { + if isHeavier(ws.ts, heaviest) { return nil, false, nil } } @@ -359,6 +353,10 @@ func (sm *syncManager) selectSyncTarget(done *types.TipSet) (*types.TipSet, bool return heaviest, true, nil } +func isHeavier(a, b *types.TipSet) bool { + return a.ParentWeight().GreaterThan(b.ParentWeight()) +} + // sync buckets and related utilities type syncBucketSet struct { buckets []*syncTargetBucket From c0a039ee11098e128617ff93e2687869ff2e7319 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 26 Oct 2020 15:19:09 +0200 Subject: [PATCH 04/26] fix logging --- chain/sync_manager.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index b33bb4356..a29331462 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -193,7 +193,7 @@ func (sm *syncManager) handlePeerHead(head peerHead) { } func (sm *syncManager) handleWorkerStatus(status workerStatus) { - log.Debugf("worker %d done; status error: %s", status.err) + log.Debugf("worker %d done; status error: %s", status.id, status.err) sm.mx.Lock() ws := sm.state[status.id] @@ -240,13 +240,11 @@ func (sm *syncManager) worker(ws *workerState) { log.Infof("worker %d syncing in %s", ws.id, ws.ts) start := build.Clock.Now() - defer func() { - log.Infof("worker %d done; took %s", ws.id, build.Clock.Since(start)) - }() ctx := context.WithValue(sm.ctx, syncStateKey{}, ws.ss) err := sm.doSync(ctx, ws.ts) + log.Infof("worker %d done; took %s", ws.id, build.Clock.Since(start)) select { case sm.statusq <- workerStatus{id: ws.id, err: err}: case <-sm.ctx.Done(): From f05568197275d4a6500400e5cc8648b1099473ec Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 26 Oct 2020 15:19:40 +0200 Subject: [PATCH 05/26] fix test --- chain/sync_manager_test.go | 64 ++++++++++++++++++++++---------------- 1 file changed, 37 insertions(+), 27 deletions(-) diff --git a/chain/sync_manager_test.go b/chain/sync_manager_test.go index 74fb77dc3..d58dc5f3d 100644 --- a/chain/sync_manager_test.go +++ b/chain/sync_manager_test.go @@ -92,30 +92,52 @@ func TestSyncManagerEdgeCase(t *testing.T) { runSyncMgrTest(t, "edgeCase", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) { sm.SetPeerHead(ctx, "peer1", a) - assertGetSyncOp(t, stc, a) sm.SetPeerHead(ctx, "peer1", b1) sm.SetPeerHead(ctx, "peer1", b2) - // b1 and b2 are being processed - b1op := <-stc - b2op := <-stc - if !b1op.ts.Equals(b1) { - b1op, b2op = b2op, b1op + assertGetSyncOp(t, stc, a) + + // b1 and b2 are in queue after a; the sync manager should pick the heaviest one which is b2 + bop := <-stc + if !bop.ts.Equals(b2) { + t.Fatalf("Expected tipset %s to sync, but got %s", b2, bop.ts) } - sm.SetPeerHead(ctx, "peer2", c2) // c2 is put into activeSyncTips 
at index 0 - sm.SetPeerHead(ctx, "peer2", c1) // c1 is put into activeSyncTips at index 1 - sm.SetPeerHead(ctx, "peer3", b2) // b2 is related to c2 and even though it is actively synced it is put into activeSyncTips index 0 - sm.SetPeerHead(ctx, "peer1", a) // a is related to b2 and is put into activeSyncTips index 0 + sm.SetPeerHead(ctx, "peer2", c2) + sm.SetPeerHead(ctx, "peer2", c1) + sm.SetPeerHead(ctx, "peer3", b2) + sm.SetPeerHead(ctx, "peer1", a) - b1op.done() // b1 completes first, is related to a, so it pops activeSyncTips index 0 - // even though correct one is index 1 + bop.done() - b2op.done() - // b2 completes and is not related to c1, so it leaves activeSyncTips as it is + // get the next sync target; it should be c1 as the heaviest tipset but added last (same weight as c2) + bop = <-stc + if !bop.ts.Equals(c1) { + t.Fatalf("Expected tipset %s to sync, but got %s", c1, bop.ts) + } - waitUntilAllWorkersAreDone(stc) + sm.SetPeerHead(ctx, "peer4", d1) + sm.SetPeerHead(ctx, "peer5", e1) + bop.done() + + // get the last sync target; it should be e1 + var last *types.TipSet + for i := 0; i < 10; { + select { + case bop = <-stc: + bop.done() + if last == nil || bop.ts.Height() > last.Height() { + last = bop.ts + } + default: + i++ + time.Sleep(10 * time.Millisecond) + } + } + if !last.Equals(e1) { + t.Fatalf("Expected tipset %s to sync, but got %s", e1, last) + } if len(sm.state) != 0 { t.Errorf("active syncs expected empty but got: %d", len(sm.state)) @@ -123,18 +145,6 @@ func TestSyncManagerEdgeCase(t *testing.T) { }) } -func waitUntilAllWorkersAreDone(stc chan *syncOp) { - for i := 0; i < 10; { - select { - case so := <-stc: - so.done() - default: - i++ - time.Sleep(10 * time.Millisecond) - } - } -} - func TestSyncManager(t *testing.T) { ctx := context.Background() From 2fd0d430c87d1a2c66abb6945d531e821d461c4f Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 26 Oct 2020 15:49:24 +0200 Subject: [PATCH 06/26] set BootstrapPeerThreshold to 1, to mimic old code which incidentally fixes tests --- chain/sync_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index a29331462..0d005be2d 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -13,7 +13,7 @@ import ( peer "github.com/libp2p/go-libp2p-core/peer" ) -var BootstrapPeerThreshold = 2 +var BootstrapPeerThreshold = 1 var coalesceForksParents = false From 16d2cb6309591eeac44d3f546a3895531ecac008 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 27 Oct 2020 18:00:34 +0100 Subject: [PATCH 07/26] Add flag to coalesce Tipsets in pending queue Signed-off-by: Jakub Sztandera --- chain/sync_manager.go | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index 0d005be2d..c1ec4e12d 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -15,12 +15,10 @@ import ( var BootstrapPeerThreshold = 1 -var coalesceForksParents = false +var coalesceTipsets = false func init() { - if os.Getenv("LOTUS_SYNC_REL_PARENT") == "yes" { - coalesceForksParents = true - } + coalesceTipsets = os.Getenv("LOTUS_SYNC_FORMTS_PEND") == "yes" } type SyncFunc func(context.Context, *types.TipSet) error @@ -471,19 +469,29 @@ func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool { if ts.Parents() == t.Key() { return true } - if coalesceForksParents && ts.Parents() == t.Parents() { - return true - } } return false } func (stb *syncTargetBucket) add(ts *types.TipSet) { - - for _, t 
:= range stb.tips { + for i, t := range stb.tips { if t.Equals(ts) { return } + if coalesceTipsets && t.Height() == ts.Height() && + types.CidArrsEqual(t.Blocks()[0].Parents, ts.Blocks()[0].Parents) { + + newTs := append([]*types.BlockHeader{}, ts.Blocks()...) + newTs = append(newTs, t.Blocks()...) + ts2, err := types.NewTipSet(newTs) + if err != nil { + log.Warnf("error while trying to recombine a tipset in a bucket: %+v", err) + continue + } + stb.tips[i] = ts2 + return + } + } stb.tips = append(stb.tips, ts) From 2b82e5a118fd2ae5ac35debf9aa5ca19631fc634 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 27 Oct 2020 21:23:16 +0100 Subject: [PATCH 08/26] Fix tipset coalescing in case of repeated blocks Signed-off-by: Jakub Sztandera --- chain/sync_manager.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index c1ec4e12d..1fec71eed 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -7,6 +7,7 @@ import ( "strings" "sync" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" @@ -480,9 +481,23 @@ func (stb *syncTargetBucket) add(ts *types.TipSet) { } if coalesceTipsets && t.Height() == ts.Height() && types.CidArrsEqual(t.Blocks()[0].Parents, ts.Blocks()[0].Parents) { + miners := make(map[address.Address]struct{}) + newTs := []*types.BlockHeader{} + for _, b := range t.Blocks() { + _, have := miners[b.Miner] + if !have { + newTs = append(newTs, b) + miners[b.Miner] = struct{}{} + } + } + for _, b := range ts.Blocks() { + _, have := miners[b.Miner] + if !have { + newTs = append(newTs, b) + miners[b.Miner] = struct{}{} + } + } - newTs := append([]*types.BlockHeader{}, ts.Blocks()...) - newTs = append(newTs, t.Blocks()...) ts2, err := types.NewTipSet(newTs) if err != nil { log.Warnf("error while trying to recombine a tipset in a bucket: %+v", err) From ab7a66b90dcfbbf04c58549e3a8ef166e712130d Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 27 Oct 2020 23:48:31 +0200 Subject: [PATCH 09/26] track recently synced tipsets to avoid unnecessary worker spawning --- chain/sync_manager.go | 62 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 60 insertions(+), 2 deletions(-) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index 1fec71eed..d3a9339d7 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -16,6 +16,8 @@ import ( var BootstrapPeerThreshold = 1 +var RecentSyncBufferSize = 10 + var coalesceTipsets = false func init() { @@ -55,6 +57,7 @@ type syncManager struct { nextWorker uint64 pend syncBucketSet heads map[peer.ID]*types.TipSet + recent *syncBuffer mx sync.Mutex state map[uint64]*workerState @@ -90,8 +93,9 @@ func NewSyncManager(sync SyncFunc) SyncManager { workq: make(chan peerHead), statusq: make(chan workerStatus), - heads: make(map[peer.ID]*types.TipSet), - state: make(map[uint64]*workerState), + heads: make(map[peer.ID]*types.TipSet), + state: make(map[uint64]*workerState), + recent: newSyncBuffer(RecentSyncBufferSize), doSync: sync, } @@ -203,6 +207,9 @@ func (sm *syncManager) handleWorkerStatus(status workerStatus) { // we failed to sync this target -- log it and try to work on an extended chain // if there is nothing related to be worked on, we stop working on this chain. 
log.Errorf("error during sync in %s: %s", ws.ts, status.err) + } else { + // add to the recently synced buffer + sm.recent.Push(ws.ts) } // we are done with this target, select the next sync target and spawn a worker if there is work @@ -286,6 +293,12 @@ func (sm *syncManager) addSyncTarget(ts *types.TipSet) (*types.TipSet, bool, err // Note: we don't need the state lock here to access the active worker states, as the only // competing threads that may access it do so through State() which is read only. + // if we have recently synced this or any heavier tipset we just ignore it; this can happen + // with an empty worker set after we just finished syncing to a target + if sm.recent.Synced(ts) { + return nil, false, nil + } + // if the worker set is empty, we have finished syncing and were waiting for the next tipset // in this case, we just return the tipset as work to be done if len(sm.state) == 0 { @@ -341,6 +354,10 @@ func (sm *syncManager) selectSyncTarget(done *types.TipSet) (*types.TipSet, bool } heaviest := related.heaviestTipSet() + if isHeavier(done, heaviest) { + return nil, false, nil + } + for _, ws := range sm.state { if isHeavier(ws.ts, heaviest) { return nil, false, nil @@ -354,6 +371,47 @@ func isHeavier(a, b *types.TipSet) bool { return a.ParentWeight().GreaterThan(b.ParentWeight()) } +// sync buffer -- this is a circular buffer of recently synced tipsets +type syncBuffer struct { + buf []*types.TipSet + next int64 +} + +func newSyncBuffer(size int) *syncBuffer { + return &syncBuffer{buf: make([]*types.TipSet, size)} +} + +func (sb *syncBuffer) Push(ts *types.TipSet) { + i := int(sb.next % int64(len(sb.buf))) + sb.buf[i] = ts + sb.next++ +} + +func (sb *syncBuffer) Synced(ts *types.TipSet) bool { + synced := func(a, b *types.TipSet) bool { + return a.Equals(b) || isHeavier(a, b) + } + + if sb.next < int64(len(sb.buf)) { + for i := int(sb.next - 1); i >= 0; i-- { + if synced(sb.buf[i], ts) { + return true + } + } + + return false + } + + for j := 1; j < len(sb.buf); j++ { + i := int((sb.next - int64(j)) % int64(len(sb.buf))) + if synced(sb.buf[i], ts) { + return true + } + } + + return false +} + // sync buckets and related utilities type syncBucketSet struct { buckets []*syncTargetBucket From fc1ac3e752948d43bdbab0870d2715812551b4b0 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 28 Oct 2020 00:02:08 +0200 Subject: [PATCH 10/26] simplify circular buffer code we don't care about order of checks! 
--- chain/sync_manager.go | 25 +++++-------------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index d3a9339d7..982598ae9 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -374,7 +374,7 @@ func isHeavier(a, b *types.TipSet) bool { // sync buffer -- this is a circular buffer of recently synced tipsets type syncBuffer struct { buf []*types.TipSet - next int64 + next int } func newSyncBuffer(size int) *syncBuffer { @@ -382,29 +382,14 @@ func newSyncBuffer(size int) *syncBuffer { } func (sb *syncBuffer) Push(ts *types.TipSet) { - i := int(sb.next % int64(len(sb.buf))) - sb.buf[i] = ts + sb.buf[sb.next] = ts sb.next++ + sb.next %= len(sb.buf) } func (sb *syncBuffer) Synced(ts *types.TipSet) bool { - synced := func(a, b *types.TipSet) bool { - return a.Equals(b) || isHeavier(a, b) - } - - if sb.next < int64(len(sb.buf)) { - for i := int(sb.next - 1); i >= 0; i-- { - if synced(sb.buf[i], ts) { - return true - } - } - - return false - } - - for j := 1; j < len(sb.buf); j++ { - i := int((sb.next - int64(j)) % int64(len(sb.buf))) - if synced(sb.buf[i], ts) { + for _, rts := range sb.buf { + if rts != nil && (rts.Equals(ts) || isHeavier(rts, ts)) { return true } } From 469666de825de173053bd6b6094ea90965ba6efd Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 28 Oct 2020 10:33:22 +0200 Subject: [PATCH 11/26] limit max active sync workers --- chain/sync_manager.go | 58 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 52 insertions(+), 6 deletions(-) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index 982598ae9..bf777a647 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -14,11 +14,14 @@ import ( peer "github.com/libp2p/go-libp2p-core/peer" ) -var BootstrapPeerThreshold = 1 +var ( + BootstrapPeerThreshold = 1 -var RecentSyncBufferSize = 10 + RecentSyncBufferSize = 10 + MaxSyncWorkers = 5 -var coalesceTipsets = false + coalesceTipsets = false +) func init() { coalesceTipsets = os.Getenv("LOTUS_SYNC_FORMTS_PEND") == "yes" @@ -56,6 +59,7 @@ type syncManager struct { nextWorker uint64 pend syncBucketSet + deferred syncBucketSet heads map[peer.ID]*types.TipSet recent *syncBuffer @@ -339,6 +343,13 @@ func (sm *syncManager) addSyncTarget(ts *types.TipSet) (*types.TipSet, bool, err return nil, false, nil } + // if we have too many workers, add it to the deferred queue; it will be processed once a worker + // is freed from syncing a chain + if len(sm.state) >= MaxSyncWorkers { + log.Infof("too many sync workers; deferring sync on %s", ts) + sm.deferred.Insert(ts) + } + // start a new worker, seems heavy enough and unrelated to active or pending syncs return ts, true, nil } @@ -350,23 +361,58 @@ func (sm *syncManager) selectSyncTarget(done *types.TipSet) (*types.TipSet, bool // if we are not already working on a heavier tipset related := sm.pend.PopRelated(done) if related == nil { - return nil, false, nil + return sm.selectDeferredSyncTarget() } heaviest := related.heaviestTipSet() if isHeavier(done, heaviest) { - return nil, false, nil + return sm.selectDeferredSyncTarget() } for _, ws := range sm.state { if isHeavier(ws.ts, heaviest) { - return nil, false, nil + return sm.selectDeferredSyncTarget() } } return heaviest, true, nil } +// selects a deferred sync target if there is any; these are sync targets that were not related to +// active syncs and were deferred because there were too many workers running +func (sm *syncManager) selectDeferredSyncTarget() (*types.TipSet, bool, error) { 
+deferredLoop: + for !sm.deferred.Empty() { + bucket := sm.deferred.Pop() + heaviest := bucket.heaviestTipSet() + + if sm.pend.RelatedToAny(heaviest) { + // this has converged to a pending sync, insert it to the pending queue + sm.pend.Insert(heaviest) + continue deferredLoop + } + + for _, ws := range sm.state { + if ws.ts.Equals(heaviest) || isHeavier(ws.ts, heaviest) { + // we have converged and are already syncing it or we are syncing on something heavier + // ignore it and pop the next deferred bucket + continue deferredLoop + } + + if heaviest.Parents() == ws.ts.Key() { + // we have converged and we are syncing its parent; insert it to the pending queue + sm.pend.Insert(heaviest) + continue deferredLoop + } + + // it's not related to any active or pending sync and this worker is free, so sync it! + return heaviest, true, nil + } + } + + return nil, false, nil +} + func isHeavier(a, b *types.TipSet) bool { return a.ParentWeight().GreaterThan(b.ParentWeight()) } From 9ddf7bbd15a9c238e649114873e2e1f2d20faf13 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 28 Oct 2020 11:36:34 +0200 Subject: [PATCH 12/26] better handling of initial sync --- chain/sync_manager.go | 38 ++++++++++++++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index bf777a647..f21b2fcb8 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -6,6 +6,7 @@ import ( "sort" "strings" "sync" + "time" "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/build" @@ -63,6 +64,8 @@ type syncManager struct { heads map[peer.ID]*types.TipSet recent *syncBuffer + initialSync bool + mx sync.Mutex state map[uint64]*workerState @@ -147,12 +150,19 @@ func (sm *syncManager) State() []SyncerStateSnapshot { // sync manager internals func (sm *syncManager) scheduler() { + ticker := time.NewTicker(time.Minute) + tickerC := ticker.C for { select { case head := <-sm.workq: sm.handlePeerHead(head) case status := <-sm.statusq: sm.handleWorkerStatus(status) + case <-tickerC: + if sm.initialSync { + tickerC = nil + sm.handleInitialSync() + } case <-sm.ctx.Done(): return } @@ -214,6 +224,8 @@ func (sm *syncManager) handleWorkerStatus(status workerStatus) { } else { // add to the recently synced buffer sm.recent.Push(ws.ts) + // mark the end of the initial sync + sm.initialSync = true } // we are done with this target, select the next sync target and spawn a worker if there is work @@ -230,6 +242,24 @@ func (sm *syncManager) handleWorkerStatus(status workerStatus) { } } +func (sm *syncManager) handleInitialSync() { + // we have just finished the initial sync; spawn some additional workers in deferred syncs + // as needed (and up to MaxSyncWorkers) to ramp up chain sync + for len(sm.state) < MaxSyncWorkers { + target, work, err := sm.selectDeferredSyncTarget() + if err != nil { + log.Errorf("error selecting deferred sync target: %s", err) + return + } + + if !work { + return + } + + sm.spawnWorker(target) + } +} + func (sm *syncManager) spawnWorker(target *types.TipSet) { id := sm.nextWorker sm.nextWorker++ @@ -343,10 +373,10 @@ func (sm *syncManager) addSyncTarget(ts *types.TipSet) (*types.TipSet, bool, err return nil, false, nil } - // if we have too many workers, add it to the deferred queue; it will be processed once a worker - // is freed from syncing a chain - if len(sm.state) >= MaxSyncWorkers { - log.Infof("too many sync workers; deferring sync on %s", ts) + // if we have not finished the initial sync or have too many workers, 
add it to the deferred queue; + // it will be processed once a worker is freed from syncing a chain (or the initial sync finishes) + if !sm.initialSync || len(sm.state) >= MaxSyncWorkers { + log.Infof("deferring sync on %s", ts) sm.deferred.Insert(ts) } From 188d1649d3b34dd770fdd1dfed1454452032d10c Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 28 Oct 2020 11:41:40 +0200 Subject: [PATCH 13/26] stop the ticker when done with it --- chain/sync_manager.go | 1 + 1 file changed, 1 insertion(+) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index f21b2fcb8..4387cc603 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -160,6 +160,7 @@ func (sm *syncManager) scheduler() { sm.handleWorkerStatus(status) case <-tickerC: if sm.initialSync { + ticker.Stop() tickerC = nil sm.handleInitialSync() } From 6266bae1d3f7ed8aaa20a1105a0a3120a9982eda Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 28 Oct 2020 11:44:16 +0200 Subject: [PATCH 14/26] log worker spawning from initial sync deferrals --- chain/sync_manager.go | 1 + 1 file changed, 1 insertion(+) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index 4387cc603..febcc5349 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -257,6 +257,7 @@ func (sm *syncManager) handleInitialSync() { return } + log.Infof("selected deferred sync target: %s", target) sm.spawnWorker(target) } } From ba2512655ede2c27863cbae016be908d597e3b94 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 28 Oct 2020 14:08:06 +0200 Subject: [PATCH 15/26] track last few worker states for debug purposes --- chain/sync_manager.go | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index febcc5349..d6c7756d6 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -20,6 +20,7 @@ var ( RecentSyncBufferSize = 10 MaxSyncWorkers = 5 + SyncWorkerHistory = 3 coalesceTipsets = false ) @@ -69,6 +70,9 @@ type syncManager struct { mx sync.Mutex state map[uint64]*workerState + history []*workerState + historyI int + doSync func(context.Context, *types.TipSet) error } @@ -100,9 +104,10 @@ func NewSyncManager(sync SyncFunc) SyncManager { workq: make(chan peerHead), statusq: make(chan workerStatus), - heads: make(map[peer.ID]*types.TipSet), - state: make(map[uint64]*workerState), - recent: newSyncBuffer(RecentSyncBufferSize), + heads: make(map[peer.ID]*types.TipSet), + state: make(map[uint64]*workerState), + recent: newSyncBuffer(RecentSyncBufferSize), + history: make([]*workerState, SyncWorkerHistory), doSync: sync, } @@ -134,6 +139,11 @@ func (sm *syncManager) State() []SyncerStateSnapshot { for _, ws := range sm.state { workerStates = append(workerStates, ws) } + for _, ws := range sm.history { + if ws != nil { + workerStates = append(workerStates, ws) + } + } sm.mx.Unlock() sort.Slice(workerStates, func(i, j int) bool { @@ -216,6 +226,11 @@ func (sm *syncManager) handleWorkerStatus(status workerStatus) { sm.mx.Lock() ws := sm.state[status.id] delete(sm.state, status.id) + + // we track the last few workers for debug purposes + sm.history[sm.historyI] = ws + sm.historyI++ + sm.historyI %= len(sm.history) sm.mx.Unlock() if status.err != nil { From e575b5fe8ac16aba664e3de089ce37da9efd9bef Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Wed, 28 Oct 2020 16:53:13 +0100 Subject: [PATCH 16/26] Add error if weights are the same Signed-off-by: Jakub Sztandera --- chain/store/store.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/chain/store/store.go 
b/chain/store/store.go index 00a78500e..38f569120 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -358,6 +358,8 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ctx context.Context, ts *types.TipS // difference between 'bootstrap sync' and 'caught up' sync, we need // some other heuristic. return cs.takeHeaviestTipSet(ctx, ts) + } else if w.Equals(heaviestW) && !ts.Equals(cs.heaviest) { + log.Errorw("weight draw", "currTs", cs.heaviest, "ts", ts) } return nil } From ad905fc3100cdac8ce57a882f1ee78502aed798d Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Wed, 28 Oct 2020 20:22:07 +0100 Subject: [PATCH 17/26] Expose WorkerID Signed-off-by: Jakub Sztandera --- api/api_full.go | 5 +++-- chain/sync_manager.go | 3 ++- chain/syncstate.go | 15 ++++++++------- cli/sync.go | 4 ++-- node/impl/full/sync.go | 15 ++++++++------- 5 files changed, 23 insertions(+), 19 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index e2025f581..219dc271c 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -781,8 +781,9 @@ type IpldObject struct { } type ActiveSync struct { - Base *types.TipSet - Target *types.TipSet + WorkerID uint64 + Base *types.TipSet + Target *types.TipSet Stage SyncStateStage Height abi.ChainEpoch diff --git a/chain/sync_manager.go b/chain/sync_manager.go index d6c7756d6..800fbfd6e 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -135,7 +135,7 @@ func (sm *syncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.Tip func (sm *syncManager) State() []SyncerStateSnapshot { sm.mx.Lock() - workerStates := make([]*workerState, 0, len(sm.state)) + workerStates := make([]*workerState, 0, len(sm.state)+len(sm.history)) for _, ws := range sm.state { workerStates = append(workerStates, ws) } @@ -285,6 +285,7 @@ func (sm *syncManager) spawnWorker(target *types.TipSet) { ts: target, ss: new(SyncerState), } + ws.ss.data.WorkerID = id sm.mx.Lock() sm.state[id] = ws diff --git a/chain/syncstate.go b/chain/syncstate.go index 26f9f1c39..527d6be48 100644 --- a/chain/syncstate.go +++ b/chain/syncstate.go @@ -12,13 +12,14 @@ import ( ) type SyncerStateSnapshot struct { - Target *types.TipSet - Base *types.TipSet - Stage api.SyncStateStage - Height abi.ChainEpoch - Message string - Start time.Time - End time.Time + WorkerID uint64 + Target *types.TipSet + Base *types.TipSet + Stage api.SyncStateStage + Height abi.ChainEpoch + Message string + Start time.Time + End time.Time } type SyncerState struct { diff --git a/cli/sync.go b/cli/sync.go index c3f25eb1d..b4e26f7d8 100644 --- a/cli/sync.go +++ b/cli/sync.go @@ -45,8 +45,8 @@ var syncStatusCmd = &cli.Command{ } fmt.Println("sync status:") - for i, ss := range state.ActiveSyncs { - fmt.Printf("worker %d:\n", i) + for _, ss := range state.ActiveSyncs { + fmt.Printf("worker %d:\n", ss.WorkerID) var base, target []cid.Cid var heightDiff int64 var theight abi.ChainEpoch diff --git a/node/impl/full/sync.go b/node/impl/full/sync.go index 05d4c9cb7..1a088fb77 100644 --- a/node/impl/full/sync.go +++ b/node/impl/full/sync.go @@ -37,13 +37,14 @@ func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) { for i := range states { ss := &states[i] out.ActiveSyncs = append(out.ActiveSyncs, api.ActiveSync{ - Base: ss.Base, - Target: ss.Target, - Stage: ss.Stage, - Height: ss.Height, - Start: ss.Start, - End: ss.End, - Message: ss.Message, + WorkerID: ss.WorkerID, + Base: ss.Base, + Target: ss.Target, + Stage: ss.Stage, + Height: ss.Height, + Start: ss.Start, + End: ss.End, + Message: ss.Message, }) } return 
out, nil From a26420f6e8aeb6b522b669c68d1a75440c6e3262 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 29 Oct 2020 09:40:41 +0200 Subject: [PATCH 18/26] add missing return for deferrals during initial sync --- chain/sync_manager.go | 1 + 1 file changed, 1 insertion(+) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index 800fbfd6e..c681c8c5b 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -396,6 +396,7 @@ func (sm *syncManager) addSyncTarget(ts *types.TipSet) (*types.TipSet, bool, err if !sm.initialSync || len(sm.state) >= MaxSyncWorkers { log.Infof("deferring sync on %s", ts) sm.deferred.Insert(ts) + return nil, false, nil } // start a new worker, seems heavy enough and unrelated to active or pending syncs From 8702ff8b7ea50af5fa1288e3da1fccaf6ccdfac6 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 29 Oct 2020 17:12:47 +0200 Subject: [PATCH 19/26] rename initialSync to initialSyncDone --- chain/sync_manager.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index c681c8c5b..b80d4d259 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -65,7 +65,7 @@ type syncManager struct { heads map[peer.ID]*types.TipSet recent *syncBuffer - initialSync bool + initialSyncDone bool mx sync.Mutex state map[uint64]*workerState @@ -169,10 +169,10 @@ func (sm *syncManager) scheduler() { case status := <-sm.statusq: sm.handleWorkerStatus(status) case <-tickerC: - if sm.initialSync { + if sm.initialSyncDone { ticker.Stop() tickerC = nil - sm.handleInitialSync() + sm.handleInitialSyncDone() } case <-sm.ctx.Done(): return @@ -241,7 +241,7 @@ func (sm *syncManager) handleWorkerStatus(status workerStatus) { // add to the recently synced buffer sm.recent.Push(ws.ts) // mark the end of the initial sync - sm.initialSync = true + sm.initialSyncDone = true } // we are done with this target, select the next sync target and spawn a worker if there is work @@ -258,7 +258,7 @@ func (sm *syncManager) handleWorkerStatus(status workerStatus) { } } -func (sm *syncManager) handleInitialSync() { +func (sm *syncManager) handleInitialSyncDone() { // we have just finished the initial sync; spawn some additional workers in deferred syncs // as needed (and up to MaxSyncWorkers) to ramp up chain sync for len(sm.state) < MaxSyncWorkers { @@ -393,7 +393,7 @@ func (sm *syncManager) addSyncTarget(ts *types.TipSet) (*types.TipSet, bool, err // if we have not finished the initial sync or have too many workers, add it to the deferred queue; // it will be processed once a worker is freed from syncing a chain (or the initial sync finishes) - if !sm.initialSync || len(sm.state) >= MaxSyncWorkers { + if !sm.initialSyncDone || len(sm.state) >= MaxSyncWorkers { log.Infof("deferring sync on %s", ts) sm.deferred.Insert(ts) return nil, false, nil From a6506418136129045cd3585f03fa8d795a4f4d84 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 29 Oct 2020 17:27:13 +0200 Subject: [PATCH 20/26] transition out of initial sync only if sync finishes within 15min --- chain/sync_manager.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index b80d4d259..81c0f5e9a 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -22,6 +22,8 @@ var ( MaxSyncWorkers = 5 SyncWorkerHistory = 3 + InitialSyncTimeThreshold = 15 * time.Minute + coalesceTipsets = false ) @@ -87,6 +89,7 @@ type workerState struct { id uint64 ts *types.TipSet ss *SyncerState + dt time.Duration } type 
workerStatus struct { @@ -240,8 +243,10 @@ func (sm *syncManager) handleWorkerStatus(status workerStatus) { } else { // add to the recently synced buffer sm.recent.Push(ws.ts) - // mark the end of the initial sync - sm.initialSyncDone = true + // if we are still in intial sync and this was fast enough, mark the end of the initial sync + if !sm.initialSyncDone && ws.dt < InitialSyncTimeThreshold { + sm.initialSyncDone = true + } } // we are done with this target, select the next sync target and spawn a worker if there is work @@ -302,7 +307,8 @@ func (sm *syncManager) worker(ws *workerState) { ctx := context.WithValue(sm.ctx, syncStateKey{}, ws.ss) err := sm.doSync(ctx, ws.ts) - log.Infof("worker %d done; took %s", ws.id, build.Clock.Since(start)) + ws.dt = build.Clock.Since(start) + log.Infof("worker %d done; took %s", ws.id, ws.dt) select { case sm.statusq <- workerStatus{id: ws.id, err: err}: case <-sm.ctx.Done(): From 47830efbf5061da61419ac9cffafd661a37cf789 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 29 Oct 2020 19:08:58 +0200 Subject: [PATCH 21/26] set BootstrapPeerThreshold to 4, add env variable to override --- chain/sync_manager.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index 81c0f5e9a..8af4c9d5e 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -4,6 +4,7 @@ import ( "context" "os" "sort" + "strconv" "strings" "sync" "time" @@ -16,7 +17,7 @@ import ( ) var ( - BootstrapPeerThreshold = 1 + BootstrapPeerThreshold = 4 RecentSyncBufferSize = 10 MaxSyncWorkers = 5 @@ -29,6 +30,15 @@ var ( func init() { coalesceTipsets = os.Getenv("LOTUS_SYNC_FORMTS_PEND") == "yes" + + if bootstrapPeerThreshold := os.Getenv("LOTUS_SYNC_BOOTSTRAP_PEERS"); bootstrapPeerThreshold != "" { + threshold, err := strconv.Atoi(bootstrapPeerThreshold) + if err != nil { + log.Errorf("failed to parse 'LOTUS_SYNC_BOOTSTRAP_PEERS' env var: %s", err) + } else { + BootstrapPeerThreshold = threshold + } + } } type SyncFunc func(context.Context, *types.TipSet) error From 57234d8ef371323cc088531b6fa537c0dee4872a Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 29 Oct 2020 19:13:45 +0200 Subject: [PATCH 22/26] fix spelling in comment to satisfy linter this is ridiculous; since when do linters apply spellchecking on comments??? 
--- chain/sync_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index 8af4c9d5e..e8eb031c9 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -253,7 +253,7 @@ func (sm *syncManager) handleWorkerStatus(status workerStatus) { } else { // add to the recently synced buffer sm.recent.Push(ws.ts) - // if we are still in intial sync and this was fast enough, mark the end of the initial sync + // if we are still in initial sync and this was fast enough, mark the end of the initial sync if !sm.initialSyncDone && ws.dt < InitialSyncTimeThreshold { sm.initialSyncDone = true } From 8d25fd39cfa40641d9532e715c40f27b51f2409e Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 29 Oct 2020 19:32:09 +0200 Subject: [PATCH 23/26] set BootstrapPeerThreshold to 1 for tests --- chain/sync_manager_test.go | 4 ++++ node/test/builder.go | 5 +++++ 2 files changed, 9 insertions(+) diff --git a/chain/sync_manager_test.go b/chain/sync_manager_test.go index d58dc5f3d..61985b964 100644 --- a/chain/sync_manager_test.go +++ b/chain/sync_manager_test.go @@ -10,6 +10,10 @@ import ( "github.com/filecoin-project/lotus/chain/types/mock" ) +func init() { + BootstrapPeerThreshold = 1 +} + var genTs = mock.TipSet(mock.MkBlock(nil, 0, 0)) type syncOp struct { diff --git a/node/test/builder.go b/node/test/builder.go index ea9a82220..46efb9074 100644 --- a/node/test/builder.go +++ b/node/test/builder.go @@ -23,6 +23,7 @@ import ( "github.com/filecoin-project/lotus/api/client" "github.com/filecoin-project/lotus/api/test" "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/actors/policy" @@ -50,6 +51,10 @@ import ( "github.com/stretchr/testify/require" ) +func init() { + chain.BootstrapPeerThreshold = 1 +} + func CreateTestStorageNode(ctx context.Context, t *testing.T, waddr address.Address, act address.Address, pk crypto.PrivKey, tnd test.TestNode, mn mocknet.Mocknet, opts node.Option) test.TestStorageNode { r := repo.NewMemory(nil) From f6ecff0e2632cb20ea0912ff46f1f4eb21c39dff Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 30 Oct 2020 07:50:26 +0200 Subject: [PATCH 24/26] check recent syncs in selectSyncTarget/selectDeferredSyncTarget --- chain/sync_manager.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index e8eb031c9..641226e0f 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -440,6 +440,10 @@ func (sm *syncManager) selectSyncTarget(done *types.TipSet) (*types.TipSet, bool } } + if sm.recent.Synced(heaviest) { + return sm.selectDeferredSyncTarget() + } + return heaviest, true, nil } @@ -451,6 +455,11 @@ deferredLoop: bucket := sm.deferred.Pop() heaviest := bucket.heaviestTipSet() + if sm.recent.Synced(heaviest) { + // we have synced it or something heavier recently, skip it + continue deferredLoop + } + if sm.pend.RelatedToAny(heaviest) { // this has converged to a pending sync, insert it to the pending queue sm.pend.Insert(heaviest) From 5dcf339840a886989cbb83817f92f4c45b869095 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 30 Oct 2020 19:28:32 +0200 Subject: [PATCH 25/26] fix sync wait cli --- cli/sync.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/cli/sync.go b/cli/sync.go index b4e26f7d8..7527493d4 100644 --- a/cli/sync.go +++ b/cli/sync.go @@ -263,6 
+263,11 @@ func SyncWait(ctx context.Context, napi api.FullNode, watch bool) error { return err } + if len(state.ActiveSyncs) == 0 { + time.Sleep(time.Second) + continue + } + head, err := napi.ChainHead(ctx) if err != nil { return err @@ -280,6 +285,7 @@ func SyncWait(ctx context.Context, napi api.FullNode, watch bool) error { } ss := state.ActiveSyncs[working] + workerID := ss.WorkerID var baseHeight abi.ChainEpoch var target []cid.Cid @@ -302,7 +308,7 @@ func SyncWait(ctx context.Context, napi api.FullNode, watch bool) error { fmt.Print("\r\x1b[2K\x1b[A") } - fmt.Printf("Worker: %d; Base: %d; Target: %d (diff: %d)\n", working, baseHeight, theight, heightDiff) + fmt.Printf("Worker: %d; Base: %d; Target: %d (diff: %d)\n", workerID, baseHeight, theight, heightDiff) fmt.Printf("State: %s; Current Epoch: %d; Todo: %d\n", ss.Stage, ss.Height, theight-ss.Height) lastLines = 2 From d6e2c80608179fa7abc488889fee1478fc0d1493 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 30 Oct 2020 19:45:59 +0200 Subject: [PATCH 26/26] if noone is working pick the last worker in sync wait --- cli/sync.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cli/sync.go b/cli/sync.go index 7527493d4..ff7d4bd65 100644 --- a/cli/sync.go +++ b/cli/sync.go @@ -273,7 +273,7 @@ func SyncWait(ctx context.Context, napi api.FullNode, watch bool) error { return err } - working := 0 + working := -1 for i, ss := range state.ActiveSyncs { switch ss.Stage { case api.StageSyncComplete: @@ -284,6 +284,10 @@ func SyncWait(ctx context.Context, napi api.FullNode, watch bool) error { } } + if working == -1 { + working = len(state.ActiveSyncs) - 1 + } + ss := state.ActiveSyncs[working] workerID := ss.WorkerID
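Taken together, the series leaves the scheduler as a single goroutine that owns the peer-head map, the pending and deferred bucket sets and the recent-sync buffer, with workers reporting back over statusq; callers only ever touch the small SyncManager surface. A hedged, test-style sketch of that lifecycle, modeled on runSyncMgrTest in chain/sync_manager_test.go; the example name, peer ID and tipset values are placeholders, not real chain data:

package chain_test

import (
    "context"
    "fmt"

    "github.com/filecoin-project/lotus/chain"
    "github.com/filecoin-project/lotus/chain/types"
    "github.com/filecoin-project/lotus/chain/types/mock"
)

func ExampleSyncManagerLifecycle() {
    // As in the tests, a threshold of 1 lets a single peer head trigger the initial sync.
    old := chain.BootstrapPeerThreshold
    chain.BootstrapPeerThreshold = 1
    defer func() { chain.BootstrapPeerThreshold = old }()

    targets := make(chan *types.TipSet, 1)
    sm := chain.NewSyncManager(func(ctx context.Context, ts *types.TipSet) error {
        // The scheduler hands each selected sync target to this SyncFunc.
        targets <- ts
        return nil
    })

    sm.Start()
    defer sm.Stop()

    // Report a (mock) peer head; with the threshold met, the scheduler selects
    // an initial sync target and spawns a worker, which calls the SyncFunc above.
    head := mock.TipSet(mock.MkBlock(nil, 0, 0))
    sm.SetPeerHead(context.Background(), "peer1", head)

    fmt.Println("synced target:", (<-targets).Key())
}

Because all state transitions happen on the scheduler goroutine, SetPeerHead is safe to call concurrently for many peers; the remaining mutex (sm.mx) exists only so that State() can snapshot the active and recent worker states for the sync status API.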