diff --git a/api/api_full.go b/api/api_full.go
index f1dc51efc..dba59ad31 100644
--- a/api/api_full.go
+++ b/api/api_full.go
@@ -253,7 +253,7 @@ type ReplayResults struct {
 	Error string
 }

-type SyncState struct {
+type ActiveSync struct {
 	Base   *types.TipSet
 	Target *types.TipSet

@@ -261,6 +261,10 @@ type SyncState struct {
 	Height uint64
 }

+type SyncState struct {
+	ActiveSyncs []ActiveSync
+}
+
 type SyncStateStage int

 const (
diff --git a/chain/blocksync/blocksync_client.go b/chain/blocksync/blocksync_client.go
index 6a770f4b1..494b2ec61 100644
--- a/chain/blocksync/blocksync_client.go
+++ b/chain/blocksync/blocksync_client.go
@@ -77,6 +77,14 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int)
 	var oerr error
 	for _, p := range peers {
+		// TODO: doing this synchronously isn't great, but fetching in parallel
+		// may not be a good idea either. think about this more
+		select {
+		case <-ctx.Done():
+			return nil, xerrors.Errorf("blocksync getblocks failed: %w", ctx.Err())
+		default:
+		}
+
 		res, err := bs.sendRequestToPeer(ctx, p, req)
 		if err != nil {
 			oerr = err
diff --git a/chain/sync.go b/chain/sync.go
index 316e2e0a7..aff1e6cee 100644
--- a/chain/sync.go
+++ b/chain/sync.go
@@ -47,8 +47,6 @@ type Syncer struct {
 	// The known Genesis tipset
 	Genesis *types.TipSet

-	syncLock sync.Mutex
-
 	// TipSets known to be invalid
 	bad *BadBlockCache

@@ -57,12 +55,9 @@ type Syncer struct {

 	self peer.ID

-	syncState SyncerState
+	syncLock sync.Mutex

-	// peer heads
-	// Note: clear cache on disconnects
-	peerHeads map[peer.ID]*types.TipSet
-	peerHeadsLk sync.Mutex
+	syncmgr *SyncManager
 }

 func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID) (*Syncer, error) {
@@ -76,18 +71,26 @@ func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID)
 		return nil, err
 	}

-	return &Syncer{
-		bad: NewBadBlockCache(),
-		Genesis: gent,
-		Bsync: bsync,
-		peerHeads: make(map[peer.ID]*types.TipSet),
-		store: sm.ChainStore(),
-		sm: sm,
-		self: self,
-	}, nil
+	s := &Syncer{
+		bad: NewBadBlockCache(),
+		Genesis: gent,
+		Bsync: bsync,
+		store: sm.ChainStore(),
+		sm: sm,
+		self: self,
+	}
+
+	s.syncmgr = NewSyncManager(s.Sync)
+	return s, nil
 }

-const BootstrapPeerThreshold = 1
+func (syncer *Syncer) Start() {
+	syncer.syncmgr.Start()
+}
+
+func (syncer *Syncer) Stop() {
+	syncer.syncmgr.Stop()
+}

 // InformNewHead informs the syncer about a new potential tipset
 // This should be called when connecting to new peers, and additionally
@@ -124,9 +127,6 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
 		return
 	}

-	syncer.peerHeadsLk.Lock()
-	syncer.peerHeads[from] = fts.TipSet()
-	syncer.peerHeadsLk.Unlock()
 	syncer.Bsync.AddPeer(from)

 	bestPweight := syncer.store.GetHeaviestTipSet().Blocks()[0].ParentWeight
@@ -136,11 +136,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
 		return
 	}

-	go func() {
-		if err := syncer.Sync(ctx, fts.TipSet()); err != nil {
-			log.Errorf("sync error (curW=%s, targetW=%s): %+v", bestPweight, targetWeight, err)
-		}
-	}()
+	syncer.syncmgr.SetPeerHead(ctx, from, fts.TipSet())
 }

 func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error {
@@ -389,6 +385,7 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro
 func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
 	ctx, span := trace.StartSpan(ctx, "chain.Sync")
 	defer span.End()
+
 	if span.IsRecordingEvents() {
 		span.AddAttributes(
 			trace.StringAttribute("tipset", fmt.Sprint(maybeHead.Cids())),
@@ -396,9 +393,6 @@ func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
 		)
 	}

-	syncer.syncLock.Lock()
-	defer syncer.syncLock.Unlock()
-
 	if syncer.Genesis.Equals(maybeHead) || syncer.store.GetHeaviestTipSet().Equals(maybeHead) {
 		return nil
 	}
@@ -759,9 +753,20 @@ func (syncer *Syncer) verifyBlsAggregate(ctx context.Context, sig types.Signatur
 	return nil
 }

+type syncStateKey struct{}
+
+func extractSyncState(ctx context.Context) *SyncerState {
+	v := ctx.Value(syncStateKey{})
+	if v != nil {
+		return v.(*SyncerState)
+	}
+	return nil
+}
+
 func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
 	ctx, span := trace.StartSpan(ctx, "collectHeaders")
 	defer span.End()
+	ss := extractSyncState(ctx)

 	span.AddAttributes(
 		trace.Int64Attribute("fromHeight", int64(from.Height())),
@@ -784,7 +789,7 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to
 	// we want to sync all the blocks until the height above the block we have
 	untilHeight := to.Height() + 1

-	syncer.syncState.SetHeight(blockSet[len(blockSet)-1].Height())
+	ss.SetHeight(blockSet[len(blockSet)-1].Height())

 	var acceptedBlocks []cid.Cid

@@ -852,7 +857,7 @@ loop:

 		acceptedBlocks = append(acceptedBlocks, at...)

-		syncer.syncState.SetHeight(blks[len(blks)-1].Height())
+		ss.SetHeight(blks[len(blks)-1].Height())

 		at = blks[len(blks)-1].Parents()
 	}
@@ -915,7 +920,8 @@ func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *type
 }

 func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*types.TipSet) error {
-	syncer.syncState.SetHeight(0)
+	ss := extractSyncState(ctx)
+	ss.SetHeight(0)

 	return syncer.iterFullTipsets(ctx, headers, func(ctx context.Context, fts *store.FullTipSet) error {
 		log.Debugw("validating tipset", "height", fts.TipSet().Height(), "size", len(fts.TipSet().Cids()))
@@ -924,7 +930,7 @@ func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*
 			return xerrors.Errorf("message processing failed: %w", err)
 		}

-		syncer.syncState.SetHeight(fts.TipSet().Height())
+		ss.SetHeight(fts.TipSet().Height())

 		return nil
 	})
@@ -1019,8 +1025,9 @@ func persistMessages(bs bstore.Blockstore, bst *blocksync.BSTipSet) error {
 func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error {
 	ctx, span := trace.StartSpan(ctx, "collectChain")
 	defer span.End()
+	ss := extractSyncState(ctx)

-	syncer.syncState.Init(syncer.store.GetHeaviestTipSet(), ts)
+	ss.Init(syncer.store.GetHeaviestTipSet(), ts)

 	headers, err := syncer.collectHeaders(ctx, ts, syncer.store.GetHeaviestTipSet())
 	if err != nil {
@@ -1033,7 +1040,7 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error
 		log.Errorf("collectChain headers[0] should be equal to sync target. Its not: %s != %s", headers[0].Cids(), ts.Cids())
Its not: %s != %s", headers[0].Cids(), ts.Cids()) } - syncer.syncState.SetStage(api.StagePersistHeaders) + ss.SetStage(api.StagePersistHeaders) toPersist := make([]*types.BlockHeader, 0, len(headers)*build.BlocksPerEpoch) for _, ts := range headers { @@ -1044,13 +1051,13 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error } toPersist = nil - syncer.syncState.SetStage(api.StageMessages) + ss.SetStage(api.StageMessages) if err := syncer.syncMessagesAndCheckState(ctx, headers); err != nil { return xerrors.Errorf("collectChain syncMessages: %w", err) } - syncer.syncState.SetStage(api.StageSyncComplete) + ss.SetStage(api.StageSyncComplete) log.Debugw("new tipset", "height", ts.Height(), "tipset", types.LogCids(ts.Cids())) return nil @@ -1069,6 +1076,10 @@ func VerifyElectionProof(ctx context.Context, eproof []byte, rand []byte, worker return nil } -func (syncer *Syncer) State() SyncerState { - return syncer.syncState.Snapshot() +func (syncer *Syncer) State() []SyncerState { + var out []SyncerState + for _, ss := range syncer.syncmgr.syncStates { + out = append(out, ss.Snapshot()) + } + return out } diff --git a/chain/sync_manager.go b/chain/sync_manager.go new file mode 100644 index 000000000..0e296d617 --- /dev/null +++ b/chain/sync_manager.go @@ -0,0 +1,369 @@ +package chain + +import ( + "context" + "sort" + "sync" + + "github.com/filecoin-project/lotus/chain/types" + peer "github.com/libp2p/go-libp2p-peer" +) + +const BootstrapPeerThreshold = 2 + +type SyncFunc func(context.Context, *types.TipSet) error + +type SyncManager struct { + lk sync.Mutex + peerHeads map[peer.ID]*types.TipSet + bootstrapped bool + + bspThresh int + + incomingTipSets chan *types.TipSet + syncTargets chan *types.TipSet + syncResults chan *syncResult + + syncStates []*SyncerState + + doSync func(context.Context, *types.TipSet) error + + stop chan struct{} + + // Sync Scheduler fields + activeSyncs map[types.TipSetKey]*types.TipSet + syncQueue syncBucketSet + activeSyncTips syncBucketSet + nextSyncTarget *syncTargetBucket + workerChan chan *types.TipSet +} + +type syncResult struct { + ts *types.TipSet + success bool +} + +const syncWorkerCount = 3 + +func NewSyncManager(sync SyncFunc) *SyncManager { + return &SyncManager{ + bspThresh: 1, + peerHeads: make(map[peer.ID]*types.TipSet), + syncTargets: make(chan *types.TipSet), + syncResults: make(chan *syncResult), + syncStates: make([]*SyncerState, syncWorkerCount), + incomingTipSets: make(chan *types.TipSet), + activeSyncs: make(map[types.TipSetKey]*types.TipSet), + doSync: sync, + stop: make(chan struct{}), + } +} + +func (sm *SyncManager) Start() { + go sm.syncScheduler() + for i := 0; i < syncWorkerCount; i++ { + go sm.syncWorker(i) + } +} + +func (sm *SyncManager) Stop() { + close(sm.stop) +} + +func (sm *SyncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) { + log.Info("set peer head!", ts.Height(), ts.Cids()) + sm.lk.Lock() + defer sm.lk.Unlock() + sm.peerHeads[p] = ts + + if !sm.bootstrapped { + spc := sm.syncedPeerCount() + if spc >= sm.bspThresh { + // Its go time! + target, err := sm.selectSyncTarget() + if err != nil { + log.Error("failed to select sync target: ", err) + return + } + + sm.incomingTipSets <- target + // TODO: is this the right place to say we're bootstrapped? 
+			// probably want to wait until the sync finishes
+			sm.bootstrapped = true
+		}
+		log.Infof("sync bootstrap has %d peers", spc)
+		return
+	}
+
+	sm.incomingTipSets <- ts
+}
+
+type syncBucketSet struct {
+	buckets []*syncTargetBucket
+}
+
+func (sbs *syncBucketSet) Insert(ts *types.TipSet) {
+	for _, b := range sbs.buckets {
+		if b.sameChainAs(ts) {
+			b.add(ts)
+			return
+		}
+	}
+	sbs.buckets = append(sbs.buckets, &syncTargetBucket{
+		tips: []*types.TipSet{ts},
+		count: 1,
+	})
+}
+
+func (sbs *syncBucketSet) Pop() *syncTargetBucket {
+	var bestBuck *syncTargetBucket
+	var bestTs *types.TipSet
+	for _, b := range sbs.buckets {
+		hts := b.heaviestTipSet()
+		if bestBuck == nil || bestTs.ParentWeight().LessThan(hts.ParentWeight()) {
+			bestBuck = b
+			bestTs = hts
+		}
+	}
+
+	sbs.removeBucket(bestBuck)
+
+	return bestBuck
+}
+
+func (sbs *syncBucketSet) removeBucket(toremove *syncTargetBucket) {
+	nbuckets := make([]*syncTargetBucket, 0, len(sbs.buckets)-1)
+	for _, b := range sbs.buckets {
+		if b != toremove {
+			nbuckets = append(nbuckets, b)
+		}
+	}
+	sbs.buckets = nbuckets
+}
+
+func (sbs *syncBucketSet) PopRelated(ts *types.TipSet) *syncTargetBucket {
+	for _, b := range sbs.buckets {
+		if b.sameChainAs(ts) {
+			sbs.removeBucket(b)
+			return b
+		}
+	}
+	return nil
+}
+
+func (sbs *syncBucketSet) Heaviest() *types.TipSet {
+	// TODO: should also consider factoring in number of peers represented by each bucket here
+	var bestTs *types.TipSet
+	for _, b := range sbs.buckets {
+		bhts := b.heaviestTipSet()
+		if bestTs == nil || bhts.ParentWeight().GreaterThan(bestTs.ParentWeight()) {
+			bestTs = bhts
+		}
+	}
+	return bestTs
+}
+
+type syncTargetBucket struct {
+	tips []*types.TipSet
+	count int
+}
+
+func newSyncTargetBucket(tipsets ...*types.TipSet) *syncTargetBucket {
+	var stb syncTargetBucket
+	for _, ts := range tipsets {
+		stb.add(ts)
+	}
+	return &stb
+}
+
+func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool {
+	for _, t := range stb.tips {
+		if ts.Equals(t) {
+			return true
+		}
+		if types.CidArrsEqual(ts.Cids(), t.Parents()) {
+			return true
+		}
+		if types.CidArrsEqual(ts.Parents(), t.Cids()) {
+			return true
+		}
+	}
+	return false
+}
+
+func (stb *syncTargetBucket) add(ts *types.TipSet) {
+	stb.count++
+
+	for _, t := range stb.tips {
+		if t.Equals(ts) {
+			return
+		}
+	}
+
+	stb.tips = append(stb.tips, ts)
+}
+
+func (stb *syncTargetBucket) heaviestTipSet() *types.TipSet {
+	if stb == nil {
+		return nil
+	}
+
+	var best *types.TipSet
+	for _, ts := range stb.tips {
+		if best == nil || ts.ParentWeight().GreaterThan(best.ParentWeight()) {
+			best = ts
+		}
+	}
+	return best
+}
+
+func (sm *SyncManager) selectSyncTarget() (*types.TipSet, error) {
+	var buckets syncBucketSet
+
+	var peerHeads []*types.TipSet
+	for _, ts := range sm.peerHeads {
+		peerHeads = append(peerHeads, ts)
+	}
+	sort.Slice(peerHeads, func(i, j int) bool {
+		return peerHeads[i].Height() < peerHeads[j].Height()
+	})
+
+	for _, ts := range peerHeads {
+		buckets.Insert(ts)
+	}
+
+	if len(buckets.buckets) > 1 {
+		log.Warning("caution, multiple distinct chains seen during head selections")
+		// TODO: we *could* refuse to sync here without user intervention.
+		// For now, just select the best cluster
+	}
+
+	return buckets.Heaviest(), nil
+}
+
+func (sm *SyncManager) syncScheduler() {
+
+	for {
+		select {
+		case ts, ok := <-sm.incomingTipSets:
+			if !ok {
+				log.Info("shutting down sync scheduler")
+				return
+			}
+
+			sm.scheduleIncoming(ts)
+		case res := <-sm.syncResults:
+			sm.scheduleProcessResult(res)
+		case sm.workerChan <- sm.nextSyncTarget.heaviestTipSet():
+			sm.scheduleWorkSent()
+		case <-sm.stop:
+			log.Info("sync scheduler shutting down")
+			return
+		}
+	}
+}
+
+func (sm *SyncManager) scheduleIncoming(ts *types.TipSet) {
+	var relatedToActiveSync bool
+	for _, acts := range sm.activeSyncs {
+		if ts.Equals(acts) {
+			break
+		}
+
+		if types.CidArrsEqual(ts.Parents(), acts.Cids()) {
+			// sync this next, after that sync process finishes
+			relatedToActiveSync = true
+		}
+	}
+
+	// if this is related to an active sync process, immediately bucket it
+	// we don't want to start a parallel sync process that duplicates work
+	if relatedToActiveSync {
+		sm.activeSyncTips.Insert(ts)
+		return
+	}
+
+	if sm.nextSyncTarget != nil && sm.nextSyncTarget.sameChainAs(ts) {
+		sm.nextSyncTarget.add(ts)
+	} else {
+		sm.syncQueue.Insert(ts)
+
+		if sm.nextSyncTarget == nil {
+			sm.nextSyncTarget = sm.syncQueue.Pop()
+			sm.workerChan = sm.syncTargets
+		}
+	}
+}
+
+func (sm *SyncManager) scheduleProcessResult(res *syncResult) {
+	delete(sm.activeSyncs, res.ts.Key())
+	relbucket := sm.activeSyncTips.PopRelated(res.ts)
+	if relbucket != nil {
+		if res.success {
+			if sm.nextSyncTarget == nil {
+				sm.nextSyncTarget = relbucket
+				sm.workerChan = sm.syncTargets
+			} else {
+				sm.syncQueue.buckets = append(sm.syncQueue.buckets, relbucket)
+			}
+		} else {
+			// TODO: this is the case where we try to sync a chain, and
+			// fail, and we have more blocks on top of that chain that
+			// have come in since. The question is, should we try to
+			// sync these? or just drop them?
+		}
+	}
+}
+
+func (sm *SyncManager) scheduleWorkSent() {
+	hts := sm.nextSyncTarget.heaviestTipSet()
+	sm.activeSyncs[hts.Key()] = hts
+
+	if len(sm.syncQueue.buckets) > 0 {
+		sm.nextSyncTarget = sm.syncQueue.Pop()
+	} else {
+		sm.nextSyncTarget = nil
+		sm.workerChan = nil
+	}
+}
+
+func (sm *SyncManager) syncWorker(id int) {
+	ss := &SyncerState{}
+	sm.syncStates[id] = ss
+	for {
+		select {
+		case ts, ok := <-sm.syncTargets:
+			if !ok {
+				log.Info("sync manager worker shutting down")
+				return
+			}
+			log.Info("sync worker go time!", ts.Height(), ts.Cids())
+
+			ctx := context.WithValue(context.TODO(), syncStateKey{}, ss)
+			err := sm.doSync(ctx, ts)
+			if err != nil {
+				log.Errorf("sync error: %+v", err)
+			}
+
+			sm.syncResults <- &syncResult{
+				ts: ts,
+				success: err == nil,
+			}
+		}
+	}
+}
+
+func (sm *SyncManager) syncedPeerCount() int {
+	var count int
+	for _, ts := range sm.peerHeads {
+		if ts.Height() > 0 {
+			count++
+		}
+	}
+	return count
+}
+
+func (sm *SyncManager) IsBootstrapped() bool {
+	sm.lk.Lock()
+	defer sm.lk.Unlock()
+	return sm.bootstrapped
+}
diff --git a/chain/sync_manager_test.go b/chain/sync_manager_test.go
new file mode 100644
index 000000000..1eb85ec2c
--- /dev/null
+++ b/chain/sync_manager_test.go
@@ -0,0 +1,123 @@
+package chain
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/filecoin-project/lotus/chain/types"
+	"github.com/filecoin-project/lotus/chain/types/mock"
+)
+
+var genTs = mock.TipSet(mock.MkBlock(nil, 0, 0))
+
+type syncOp struct {
+	ts *types.TipSet
+	done func()
+}
+
+func runSyncMgrTest(t *testing.T, tname string, thresh int, tf func(*testing.T, *SyncManager, chan *syncOp)) {
+	syncTargets := make(chan *syncOp)
+	sm := NewSyncManager(func(ctx context.Context, ts *types.TipSet) error {
+		ch := make(chan struct{})
+		syncTargets <- &syncOp{
+			ts: ts,
+			done: func() { close(ch) },
+		}
+		<-ch
+		return nil
+	})
+	sm.bspThresh = thresh
+
+	sm.Start()
+	defer sm.Stop()
+	t.Run(tname+fmt.Sprintf("-%d", thresh), func(t *testing.T) {
+		tf(t, sm, syncTargets)
+	})
+}
+
+func assertTsEqual(t *testing.T, actual, expected *types.TipSet) {
+	t.Helper()
+	if !actual.Equals(expected) {
+		t.Fatalf("got unexpected tipset %s (expected: %s)", actual.Cids(), expected.Cids())
+	}
+}
+
+func assertNoOp(t *testing.T, c chan *syncOp) {
+	t.Helper()
+	select {
+	case <-time.After(time.Millisecond * 20):
+	case <-c:
+		t.Fatal("shouldn't have gotten any sync operations yet")
+	}
+}
+
+func assertGetSyncOp(t *testing.T, c chan *syncOp, ts *types.TipSet) {
+	t.Helper()
+
+	select {
+	case <-time.After(time.Millisecond * 100):
+		t.Fatal("expected sync manager to try and sync to our target")
+	case op := <-c:
+		op.done()
+		if !op.ts.Equals(ts) {
+			t.Fatalf("somehow got wrong tipset from syncer (got %s, expected %s)", op.ts.Cids(), ts.Cids())
+		}
+	}
+}
+
+func TestSyncManager(t *testing.T) {
+	ctx := context.Background()
+
+	a := mock.TipSet(mock.MkBlock(genTs, 1, 1))
+	b := mock.TipSet(mock.MkBlock(a, 1, 2))
+	c1 := mock.TipSet(mock.MkBlock(b, 1, 3))
+	c2 := mock.TipSet(mock.MkBlock(b, 2, 4))
+	d := mock.TipSet(mock.MkBlock(c1, 4, 5))
+
+	runSyncMgrTest(t, "testBootstrap", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) {
+		sm.SetPeerHead(ctx, "peer1", c1)
+		assertGetSyncOp(t, stc, c1)
+	})
+
+	runSyncMgrTest(t, "testBootstrap", 2, func(t *testing.T, sm *SyncManager, stc chan *syncOp) {
+		sm.SetPeerHead(ctx, "peer1", c1)
+		assertNoOp(t, stc)
+
+		sm.SetPeerHead(ctx, "peer2", c1)
+		assertGetSyncOp(t, stc, c1)
+	})
+
"testSyncAfterBootstrap", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) { + sm.SetPeerHead(ctx, "peer1", b) + assertGetSyncOp(t, stc, b) + + sm.SetPeerHead(ctx, "peer2", c1) + assertGetSyncOp(t, stc, c1) + + sm.SetPeerHead(ctx, "peer2", c2) + assertGetSyncOp(t, stc, c2) + }) + + runSyncMgrTest(t, "testCoalescing", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) { + sm.SetPeerHead(ctx, "peer1", a) + assertGetSyncOp(t, stc, a) + + sm.SetPeerHead(ctx, "peer2", b) + op := <-stc + + sm.SetPeerHead(ctx, "peer2", c1) + sm.SetPeerHead(ctx, "peer2", c2) + sm.SetPeerHead(ctx, "peer2", d) + + assertTsEqual(t, op.ts, b) + + // need a better way to 'wait until syncmgr is idle' + time.Sleep(time.Millisecond * 20) + + op.done() + + assertGetSyncOp(t, stc, d) + }) +} diff --git a/chain/syncstate.go b/chain/syncstate.go index 2a5ea6899..45aeba90c 100644 --- a/chain/syncstate.go +++ b/chain/syncstate.go @@ -32,12 +32,20 @@ type SyncerState struct { } func (ss *SyncerState) SetStage(v api.SyncStateStage) { + if ss == nil { + return + } + ss.lk.Lock() defer ss.lk.Unlock() ss.Stage = v } func (ss *SyncerState) Init(base, target *types.TipSet) { + if ss == nil { + return + } + ss.lk.Lock() defer ss.lk.Unlock() ss.Target = target @@ -47,6 +55,10 @@ func (ss *SyncerState) Init(base, target *types.TipSet) { } func (ss *SyncerState) SetHeight(h uint64) { + if ss == nil { + return + } + ss.lk.Lock() defer ss.lk.Unlock() ss.Height = h diff --git a/chain/types/mock/chain.go b/chain/types/mock/chain.go new file mode 100644 index 000000000..09b37370a --- /dev/null +++ b/chain/types/mock/chain.go @@ -0,0 +1,61 @@ +package mock + +import ( + "fmt" + + "github.com/filecoin-project/lotus/chain/address" + "github.com/filecoin-project/lotus/chain/types" + "github.com/ipfs/go-cid" +) + +func Address(i uint64) address.Address { + a, err := address.NewIDAddress(i) + if err != nil { + panic(err) + } + return a +} + +func MkBlock(parents *types.TipSet, weightInc uint64, ticketNonce uint64) *types.BlockHeader { + addr := Address(123561) + + c, err := cid.Decode("bafyreicmaj5hhoy5mgqvamfhgexxyergw7hdeshizghodwkjg6qmpoco7i") + if err != nil { + panic(err) + } + + var pcids []cid.Cid + var height uint64 + weight := types.NewInt(weightInc) + if parents != nil { + pcids = parents.Cids() + height = parents.Height() + 1 + weight = types.BigAdd(parents.Blocks()[0].ParentWeight, weight) + } + + return &types.BlockHeader{ + Miner: addr, + ElectionProof: []byte("cats won the election"), + Tickets: []*types.Ticket{ + { + VRFProof: []byte(fmt.Sprintf("====%d=====", ticketNonce)), + }, + }, + Parents: pcids, + ParentMessageReceipts: c, + BLSAggregate: types.Signature{Type: types.KTBLS, Data: []byte("boo! im a signature")}, + ParentWeight: weight, + Messages: c, + Height: height, + ParentStateRoot: c, + BlockSig: types.Signature{Type: types.KTBLS, Data: []byte("boo! 
im a signature")}, + } +} + +func TipSet(blks ...*types.BlockHeader) *types.TipSet { + ts, err := types.NewTipSet(blks) + if err != nil { + panic(err) + } + return ts +} diff --git a/chain/types/tipset.go b/chain/types/tipset.go index 13fb7e582..ca6941e5c 100644 --- a/chain/types/tipset.go +++ b/chain/types/tipset.go @@ -197,6 +197,10 @@ func (ts *TipSet) ParentState() cid.Cid { return ts.blks[0].ParentStateRoot } +func (ts *TipSet) ParentWeight() BigInt { + return ts.blks[0].ParentWeight +} + func (ts *TipSet) Contains(oc cid.Cid) bool { for _, c := range ts.cids { if c == oc { diff --git a/cli/sync.go b/cli/sync.go index da2dae8bd..b1b4efd0c 100644 --- a/cli/sync.go +++ b/cli/sync.go @@ -8,6 +8,7 @@ import ( "gopkg.in/urfave/cli.v2" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain" ) @@ -31,24 +32,26 @@ var syncStatusCmd = &cli.Command{ defer closer() ctx := ReqContext(cctx) - ss, err := api.SyncState(ctx) + state, err := api.SyncState(ctx) if err != nil { return err } - var base, target []cid.Cid - if ss.Base != nil { - base = ss.Base.Cids() - } - if ss.Target != nil { - target = ss.Target.Cids() - } - fmt.Println("sync status:") - fmt.Printf("Base:\t%s\n", base) - fmt.Printf("Target:\t%s\n", target) - fmt.Printf("Stage: %s\n", chain.SyncStageString(ss.Stage)) - fmt.Printf("Height: %d\n", ss.Height) + for i, ss := range state.ActiveSyncs { + fmt.Printf("worker %d:\n", i) + var base, target []cid.Cid + if ss.Base != nil { + base = ss.Base.Cids() + } + if ss.Target != nil { + target = ss.Target.Cids() + } + fmt.Printf("\tBase:\t%s\n", base) + fmt.Printf("\tTarget:\t%s\n", target) + fmt.Printf("\tStage: %s\n", chain.SyncStageString(ss.Stage)) + fmt.Printf("\tHeight: %d\n", ss.Height) + } return nil }, } @@ -65,19 +68,38 @@ var syncWaitCmd = &cli.Command{ ctx := ReqContext(cctx) for { - ss, err := napi.SyncState(ctx) + state, err := napi.SyncState(ctx) if err != nil { return err } + head, err := napi.ChainHead(ctx) + if err != nil { + return err + } + + working := 0 + for i, ss := range state.ActiveSyncs { + switch ss.Stage { + case api.StageSyncComplete: + default: + working = i + case api.StageIdle: + // not complete, not actively working + } + } + + ss := state.ActiveSyncs[working] + var target []cid.Cid if ss.Target != nil { target = ss.Target.Cids() } - fmt.Printf("\r\x1b[2KTarget: %s\tState: %s\tHeight: %d", target, chain.SyncStageString(ss.Stage), ss.Height) - if ss.Stage == api.StageSyncComplete { - fmt.Println("\nDone") + fmt.Printf("\r\x1b[2KWorker %d: Target: %s\tState: %s\tHeight: %d", working, target, chain.SyncStageString(ss.Stage), ss.Height) + + if time.Now().Unix()-int64(head.MinTimestamp()) < build.BlockDelay { + fmt.Println("\nDone!") return nil } diff --git a/node/builder.go b/node/builder.go index ce647e822..02082a3e5 100644 --- a/node/builder.go +++ b/node/builder.go @@ -200,7 +200,7 @@ func Online() Option { Override(new(dtypes.ClientDAG), testing.MemoryClientDag), // Filecoin services - Override(new(*chain.Syncer), chain.NewSyncer), + Override(new(*chain.Syncer), modules.NewSyncer), Override(new(*blocksync.BlockSync), blocksync.NewBlockSyncClient), Override(new(*chain.MessagePool), modules.MessagePool), diff --git a/node/impl/full/sync.go b/node/impl/full/sync.go index a9d2d3aa5..b06d51da4 100644 --- a/node/impl/full/sync.go +++ b/node/impl/full/sync.go @@ -20,13 +20,19 @@ type SyncAPI struct { } func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) { - ss := 
-	ss := a.Syncer.State()
-	return &api.SyncState{
-		Base: ss.Base,
-		Target: ss.Target,
-		Stage: ss.Stage,
-		Height: ss.Height,
-	}, nil
+	states := a.Syncer.State()
+
+	out := &api.SyncState{}
+
+	for _, ss := range states {
+		out.ActiveSyncs = append(out.ActiveSyncs, api.ActiveSync{
+			Base: ss.Base,
+			Target: ss.Target,
+			Stage: ss.Stage,
+			Height: ss.Height,
+		})
+	}
+	return out, nil
 }

 func (a *SyncAPI) SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) error {
diff --git a/node/modules/chain.go b/node/modules/chain.go
index fa77a40b8..25f71c84a 100644
--- a/node/modules/chain.go
+++ b/node/modules/chain.go
@@ -12,11 +12,13 @@ import (
 	blockstore "github.com/ipfs/go-ipfs-blockstore"
 	"github.com/libp2p/go-libp2p-core/host"
 	"github.com/libp2p/go-libp2p-core/routing"
+	peer "github.com/libp2p/go-libp2p-peer"
 	pubsub "github.com/libp2p/go-libp2p-pubsub"
 	"go.uber.org/fx"
 	"golang.org/x/xerrors"

 	"github.com/filecoin-project/lotus/chain"
+	"github.com/filecoin-project/lotus/chain/blocksync"
 	"github.com/filecoin-project/lotus/chain/stmgr"
 	"github.com/filecoin-project/lotus/chain/store"
 	"github.com/filecoin-project/lotus/chain/types"
@@ -119,3 +121,22 @@ func SetGenesis(cs *store.ChainStore, g Genesis) error {

 	return cs.SetGenesis(genesis)
 }
+
+func NewSyncer(lc fx.Lifecycle, sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID) (*chain.Syncer, error) {
+	syncer, err := chain.NewSyncer(sm, bsync, self)
+	if err != nil {
+		return nil, err
+	}
+
+	lc.Append(fx.Hook{
+		OnStart: func(_ context.Context) error {
+			syncer.Start()
+			return nil
+		},
+		OnStop: func(_ context.Context) error {
+			syncer.Stop()
+			return nil
+		},
+	})
+	return syncer, nil
+}
diff --git a/tools/stats/rpc.go b/tools/stats/rpc.go
index 32943c16f..c2131b480 100644
--- a/tools/stats/rpc.go
+++ b/tools/stats/rpc.go
@@ -6,13 +6,13 @@ import (
 	"net/http"
 	"time"

-	"github.com/multiformats/go-multiaddr-net"
+	manet "github.com/multiformats/go-multiaddr-net"
 	"golang.org/x/xerrors"

 	"github.com/filecoin-project/lotus/api"
 	"github.com/filecoin-project/lotus/api/client"
-	"github.com/filecoin-project/lotus/chain"
+	"github.com/filecoin-project/lotus/build"
 	"github.com/filecoin-project/lotus/chain/store"
 	"github.com/filecoin-project/lotus/chain/types"
 	"github.com/filecoin-project/lotus/lib/jsonrpc"
@@ -50,15 +50,15 @@ func WaitForSyncComplete(ctx context.Context, napi api.FullNode) error {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
-		case <-time.After(30 * time.Second):
-			state, err := napi.SyncState(ctx)
+		case <-time.After(3 * time.Second):
+			head, err := napi.ChainHead(ctx)
 			if err != nil {
 				return err
 			}

-			log.Printf("Stage %s, Height %d", chain.SyncStageString(state.Stage), state.Height)
+			log.Printf("Height %d", head.Height())

-			if state.Stage == api.StageSyncComplete {
+			if time.Now().Unix()-int64(head.MinTimestamp()) < build.BlockDelay {
				return nil
			}
		}
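
Note: the snippet below is not part of the patch. It is a minimal usage sketch of the SyncManager introduced in chain/sync_manager.go, using only APIs added above (NewSyncManager, Start/Stop, SetPeerHead, and the chain/types/mock helpers). In the daemon the callback is Syncer.Sync and Start/Stop are wired through modules.NewSyncer via the fx lifecycle; the "peer1" ID and the callback body here are illustrative placeholders.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/filecoin-project/lotus/chain"
	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/chain/types/mock"
)

func main() {
	// Each of the syncWorkerCount workers invokes this callback for a
	// scheduled sync target; in the real node the callback is Syncer.Sync.
	mgr := chain.NewSyncManager(func(ctx context.Context, ts *types.TipSet) error {
		fmt.Println("would sync to", ts.Height(), ts.Cids())
		return nil // a nil error reports success back to the scheduler
	})

	mgr.Start()
	defer mgr.Stop()

	// Peer heads feed the scheduler. Once bspThresh peers have reported a
	// head, the heaviest bucket is selected and handed to an idle worker.
	genesis := mock.TipSet(mock.MkBlock(nil, 0, 0))
	head := mock.TipSet(mock.MkBlock(genesis, 1, 1))
	mgr.SetPeerHead(context.TODO(), "peer1", head)

	// Give the worker a moment to pick up the target before exiting.
	time.Sleep(100 * time.Millisecond)
}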