Merge pull request #626 from filecoin-project/feat/chain-sync-manager
Implement chain sync manager
Commit 35659af84d
@ -253,7 +253,7 @@ type ReplayResults struct {
    Error string
}

type SyncState struct {
type ActiveSync struct {
    Base   *types.TipSet
    Target *types.TipSet

@ -261,6 +261,10 @@ type SyncState struct {
    Height uint64
}

type SyncState struct {
    ActiveSyncs []ActiveSync
}

type SyncStateStage int

const (
@ -77,6 +77,14 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int)

    var oerr error
    for _, p := range peers {
        // TODO: doing this synchronously isnt great, but fetching in parallel
        // may not be a good idea either. think about this more
        select {
        case <-ctx.Done():
            return nil, xerrors.Errorf("blocksync getblocks failed: %w", ctx.Err())
        default:
        }

        res, err := bs.sendRequestToPeer(ctx, p, req)
        if err != nil {
            oerr = err
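The select added here is the standard non-blocking cancellation check before each per-peer attempt. A minimal, self-contained sketch of the same pattern; `tryPeers`, the peer strings, and the `send` callback are hypothetical stand-ins, not the blocksync API:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// tryPeers asks each peer in turn, but checks for cancellation before every
// attempt so a dead context stops the loop instead of grinding through the
// whole peer list.
func tryPeers(ctx context.Context, peers []string, send func(string) error) error {
	var lastErr error
	for _, p := range peers {
		select {
		case <-ctx.Done():
			return fmt.Errorf("request aborted: %w", ctx.Err())
		default:
		}

		if err := send(p); err != nil {
			lastErr = err
			continue
		}
		return nil
	}
	if lastErr == nil {
		lastErr = errors.New("no peers to try")
	}
	return fmt.Errorf("all peers failed: %w", lastErr)
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // already cancelled: tryPeers should bail out immediately
	fmt.Println(tryPeers(ctx, []string{"peerA", "peerB"}, func(string) error { return nil }))
}
```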
@ -47,8 +47,6 @@ type Syncer struct {
    // The known Genesis tipset
    Genesis *types.TipSet

    syncLock sync.Mutex

    // TipSets known to be invalid
    bad *BadBlockCache

@ -57,12 +55,9 @@ type Syncer struct {

    self peer.ID

    syncState SyncerState
    syncLock sync.Mutex

    // peer heads
    // Note: clear cache on disconnects
    peerHeads   map[peer.ID]*types.TipSet
    peerHeadsLk sync.Mutex
    syncmgr     *SyncManager
}

func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID) (*Syncer, error) {
@ -76,18 +71,26 @@ func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID)
        return nil, err
    }

    return &Syncer{
        bad:       NewBadBlockCache(),
        Genesis:   gent,
        Bsync:     bsync,
        peerHeads: make(map[peer.ID]*types.TipSet),
        store:     sm.ChainStore(),
        sm:        sm,
        self:      self,
    }, nil
    s := &Syncer{
        bad:     NewBadBlockCache(),
        Genesis: gent,
        Bsync:   bsync,
        store:   sm.ChainStore(),
        sm:      sm,
        self:    self,
    }

    s.syncmgr = NewSyncManager(s.Sync)
    return s, nil
}

const BootstrapPeerThreshold = 1

func (syncer *Syncer) Start() {
    syncer.syncmgr.Start()
}

func (syncer *Syncer) Stop() {
    syncer.syncmgr.Stop()
}

// InformNewHead informs the syncer about a new potential tipset
// This should be called when connecting to new peers, and additionally
@ -124,9 +127,6 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
        return
    }

    syncer.peerHeadsLk.Lock()
    syncer.peerHeads[from] = fts.TipSet()
    syncer.peerHeadsLk.Unlock()
    syncer.Bsync.AddPeer(from)

    bestPweight := syncer.store.GetHeaviestTipSet().Blocks()[0].ParentWeight
@ -136,11 +136,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
        return
    }

    go func() {
        if err := syncer.Sync(ctx, fts.TipSet()); err != nil {
            log.Errorf("sync error (curW=%s, targetW=%s): %+v", bestPweight, targetWeight, err)
        }
    }()
    syncer.syncmgr.SetPeerHead(ctx, from, fts.TipSet())
}

func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error {
@ -389,6 +385,7 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro
func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
    ctx, span := trace.StartSpan(ctx, "chain.Sync")
    defer span.End()

    if span.IsRecordingEvents() {
        span.AddAttributes(
            trace.StringAttribute("tipset", fmt.Sprint(maybeHead.Cids())),
@ -396,9 +393,6 @@ func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
        )
    }

    syncer.syncLock.Lock()
    defer syncer.syncLock.Unlock()

    if syncer.Genesis.Equals(maybeHead) || syncer.store.GetHeaviestTipSet().Equals(maybeHead) {
        return nil
    }
@ -759,9 +753,20 @@ func (syncer *Syncer) verifyBlsAggregate(ctx context.Context, sig types.Signatur
    return nil
}

type syncStateKey struct{}

func extractSyncState(ctx context.Context) *SyncerState {
    v := ctx.Value(syncStateKey{})
    if v != nil {
        return v.(*SyncerState)
    }
    return nil
}

func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
    ctx, span := trace.StartSpan(ctx, "collectHeaders")
    defer span.End()
    ss := extractSyncState(ctx)

    span.AddAttributes(
        trace.Int64Attribute("fromHeight", int64(from.Height())),
@ -784,7 +789,7 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to
    // we want to sync all the blocks until the height above the block we have
    untilHeight := to.Height() + 1

    syncer.syncState.SetHeight(blockSet[len(blockSet)-1].Height())
    ss.SetHeight(blockSet[len(blockSet)-1].Height())

    var acceptedBlocks []cid.Cid

@ -852,7 +857,7 @@ loop:

        acceptedBlocks = append(acceptedBlocks, at...)

        syncer.syncState.SetHeight(blks[len(blks)-1].Height())
        ss.SetHeight(blks[len(blks)-1].Height())
        at = blks[len(blks)-1].Parents()
    }

@ -915,7 +920,8 @@ func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *type
}

func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*types.TipSet) error {
    syncer.syncState.SetHeight(0)
    ss := extractSyncState(ctx)
    ss.SetHeight(0)

    return syncer.iterFullTipsets(ctx, headers, func(ctx context.Context, fts *store.FullTipSet) error {
        log.Debugw("validating tipset", "height", fts.TipSet().Height(), "size", len(fts.TipSet().Cids()))
@ -924,7 +930,7 @@ func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*
            return xerrors.Errorf("message processing failed: %w", err)
        }

        syncer.syncState.SetHeight(fts.TipSet().Height())
        ss.SetHeight(fts.TipSet().Height())

        return nil
    })
@ -1019,8 +1025,9 @@ func persistMessages(bs bstore.Blockstore, bst *blocksync.BSTipSet) error {
func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error {
    ctx, span := trace.StartSpan(ctx, "collectChain")
    defer span.End()
    ss := extractSyncState(ctx)

    syncer.syncState.Init(syncer.store.GetHeaviestTipSet(), ts)
    ss.Init(syncer.store.GetHeaviestTipSet(), ts)

    headers, err := syncer.collectHeaders(ctx, ts, syncer.store.GetHeaviestTipSet())
    if err != nil {
@ -1033,7 +1040,7 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error
        log.Errorf("collectChain headers[0] should be equal to sync target. Its not: %s != %s", headers[0].Cids(), ts.Cids())
    }

    syncer.syncState.SetStage(api.StagePersistHeaders)
    ss.SetStage(api.StagePersistHeaders)

    toPersist := make([]*types.BlockHeader, 0, len(headers)*build.BlocksPerEpoch)
    for _, ts := range headers {
@ -1044,13 +1051,13 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error
    }
    toPersist = nil

    syncer.syncState.SetStage(api.StageMessages)
    ss.SetStage(api.StageMessages)

    if err := syncer.syncMessagesAndCheckState(ctx, headers); err != nil {
        return xerrors.Errorf("collectChain syncMessages: %w", err)
    }

    syncer.syncState.SetStage(api.StageSyncComplete)
    ss.SetStage(api.StageSyncComplete)
    log.Debugw("new tipset", "height", ts.Height(), "tipset", types.LogCids(ts.Cids()))

    return nil
@ -1069,6 +1076,10 @@ func VerifyElectionProof(ctx context.Context, eproof []byte, rand []byte, worker
    return nil
}

func (syncer *Syncer) State() SyncerState {
    return syncer.syncState.Snapshot()
func (syncer *Syncer) State() []SyncerState {
    var out []SyncerState
    for _, ss := range syncer.syncmgr.syncStates {
        out = append(out, ss.Snapshot())
    }
    return out
}
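The syncStateKey / extractSyncState pair above threads a per-worker *SyncerState through the context, so deep call sites like collectHeaders and collectChain can report progress without the Syncer holding one global state; extractSyncState deliberately returns nil when nothing was attached. A small sketch of that context-value pattern under assumed names (`trackerKey` and `tracker` are illustrative, not from the codebase):

```go
package main

import (
	"context"
	"fmt"
)

// trackerKey is an unexported type, so no other package can collide with
// values stored under it (the same idea as syncStateKey above).
type trackerKey struct{}

type tracker struct{ height uint64 }

// extract mirrors extractSyncState: it returns nil when no tracker was
// attached, and callers are expected to tolerate the nil.
func extract(ctx context.Context) *tracker {
	if v := ctx.Value(trackerKey{}); v != nil {
		return v.(*tracker)
	}
	return nil
}

func main() {
	t := &tracker{}
	ctx := context.WithValue(context.Background(), trackerKey{}, t)
	if tr := extract(ctx); tr != nil {
		tr.height = 42
	}
	fmt.Println(t.height)                      // 42
	fmt.Println(extract(context.Background())) // <nil>
}
```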
chain/sync_manager.go (new file, 369 lines)
@ -0,0 +1,369 @@
package chain

import (
    "context"
    "sort"
    "sync"

    "github.com/filecoin-project/lotus/chain/types"
    peer "github.com/libp2p/go-libp2p-peer"
)

const BootstrapPeerThreshold = 2

type SyncFunc func(context.Context, *types.TipSet) error

type SyncManager struct {
    lk           sync.Mutex
    peerHeads    map[peer.ID]*types.TipSet
    bootstrapped bool

    bspThresh int

    incomingTipSets chan *types.TipSet
    syncTargets     chan *types.TipSet
    syncResults     chan *syncResult

    syncStates []*SyncerState

    doSync func(context.Context, *types.TipSet) error

    stop chan struct{}

    // Sync Scheduler fields
    activeSyncs    map[types.TipSetKey]*types.TipSet
    syncQueue      syncBucketSet
    activeSyncTips syncBucketSet
    nextSyncTarget *syncTargetBucket
    workerChan     chan *types.TipSet
}

type syncResult struct {
    ts      *types.TipSet
    success bool
}

const syncWorkerCount = 3

func NewSyncManager(sync SyncFunc) *SyncManager {
    return &SyncManager{
        bspThresh:       1,
        peerHeads:       make(map[peer.ID]*types.TipSet),
        syncTargets:     make(chan *types.TipSet),
        syncResults:     make(chan *syncResult),
        syncStates:      make([]*SyncerState, syncWorkerCount),
        incomingTipSets: make(chan *types.TipSet),
        activeSyncs:     make(map[types.TipSetKey]*types.TipSet),
        doSync:          sync,
        stop:            make(chan struct{}),
    }
}

func (sm *SyncManager) Start() {
    go sm.syncScheduler()
    for i := 0; i < syncWorkerCount; i++ {
        go sm.syncWorker(i)
    }
}

func (sm *SyncManager) Stop() {
    close(sm.stop)
}

func (sm *SyncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) {
    log.Info("set peer head!", ts.Height(), ts.Cids())
    sm.lk.Lock()
    defer sm.lk.Unlock()
    sm.peerHeads[p] = ts

    if !sm.bootstrapped {
        spc := sm.syncedPeerCount()
        if spc >= sm.bspThresh {
            // Its go time!
            target, err := sm.selectSyncTarget()
            if err != nil {
                log.Error("failed to select sync target: ", err)
                return
            }

            sm.incomingTipSets <- target
            // TODO: is this the right place to say we're bootstrapped? probably want to wait until the sync finishes
            sm.bootstrapped = true
        }
        log.Infof("sync bootstrap has %d peers", spc)
        return
    }

    sm.incomingTipSets <- ts
}

type syncBucketSet struct {
    buckets []*syncTargetBucket
}

func (sbs *syncBucketSet) Insert(ts *types.TipSet) {
    for _, b := range sbs.buckets {
        if b.sameChainAs(ts) {
            b.add(ts)
            return
        }
    }
    sbs.buckets = append(sbs.buckets, &syncTargetBucket{
        tips:  []*types.TipSet{ts},
        count: 1,
    })
}

func (sbs *syncBucketSet) Pop() *syncTargetBucket {
    var bestBuck *syncTargetBucket
    var bestTs *types.TipSet
    for _, b := range sbs.buckets {
        hts := b.heaviestTipSet()
        if bestBuck == nil || bestTs.ParentWeight().LessThan(hts.ParentWeight()) {
            bestBuck = b
            bestTs = hts
        }
    }

    sbs.removeBucket(bestBuck)

    return bestBuck
}

func (sbs *syncBucketSet) removeBucket(toremove *syncTargetBucket) {
    nbuckets := make([]*syncTargetBucket, 0, len(sbs.buckets)-1)
    for _, b := range sbs.buckets {
        if b != toremove {
            nbuckets = append(nbuckets, b)
        }
    }
    sbs.buckets = nbuckets
}

func (sbs *syncBucketSet) PopRelated(ts *types.TipSet) *syncTargetBucket {
    for _, b := range sbs.buckets {
        if b.sameChainAs(ts) {
            sbs.removeBucket(b)
            return b
        }
    }
    return nil
}

func (sbs *syncBucketSet) Heaviest() *types.TipSet {
    // TODO: should also consider factoring in number of peers represented by each bucket here
    var bestTs *types.TipSet
    for _, b := range sbs.buckets {
        bhts := b.heaviestTipSet()
        if bestTs == nil || bhts.ParentWeight().GreaterThan(bestTs.ParentWeight()) {
            bestTs = bhts
        }
    }
    return bestTs
}

type syncTargetBucket struct {
    tips  []*types.TipSet
    count int
}

func newSyncTargetBucket(tipsets ...*types.TipSet) *syncTargetBucket {
    var stb syncTargetBucket
    for _, ts := range tipsets {
        stb.add(ts)
    }
    return &stb
}

func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool {
    for _, t := range stb.tips {
        if ts.Equals(t) {
            return true
        }
        if types.CidArrsEqual(ts.Cids(), t.Parents()) {
            return true
        }
        if types.CidArrsEqual(ts.Parents(), t.Cids()) {
            return true
        }
    }
    return false
}

func (stb *syncTargetBucket) add(ts *types.TipSet) {
    stb.count++

    for _, t := range stb.tips {
        if t.Equals(ts) {
            return
        }
    }

    stb.tips = append(stb.tips, ts)
}

func (stb *syncTargetBucket) heaviestTipSet() *types.TipSet {
    if stb == nil {
        return nil
    }

    var best *types.TipSet
    for _, ts := range stb.tips {
        if best == nil || ts.ParentWeight().GreaterThan(best.ParentWeight()) {
            best = ts
        }
    }
    return best
}

func (sm *SyncManager) selectSyncTarget() (*types.TipSet, error) {
    var buckets syncBucketSet

    var peerHeads []*types.TipSet
    for _, ts := range sm.peerHeads {
        peerHeads = append(peerHeads, ts)
    }
    sort.Slice(peerHeads, func(i, j int) bool {
        return peerHeads[i].Height() < peerHeads[j].Height()
    })

    for _, ts := range peerHeads {
        buckets.Insert(ts)
    }

    if len(buckets.buckets) > 1 {
        log.Warning("caution, multiple distinct chains seen during head selections")
        // TODO: we *could* refuse to sync here without user intervention.
        // For now, just select the best cluster
    }

    return buckets.Heaviest(), nil
}

func (sm *SyncManager) syncScheduler() {

    for {
        select {
        case ts, ok := <-sm.incomingTipSets:
            if !ok {
                log.Info("shutting down sync scheduler")
                return
            }

            sm.scheduleIncoming(ts)
        case res := <-sm.syncResults:
            sm.scheduleProcessResult(res)
        case sm.workerChan <- sm.nextSyncTarget.heaviestTipSet():
            sm.scheduleWorkSent()
        case <-sm.stop:
            log.Info("sync scheduler shutting down")
            return
        }
    }
}

func (sm *SyncManager) scheduleIncoming(ts *types.TipSet) {
    var relatedToActiveSync bool
    for _, acts := range sm.activeSyncs {
        if ts.Equals(acts) {
            break
        }

        if types.CidArrsEqual(ts.Parents(), acts.Cids()) {
            // sync this next, after that sync process finishes
            relatedToActiveSync = true
        }
    }

    // if this is related to an active sync process, immediately bucket it
    // we don't want to start a parallel sync process that duplicates work
    if relatedToActiveSync {
        sm.activeSyncTips.Insert(ts)
        return
    }

    if sm.nextSyncTarget != nil && sm.nextSyncTarget.sameChainAs(ts) {
        sm.nextSyncTarget.add(ts)
    } else {
        sm.syncQueue.Insert(ts)

        if sm.nextSyncTarget == nil {
            sm.nextSyncTarget = sm.syncQueue.Pop()
            sm.workerChan = sm.syncTargets
        }
    }
}

func (sm *SyncManager) scheduleProcessResult(res *syncResult) {
    delete(sm.activeSyncs, res.ts.Key())
    relbucket := sm.activeSyncTips.PopRelated(res.ts)
    if relbucket != nil {
        if res.success {
            if sm.nextSyncTarget == nil {
                sm.nextSyncTarget = relbucket
                sm.workerChan = sm.syncTargets
            } else {
                sm.syncQueue.buckets = append(sm.syncQueue.buckets, relbucket)
            }
        } else {
            // TODO: this is the case where we try to sync a chain, and
            // fail, and we have more blocks on top of that chain that
            // have come in since. The question is, should we try to
            // sync these? or just drop them?
        }
    }
}

func (sm *SyncManager) scheduleWorkSent() {
    hts := sm.nextSyncTarget.heaviestTipSet()
    sm.activeSyncs[hts.Key()] = hts

    if len(sm.syncQueue.buckets) > 0 {
        sm.nextSyncTarget = sm.syncQueue.Pop()
    } else {
        sm.nextSyncTarget = nil
        sm.workerChan = nil
    }
}

func (sm *SyncManager) syncWorker(id int) {
    ss := &SyncerState{}
    sm.syncStates[id] = ss
    for {
        select {
        case ts, ok := <-sm.syncTargets:
            if !ok {
                log.Info("sync manager worker shutting down")
                return
            }
            log.Info("sync worker go time!", ts.Height(), ts.Cids())

            ctx := context.WithValue(context.TODO(), syncStateKey{}, ss)
            err := sm.doSync(ctx, ts)
            if err != nil {
                log.Errorf("sync error: %+v", err)
            }

            sm.syncResults <- &syncResult{
                ts:      ts,
                success: err == nil,
            }
        }
    }
}

func (sm *SyncManager) syncedPeerCount() int {
    var count int
    for _, ts := range sm.peerHeads {
        if ts.Height() > 0 {
            count++
        }
    }
    return count
}

func (sm *SyncManager) IsBootstrapped() bool {
    sm.lk.Lock()
    defer sm.lk.Unlock()
    return sm.bootstrapped
}
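The scheduler above leans on a common Go idiom: workerChan stays nil until nextSyncTarget exists, and because a send on a nil channel can never proceed, that select case is effectively switched off and on just by assigning the channel. A stripped-down sketch of the mechanism with hypothetical names (unlike this toy, the real manager buckets queued tipsets instead of overwriting the pending one):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	targets := make(chan string)
	var workerChan chan string // nil until there is something to hand out
	next := ""

	// one worker that prints whatever the scheduler dispatches
	go func() {
		for t := range targets {
			fmt.Println("worker syncing", t)
		}
	}()

	incoming := make(chan string, 3)
	incoming <- "tipset-A"
	incoming <- "tipset-B"
	incoming <- "tipset-C"

	for {
		select {
		case ts := <-incoming:
			// new head arrived: remember it and switch the dispatch case on
			next = ts
			workerChan = targets
		case workerChan <- next:
			// a send on a nil channel blocks forever, so this case is
			// silently skipped whenever workerChan is nil
			workerChan = nil
		case <-time.After(200 * time.Millisecond):
			return // demo only; the real scheduler exits via a stop channel
		}
	}
}
```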
chain/sync_manager_test.go (new file, 123 lines)
@ -0,0 +1,123 @@
package chain

import (
    "context"
    "fmt"
    "testing"
    "time"

    "github.com/filecoin-project/lotus/chain/types"
    "github.com/filecoin-project/lotus/chain/types/mock"
)

var genTs = mock.TipSet(mock.MkBlock(nil, 0, 0))

type syncOp struct {
    ts   *types.TipSet
    done func()
}

func runSyncMgrTest(t *testing.T, tname string, thresh int, tf func(*testing.T, *SyncManager, chan *syncOp)) {
    syncTargets := make(chan *syncOp)
    sm := NewSyncManager(func(ctx context.Context, ts *types.TipSet) error {
        ch := make(chan struct{})
        syncTargets <- &syncOp{
            ts:   ts,
            done: func() { close(ch) },
        }
        <-ch
        return nil
    })
    sm.bspThresh = thresh

    sm.Start()
    defer sm.Stop()
    t.Run(tname+fmt.Sprintf("-%d", thresh), func(t *testing.T) {
        tf(t, sm, syncTargets)
    })
}

func assertTsEqual(t *testing.T, actual, expected *types.TipSet) {
    t.Helper()
    if !actual.Equals(expected) {
        t.Fatalf("got unexpected tipset %s (expected: %s)", actual.Cids(), expected.Cids())
    }
}

func assertNoOp(t *testing.T, c chan *syncOp) {
    t.Helper()
    select {
    case <-time.After(time.Millisecond * 20):
    case <-c:
        t.Fatal("shouldnt have gotten any sync operations yet")
    }
}

func assertGetSyncOp(t *testing.T, c chan *syncOp, ts *types.TipSet) {
    t.Helper()

    select {
    case <-time.After(time.Millisecond * 100):
        t.Fatal("expected sync manager to try and sync to our target")
    case op := <-c:
        op.done()
        if !op.ts.Equals(ts) {
            t.Fatalf("somehow got wrong tipset from syncer (got %s, expected %s)", op.ts.Cids(), ts.Cids())
        }
    }
}

func TestSyncManager(t *testing.T) {
    ctx := context.Background()

    a := mock.TipSet(mock.MkBlock(genTs, 1, 1))
    b := mock.TipSet(mock.MkBlock(a, 1, 2))
    c1 := mock.TipSet(mock.MkBlock(b, 1, 3))
    c2 := mock.TipSet(mock.MkBlock(b, 2, 4))
    d := mock.TipSet(mock.MkBlock(c1, 4, 5))

    runSyncMgrTest(t, "testBootstrap", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) {
        sm.SetPeerHead(ctx, "peer1", c1)
        assertGetSyncOp(t, stc, c1)
    })

    runSyncMgrTest(t, "testBootstrap", 2, func(t *testing.T, sm *SyncManager, stc chan *syncOp) {
        sm.SetPeerHead(ctx, "peer1", c1)
        assertNoOp(t, stc)

        sm.SetPeerHead(ctx, "peer2", c1)
        assertGetSyncOp(t, stc, c1)
    })

    runSyncMgrTest(t, "testSyncAfterBootstrap", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) {
        sm.SetPeerHead(ctx, "peer1", b)
        assertGetSyncOp(t, stc, b)

        sm.SetPeerHead(ctx, "peer2", c1)
        assertGetSyncOp(t, stc, c1)

        sm.SetPeerHead(ctx, "peer2", c2)
        assertGetSyncOp(t, stc, c2)
    })

    runSyncMgrTest(t, "testCoalescing", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) {
        sm.SetPeerHead(ctx, "peer1", a)
        assertGetSyncOp(t, stc, a)

        sm.SetPeerHead(ctx, "peer2", b)
        op := <-stc

        sm.SetPeerHead(ctx, "peer2", c1)
        sm.SetPeerHead(ctx, "peer2", c2)
        sm.SetPeerHead(ctx, "peer2", d)

        assertTsEqual(t, op.ts, b)

        // need a better way to 'wait until syncmgr is idle'
        time.Sleep(time.Millisecond * 20)

        op.done()

        assertGetSyncOp(t, stc, d)
    })
}
@ -32,12 +32,20 @@ type SyncerState struct {
}

func (ss *SyncerState) SetStage(v api.SyncStateStage) {
    if ss == nil {
        return
    }

    ss.lk.Lock()
    defer ss.lk.Unlock()
    ss.Stage = v
}

func (ss *SyncerState) Init(base, target *types.TipSet) {
    if ss == nil {
        return
    }

    ss.lk.Lock()
    defer ss.lk.Unlock()
    ss.Target = target
@ -47,6 +55,10 @@ func (ss *SyncerState) Init(base, target *types.TipSet) {
}

func (ss *SyncerState) SetHeight(h uint64) {
    if ss == nil {
        return
    }

    ss.lk.Lock()
    defer ss.lk.Unlock()
    ss.Height = h
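The nil guards added here matter because extractSyncState can return nil (for example when Sync is called outside a managed worker); with them, reporting progress on a nil *SyncerState is a quiet no-op rather than a panic. A small illustrative sketch of the same nil-receiver pattern, using a made-up progress type:

```go
package main

import (
	"fmt"
	"sync"
)

type progress struct {
	lk sync.Mutex
	h  uint64
}

// SetHeight is safe to call on a nil *progress, mirroring the guards added
// to SyncerState: callers never have to check before reporting.
func (p *progress) SetHeight(h uint64) {
	if p == nil {
		return
	}
	p.lk.Lock()
	defer p.lk.Unlock()
	p.h = h
}

func main() {
	var p *progress // e.g. nothing was attached to the context
	p.SetHeight(10) // no panic, just a no-op

	q := &progress{}
	q.SetHeight(10)
	fmt.Println(q.h) // 10
}
```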
chain/types/mock/chain.go (new file, 61 lines)
@ -0,0 +1,61 @@
package mock

import (
    "fmt"

    "github.com/filecoin-project/lotus/chain/address"
    "github.com/filecoin-project/lotus/chain/types"
    "github.com/ipfs/go-cid"
)

func Address(i uint64) address.Address {
    a, err := address.NewIDAddress(i)
    if err != nil {
        panic(err)
    }
    return a
}

func MkBlock(parents *types.TipSet, weightInc uint64, ticketNonce uint64) *types.BlockHeader {
    addr := Address(123561)

    c, err := cid.Decode("bafyreicmaj5hhoy5mgqvamfhgexxyergw7hdeshizghodwkjg6qmpoco7i")
    if err != nil {
        panic(err)
    }

    var pcids []cid.Cid
    var height uint64
    weight := types.NewInt(weightInc)
    if parents != nil {
        pcids = parents.Cids()
        height = parents.Height() + 1
        weight = types.BigAdd(parents.Blocks()[0].ParentWeight, weight)
    }

    return &types.BlockHeader{
        Miner:         addr,
        ElectionProof: []byte("cats won the election"),
        Tickets: []*types.Ticket{
            {
                VRFProof: []byte(fmt.Sprintf("====%d=====", ticketNonce)),
            },
        },
        Parents:               pcids,
        ParentMessageReceipts: c,
        BLSAggregate:          types.Signature{Type: types.KTBLS, Data: []byte("boo! im a signature")},
        ParentWeight:          weight,
        Messages:              c,
        Height:                height,
        ParentStateRoot:       c,
        BlockSig:              types.Signature{Type: types.KTBLS, Data: []byte("boo! im a signature")},
    }
}

func TipSet(blks ...*types.BlockHeader) *types.TipSet {
    ts, err := types.NewTipSet(blks)
    if err != nil {
        panic(err)
    }
    return ts
}
@ -197,6 +197,10 @@ func (ts *TipSet) ParentState() cid.Cid {
    return ts.blks[0].ParentStateRoot
}

func (ts *TipSet) ParentWeight() BigInt {
    return ts.blks[0].ParentWeight
}

func (ts *TipSet) Contains(oc cid.Cid) bool {
    for _, c := range ts.cids {
        if c == oc {
cli/sync.go
@ -8,6 +8,7 @@ import (
"gopkg.in/urfave/cli.v2"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain"
|
||||
)
|
||||
|
||||
@ -31,24 +32,26 @@ var syncStatusCmd = &cli.Command{
|
||||
defer closer()
|
||||
ctx := ReqContext(cctx)
|
||||
|
||||
ss, err := api.SyncState(ctx)
|
||||
state, err := api.SyncState(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var base, target []cid.Cid
|
||||
if ss.Base != nil {
|
||||
base = ss.Base.Cids()
|
||||
}
|
||||
if ss.Target != nil {
|
||||
target = ss.Target.Cids()
|
||||
}
|
||||
|
||||
fmt.Println("sync status:")
|
||||
fmt.Printf("Base:\t%s\n", base)
|
||||
fmt.Printf("Target:\t%s\n", target)
|
||||
fmt.Printf("Stage: %s\n", chain.SyncStageString(ss.Stage))
|
||||
fmt.Printf("Height: %d\n", ss.Height)
|
||||
for i, ss := range state.ActiveSyncs {
|
||||
fmt.Printf("worker %d:\n", i)
|
||||
var base, target []cid.Cid
|
||||
if ss.Base != nil {
|
||||
base = ss.Base.Cids()
|
||||
}
|
||||
if ss.Target != nil {
|
||||
target = ss.Target.Cids()
|
||||
}
|
||||
fmt.Printf("\tBase:\t%s\n", base)
|
||||
fmt.Printf("\tTarget:\t%s\n", target)
|
||||
fmt.Printf("\tStage: %s\n", chain.SyncStageString(ss.Stage))
|
||||
fmt.Printf("\tHeight: %d\n", ss.Height)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
@ -65,19 +68,38 @@ var syncWaitCmd = &cli.Command{
|
||||
ctx := ReqContext(cctx)
|
||||
|
||||
for {
|
||||
ss, err := napi.SyncState(ctx)
|
||||
state, err := napi.SyncState(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
head, err := napi.ChainHead(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
working := 0
|
||||
for i, ss := range state.ActiveSyncs {
|
||||
switch ss.Stage {
|
||||
case api.StageSyncComplete:
|
||||
default:
|
||||
working = i
|
||||
case api.StageIdle:
|
||||
// not complete, not actively working
|
||||
}
|
||||
}
|
||||
|
||||
ss := state.ActiveSyncs[working]
|
||||
|
||||
var target []cid.Cid
|
||||
if ss.Target != nil {
|
||||
target = ss.Target.Cids()
|
||||
}
|
||||
|
||||
fmt.Printf("\r\x1b[2KTarget: %s\tState: %s\tHeight: %d", target, chain.SyncStageString(ss.Stage), ss.Height)
|
||||
if ss.Stage == api.StageSyncComplete {
|
||||
fmt.Println("\nDone")
|
||||
fmt.Printf("\r\x1b[2KWorker %d: Target: %s\tState: %s\tHeight: %d", working, target, chain.SyncStageString(ss.Stage), ss.Height)
|
||||
|
||||
if time.Now().Unix()-int64(head.MinTimestamp()) < build.BlockDelay {
|
||||
fmt.Println("\nDone!")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
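Both this command and the wait loop later in the diff now decide "synced" from how fresh the chain head is against wall-clock time, rather than from a single worker's stage, since several workers can be active at once. A rough sketch of that check; the 30-second delay is only a stand-in for build.BlockDelay:

```go
package main

import (
	"fmt"
	"time"
)

// caughtUp mirrors the new wait condition: the head is considered current
// when its oldest block timestamp is less than one block delay behind now.
func caughtUp(headMinTimestamp uint64, blockDelay int64, now time.Time) bool {
	return now.Unix()-int64(headMinTimestamp) < blockDelay
}

func main() {
	const blockDelay = 30 // seconds; stand-in for build.BlockDelay
	now := time.Now()

	fresh := uint64(now.Unix() - 10)  // head produced 10s ago -> synced
	stale := uint64(now.Unix() - 600) // head produced 10min ago -> still behind

	fmt.Println(caughtUp(fresh, blockDelay, now)) // true
	fmt.Println(caughtUp(stale, blockDelay, now)) // false
}
```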
@ -200,7 +200,7 @@ func Online() Option {
            Override(new(dtypes.ClientDAG), testing.MemoryClientDag),

            // Filecoin services
            Override(new(*chain.Syncer), chain.NewSyncer),
            Override(new(*chain.Syncer), modules.NewSyncer),
            Override(new(*blocksync.BlockSync), blocksync.NewBlockSyncClient),
            Override(new(*chain.MessagePool), modules.MessagePool),
@ -20,13 +20,19 @@ type SyncAPI struct {
}

func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) {
    ss := a.Syncer.State()
    return &api.SyncState{
        Base:   ss.Base,
        Target: ss.Target,
        Stage:  ss.Stage,
        Height: ss.Height,
    }, nil
    states := a.Syncer.State()

    out := &api.SyncState{}

    for _, ss := range states {
        out.ActiveSyncs = append(out.ActiveSyncs, api.ActiveSync{
            Base:   ss.Base,
            Target: ss.Target,
            Stage:  ss.Stage,
            Height: ss.Height,
        })
    }
    return out, nil
}

func (a *SyncAPI) SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) error {
@ -12,11 +12,13 @@ import (
    blockstore "github.com/ipfs/go-ipfs-blockstore"
    "github.com/libp2p/go-libp2p-core/host"
    "github.com/libp2p/go-libp2p-core/routing"
    peer "github.com/libp2p/go-libp2p-peer"
    pubsub "github.com/libp2p/go-libp2p-pubsub"
    "go.uber.org/fx"
    "golang.org/x/xerrors"

    "github.com/filecoin-project/lotus/chain"
    "github.com/filecoin-project/lotus/chain/blocksync"
    "github.com/filecoin-project/lotus/chain/stmgr"
    "github.com/filecoin-project/lotus/chain/store"
    "github.com/filecoin-project/lotus/chain/types"
@ -119,3 +121,22 @@ func SetGenesis(cs *store.ChainStore, g Genesis) error {

    return cs.SetGenesis(genesis)
}

func NewSyncer(lc fx.Lifecycle, sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID) (*chain.Syncer, error) {
    syncer, err := chain.NewSyncer(sm, bsync, self)
    if err != nil {
        return nil, err
    }

    lc.Append(fx.Hook{
        OnStart: func(_ context.Context) error {
            syncer.Start()
            return nil
        },
        OnStop: func(_ context.Context) error {
            syncer.Stop()
            return nil
        },
    })
    return syncer, nil
}
@ -6,13 +6,13 @@ import (
    "net/http"
    "time"

    "github.com/multiformats/go-multiaddr-net"
    manet "github.com/multiformats/go-multiaddr-net"

    "golang.org/x/xerrors"

    "github.com/filecoin-project/lotus/api"
    "github.com/filecoin-project/lotus/api/client"
    "github.com/filecoin-project/lotus/chain"
    "github.com/filecoin-project/lotus/build"
    "github.com/filecoin-project/lotus/chain/store"
    "github.com/filecoin-project/lotus/chain/types"
    "github.com/filecoin-project/lotus/lib/jsonrpc"
@ -50,15 +50,15 @@ func WaitForSyncComplete(ctx context.Context, napi api.FullNode) error {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(30 * time.Second):
            state, err := napi.SyncState(ctx)
        case <-time.After(3 * time.Second):
            head, err := napi.ChainHead(ctx)
            if err != nil {
                return err
            }

            log.Printf("Stage %s, Height %d", chain.SyncStageString(state.Stage), state.Height)
            log.Printf("Height %d", head.Height())

            if state.Stage == api.StageSyncComplete {
            if time.Now().Unix()-int64(head.MinTimestamp()) < build.BlockDelay {
                return nil
            }
        }