Merge pull request #4599 from filecoin-project/feat/sync-manager-redux
rewrite sync manager
This commit is contained in:
commit
5a34e5b2bf
@ -789,8 +789,9 @@ type IpldObject struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type ActiveSync struct {
|
type ActiveSync struct {
|
||||||
Base *types.TipSet
|
WorkerID uint64
|
||||||
Target *types.TipSet
|
Base *types.TipSet
|
||||||
|
Target *types.TipSet
|
||||||
|
|
||||||
Stage SyncStateStage
|
Stage SyncStateStage
|
||||||
Height abi.ChainEpoch
|
Height abi.ChainEpoch
|
||||||
|
@ -361,6 +361,8 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ctx context.Context, ts *types.TipS
|
|||||||
// difference between 'bootstrap sync' and 'caught up' sync, we need
|
// difference between 'bootstrap sync' and 'caught up' sync, we need
|
||||||
// some other heuristic.
|
// some other heuristic.
|
||||||
return cs.takeHeaviestTipSet(ctx, ts)
|
return cs.takeHeaviestTipSet(ctx, ts)
|
||||||
|
} else if w.Equals(heaviestW) && !ts.Equals(cs.heaviest) {
|
||||||
|
log.Errorw("weight draw", "currTs", cs.heaviest, "ts", ts)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -4,30 +4,43 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"os"
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
|
||||||
peer "github.com/libp2p/go-libp2p-core/peer"
|
peer "github.com/libp2p/go-libp2p-core/peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
const BootstrapPeerThreshold = 2
|
var (
|
||||||
|
BootstrapPeerThreshold = 4
|
||||||
|
|
||||||
var coalesceForksParents = false
|
RecentSyncBufferSize = 10
|
||||||
|
MaxSyncWorkers = 5
|
||||||
|
SyncWorkerHistory = 3
|
||||||
|
|
||||||
|
InitialSyncTimeThreshold = 15 * time.Minute
|
||||||
|
|
||||||
|
coalesceTipsets = false
|
||||||
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
if os.Getenv("LOTUS_SYNC_REL_PARENT") == "yes" {
|
coalesceTipsets = os.Getenv("LOTUS_SYNC_FORMTS_PEND") == "yes"
|
||||||
coalesceForksParents = true
|
|
||||||
|
if bootstrapPeerThreshold := os.Getenv("LOTUS_SYNC_BOOTSTRAP_PEERS"); bootstrapPeerThreshold != "" {
|
||||||
|
threshold, err := strconv.Atoi(bootstrapPeerThreshold)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to parse 'LOTUS_SYNC_BOOTSTRAP_PEERS' env var: %s", err)
|
||||||
|
} else {
|
||||||
|
BootstrapPeerThreshold = threshold
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
|
||||||
BSStateInit = 0
|
|
||||||
BSStateSelected = 1
|
|
||||||
BSStateScheduled = 2
|
|
||||||
BSStateComplete = 3
|
|
||||||
)
|
|
||||||
|
|
||||||
type SyncFunc func(context.Context, *types.TipSet) error
|
type SyncFunc func(context.Context, *types.TipSet) error
|
||||||
|
|
||||||
// SyncManager manages the chain synchronization process, both at bootstrap time
|
// SyncManager manages the chain synchronization process, both at bootstrap time
|
||||||
@ -52,108 +65,467 @@ type SyncManager interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type syncManager struct {
|
type syncManager struct {
|
||||||
lk sync.Mutex
|
ctx context.Context
|
||||||
peerHeads map[peer.ID]*types.TipSet
|
cancel func()
|
||||||
|
|
||||||
bssLk sync.Mutex
|
workq chan peerHead
|
||||||
bootstrapState int
|
statusq chan workerStatus
|
||||||
|
|
||||||
bspThresh int
|
nextWorker uint64
|
||||||
|
pend syncBucketSet
|
||||||
|
deferred syncBucketSet
|
||||||
|
heads map[peer.ID]*types.TipSet
|
||||||
|
recent *syncBuffer
|
||||||
|
|
||||||
incomingTipSets chan *types.TipSet
|
initialSyncDone bool
|
||||||
syncTargets chan *types.TipSet
|
|
||||||
syncResults chan *syncResult
|
|
||||||
|
|
||||||
syncStates []*SyncerState
|
mx sync.Mutex
|
||||||
|
state map[uint64]*workerState
|
||||||
|
|
||||||
|
history []*workerState
|
||||||
|
historyI int
|
||||||
|
|
||||||
// Normally this handler is set to `(*Syncer).Sync()`.
|
|
||||||
doSync func(context.Context, *types.TipSet) error
|
doSync func(context.Context, *types.TipSet) error
|
||||||
|
|
||||||
stop chan struct{}
|
|
||||||
|
|
||||||
// Sync Scheduler fields
|
|
||||||
activeSyncs map[types.TipSetKey]*types.TipSet
|
|
||||||
syncQueue syncBucketSet
|
|
||||||
activeSyncTips syncBucketSet
|
|
||||||
nextSyncTarget *syncTargetBucket
|
|
||||||
workerChan chan *types.TipSet
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ SyncManager = (*syncManager)(nil)
|
var _ SyncManager = (*syncManager)(nil)
|
||||||
|
|
||||||
type syncResult struct {
|
type peerHead struct {
|
||||||
ts *types.TipSet
|
p peer.ID
|
||||||
success bool
|
ts *types.TipSet
|
||||||
}
|
}
|
||||||
|
|
||||||
const syncWorkerCount = 3
|
type workerState struct {
|
||||||
|
id uint64
|
||||||
|
ts *types.TipSet
|
||||||
|
ss *SyncerState
|
||||||
|
dt time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
type workerStatus struct {
|
||||||
|
id uint64
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
// sync manager interface
|
||||||
func NewSyncManager(sync SyncFunc) SyncManager {
|
func NewSyncManager(sync SyncFunc) SyncManager {
|
||||||
sm := &syncManager{
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
bspThresh: 1,
|
return &syncManager{
|
||||||
peerHeads: make(map[peer.ID]*types.TipSet),
|
ctx: ctx,
|
||||||
syncTargets: make(chan *types.TipSet),
|
cancel: cancel,
|
||||||
syncResults: make(chan *syncResult),
|
|
||||||
syncStates: make([]*SyncerState, syncWorkerCount),
|
workq: make(chan peerHead),
|
||||||
incomingTipSets: make(chan *types.TipSet),
|
statusq: make(chan workerStatus),
|
||||||
activeSyncs: make(map[types.TipSetKey]*types.TipSet),
|
|
||||||
doSync: sync,
|
heads: make(map[peer.ID]*types.TipSet),
|
||||||
stop: make(chan struct{}),
|
state: make(map[uint64]*workerState),
|
||||||
|
recent: newSyncBuffer(RecentSyncBufferSize),
|
||||||
|
history: make([]*workerState, SyncWorkerHistory),
|
||||||
|
|
||||||
|
doSync: sync,
|
||||||
}
|
}
|
||||||
for i := range sm.syncStates {
|
|
||||||
sm.syncStates[i] = new(SyncerState)
|
|
||||||
}
|
|
||||||
return sm
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *syncManager) Start() {
|
func (sm *syncManager) Start() {
|
||||||
go sm.syncScheduler()
|
go sm.scheduler()
|
||||||
for i := 0; i < syncWorkerCount; i++ {
|
|
||||||
go sm.syncWorker(i)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *syncManager) Stop() {
|
func (sm *syncManager) Stop() {
|
||||||
close(sm.stop)
|
select {
|
||||||
|
case <-sm.ctx.Done():
|
||||||
|
default:
|
||||||
|
sm.cancel()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *syncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) {
|
func (sm *syncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) {
|
||||||
sm.lk.Lock()
|
select {
|
||||||
defer sm.lk.Unlock()
|
case sm.workq <- peerHead{p: p, ts: ts}:
|
||||||
sm.peerHeads[p] = ts
|
case <-sm.ctx.Done():
|
||||||
|
case <-ctx.Done():
|
||||||
if sm.getBootstrapState() == BSStateInit {
|
|
||||||
spc := sm.syncedPeerCount()
|
|
||||||
if spc >= sm.bspThresh {
|
|
||||||
// Its go time!
|
|
||||||
target, err := sm.selectSyncTarget()
|
|
||||||
if err != nil {
|
|
||||||
log.Error("failed to select sync target: ", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
sm.setBootstrapState(BSStateSelected)
|
|
||||||
|
|
||||||
sm.incomingTipSets <- target
|
|
||||||
}
|
|
||||||
log.Infof("sync bootstrap has %d peers", spc)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sm.incomingTipSets <- ts
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *syncManager) State() []SyncerStateSnapshot {
|
func (sm *syncManager) State() []SyncerStateSnapshot {
|
||||||
ret := make([]SyncerStateSnapshot, 0, len(sm.syncStates))
|
sm.mx.Lock()
|
||||||
for _, s := range sm.syncStates {
|
workerStates := make([]*workerState, 0, len(sm.state)+len(sm.history))
|
||||||
ret = append(ret, s.Snapshot())
|
for _, ws := range sm.state {
|
||||||
|
workerStates = append(workerStates, ws)
|
||||||
}
|
}
|
||||||
return ret
|
for _, ws := range sm.history {
|
||||||
|
if ws != nil {
|
||||||
|
workerStates = append(workerStates, ws)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sm.mx.Unlock()
|
||||||
|
|
||||||
|
sort.Slice(workerStates, func(i, j int) bool {
|
||||||
|
return workerStates[i].id < workerStates[j].id
|
||||||
|
})
|
||||||
|
|
||||||
|
result := make([]SyncerStateSnapshot, 0, len(workerStates))
|
||||||
|
for _, ws := range workerStates {
|
||||||
|
result = append(result, ws.ss.Snapshot())
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// sync manager internals
|
||||||
|
func (sm *syncManager) scheduler() {
|
||||||
|
ticker := time.NewTicker(time.Minute)
|
||||||
|
tickerC := ticker.C
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case head := <-sm.workq:
|
||||||
|
sm.handlePeerHead(head)
|
||||||
|
case status := <-sm.statusq:
|
||||||
|
sm.handleWorkerStatus(status)
|
||||||
|
case <-tickerC:
|
||||||
|
if sm.initialSyncDone {
|
||||||
|
ticker.Stop()
|
||||||
|
tickerC = nil
|
||||||
|
sm.handleInitialSyncDone()
|
||||||
|
}
|
||||||
|
case <-sm.ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sm *syncManager) handlePeerHead(head peerHead) {
|
||||||
|
log.Infof("new peer head: %s %s", head.p, head.ts)
|
||||||
|
|
||||||
|
// have we started syncing yet?
|
||||||
|
if sm.nextWorker == 0 {
|
||||||
|
// track the peer head until we start syncing
|
||||||
|
sm.heads[head.p] = head.ts
|
||||||
|
|
||||||
|
// not yet; do we have enough peers?
|
||||||
|
if len(sm.heads) < BootstrapPeerThreshold {
|
||||||
|
// not enough peers; track it and wait
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// we are ready to start syncing; select the sync target and spawn a worker
|
||||||
|
target, err := sm.selectInitialSyncTarget()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to select initial sync target: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("selected initial sync target: %s", target)
|
||||||
|
sm.spawnWorker(target)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// we have started syncing, add peer head to the queue if applicable and maybe spawn a worker
|
||||||
|
// if there is work to do (possibly in a fork)
|
||||||
|
target, work, err := sm.addSyncTarget(head.ts)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("failed to add sync target: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if work {
|
||||||
|
log.Infof("selected sync target: %s", target)
|
||||||
|
sm.spawnWorker(target)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sm *syncManager) handleWorkerStatus(status workerStatus) {
|
||||||
|
log.Debugf("worker %d done; status error: %s", status.id, status.err)
|
||||||
|
|
||||||
|
sm.mx.Lock()
|
||||||
|
ws := sm.state[status.id]
|
||||||
|
delete(sm.state, status.id)
|
||||||
|
|
||||||
|
// we track the last few workers for debug purposes
|
||||||
|
sm.history[sm.historyI] = ws
|
||||||
|
sm.historyI++
|
||||||
|
sm.historyI %= len(sm.history)
|
||||||
|
sm.mx.Unlock()
|
||||||
|
|
||||||
|
if status.err != nil {
|
||||||
|
// we failed to sync this target -- log it and try to work on an extended chain
|
||||||
|
// if there is nothing related to be worked on, we stop working on this chain.
|
||||||
|
log.Errorf("error during sync in %s: %s", ws.ts, status.err)
|
||||||
|
} else {
|
||||||
|
// add to the recently synced buffer
|
||||||
|
sm.recent.Push(ws.ts)
|
||||||
|
// if we are still in initial sync and this was fast enough, mark the end of the initial sync
|
||||||
|
if !sm.initialSyncDone && ws.dt < InitialSyncTimeThreshold {
|
||||||
|
sm.initialSyncDone = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// we are done with this target, select the next sync target and spawn a worker if there is work
|
||||||
|
// to do, because of an extension of this chain.
|
||||||
|
target, work, err := sm.selectSyncTarget(ws.ts)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("failed to select sync target: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if work {
|
||||||
|
log.Infof("selected sync target: %s", target)
|
||||||
|
sm.spawnWorker(target)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sm *syncManager) handleInitialSyncDone() {
|
||||||
|
// we have just finished the initial sync; spawn some additional workers in deferred syncs
|
||||||
|
// as needed (and up to MaxSyncWorkers) to ramp up chain sync
|
||||||
|
for len(sm.state) < MaxSyncWorkers {
|
||||||
|
target, work, err := sm.selectDeferredSyncTarget()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("error selecting deferred sync target: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if !work {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("selected deferred sync target: %s", target)
|
||||||
|
sm.spawnWorker(target)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sm *syncManager) spawnWorker(target *types.TipSet) {
|
||||||
|
id := sm.nextWorker
|
||||||
|
sm.nextWorker++
|
||||||
|
ws := &workerState{
|
||||||
|
id: id,
|
||||||
|
ts: target,
|
||||||
|
ss: new(SyncerState),
|
||||||
|
}
|
||||||
|
ws.ss.data.WorkerID = id
|
||||||
|
|
||||||
|
sm.mx.Lock()
|
||||||
|
sm.state[id] = ws
|
||||||
|
sm.mx.Unlock()
|
||||||
|
|
||||||
|
go sm.worker(ws)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sm *syncManager) worker(ws *workerState) {
|
||||||
|
log.Infof("worker %d syncing in %s", ws.id, ws.ts)
|
||||||
|
|
||||||
|
start := build.Clock.Now()
|
||||||
|
|
||||||
|
ctx := context.WithValue(sm.ctx, syncStateKey{}, ws.ss)
|
||||||
|
err := sm.doSync(ctx, ws.ts)
|
||||||
|
|
||||||
|
ws.dt = build.Clock.Since(start)
|
||||||
|
log.Infof("worker %d done; took %s", ws.id, ws.dt)
|
||||||
|
select {
|
||||||
|
case sm.statusq <- workerStatus{id: ws.id, err: err}:
|
||||||
|
case <-sm.ctx.Done():
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// selects the initial sync target by examining known peer heads; only called once for the initial
|
||||||
|
// sync.
|
||||||
|
func (sm *syncManager) selectInitialSyncTarget() (*types.TipSet, error) {
|
||||||
|
var buckets syncBucketSet
|
||||||
|
|
||||||
|
var peerHeads []*types.TipSet
|
||||||
|
for _, ts := range sm.heads {
|
||||||
|
peerHeads = append(peerHeads, ts)
|
||||||
|
}
|
||||||
|
// clear the map, we don't use it any longer
|
||||||
|
sm.heads = nil
|
||||||
|
|
||||||
|
sort.Slice(peerHeads, func(i, j int) bool {
|
||||||
|
return peerHeads[i].Height() < peerHeads[j].Height()
|
||||||
|
})
|
||||||
|
|
||||||
|
for _, ts := range peerHeads {
|
||||||
|
buckets.Insert(ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(buckets.buckets) > 1 {
|
||||||
|
log.Warn("caution, multiple distinct chains seen during head selections")
|
||||||
|
// TODO: we *could* refuse to sync here without user intervention.
|
||||||
|
// For now, just select the best cluster
|
||||||
|
}
|
||||||
|
|
||||||
|
return buckets.Heaviest(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// adds a tipset to the potential sync targets; returns true if there is a a tipset to work on.
|
||||||
|
// this could be either a restart, eg because there is no currently scheduled sync work or a worker
|
||||||
|
// failed or a potential fork.
|
||||||
|
func (sm *syncManager) addSyncTarget(ts *types.TipSet) (*types.TipSet, bool, error) {
|
||||||
|
// Note: we don't need the state lock here to access the active worker states, as the only
|
||||||
|
// competing threads that may access it do so through State() which is read only.
|
||||||
|
|
||||||
|
// if we have recently synced this or any heavier tipset we just ignore it; this can happen
|
||||||
|
// with an empty worker set after we just finished syncing to a target
|
||||||
|
if sm.recent.Synced(ts) {
|
||||||
|
return nil, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// if the worker set is empty, we have finished syncing and were waiting for the next tipset
|
||||||
|
// in this case, we just return the tipset as work to be done
|
||||||
|
if len(sm.state) == 0 {
|
||||||
|
return ts, true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if it is related to any active sync; if so insert into the pending sync queue
|
||||||
|
for _, ws := range sm.state {
|
||||||
|
if ts.Equals(ws.ts) {
|
||||||
|
// ignore it, we are already syncing it
|
||||||
|
return nil, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if ts.Parents() == ws.ts.Key() {
|
||||||
|
// schedule for syncing next; it's an extension of an active sync
|
||||||
|
sm.pend.Insert(ts)
|
||||||
|
return nil, false, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check to see if it is related to any pending sync; if so insert it into the pending sync queue
|
||||||
|
if sm.pend.RelatedToAny(ts) {
|
||||||
|
sm.pend.Insert(ts)
|
||||||
|
return nil, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// it's not related to any active or pending sync; this could be a fork in which case we
|
||||||
|
// start a new worker to sync it, if it is *heavier* than any active or pending set;
|
||||||
|
// if it is not, we ignore it.
|
||||||
|
for _, ws := range sm.state {
|
||||||
|
if isHeavier(ws.ts, ts) {
|
||||||
|
return nil, false, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pendHeaviest := sm.pend.Heaviest()
|
||||||
|
if pendHeaviest != nil && isHeavier(pendHeaviest, ts) {
|
||||||
|
return nil, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// if we have not finished the initial sync or have too many workers, add it to the deferred queue;
|
||||||
|
// it will be processed once a worker is freed from syncing a chain (or the initial sync finishes)
|
||||||
|
if !sm.initialSyncDone || len(sm.state) >= MaxSyncWorkers {
|
||||||
|
log.Infof("deferring sync on %s", ts)
|
||||||
|
sm.deferred.Insert(ts)
|
||||||
|
return nil, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// start a new worker, seems heavy enough and unrelated to active or pending syncs
|
||||||
|
return ts, true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// selects the next sync target after a worker sync has finished; returns true and a target
|
||||||
|
// TipSet if this chain should continue to sync because there is a heavier related tipset.
|
||||||
|
func (sm *syncManager) selectSyncTarget(done *types.TipSet) (*types.TipSet, bool, error) {
|
||||||
|
// we pop the related bucket and if there is any related tipset, we work on the heaviest one next
|
||||||
|
// if we are not already working on a heavier tipset
|
||||||
|
related := sm.pend.PopRelated(done)
|
||||||
|
if related == nil {
|
||||||
|
return sm.selectDeferredSyncTarget()
|
||||||
|
}
|
||||||
|
|
||||||
|
heaviest := related.heaviestTipSet()
|
||||||
|
if isHeavier(done, heaviest) {
|
||||||
|
return sm.selectDeferredSyncTarget()
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, ws := range sm.state {
|
||||||
|
if isHeavier(ws.ts, heaviest) {
|
||||||
|
return sm.selectDeferredSyncTarget()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if sm.recent.Synced(heaviest) {
|
||||||
|
return sm.selectDeferredSyncTarget()
|
||||||
|
}
|
||||||
|
|
||||||
|
return heaviest, true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// selects a deferred sync target if there is any; these are sync targets that were not related to
|
||||||
|
// active syncs and were deferred because there were too many workers running
|
||||||
|
func (sm *syncManager) selectDeferredSyncTarget() (*types.TipSet, bool, error) {
|
||||||
|
deferredLoop:
|
||||||
|
for !sm.deferred.Empty() {
|
||||||
|
bucket := sm.deferred.Pop()
|
||||||
|
heaviest := bucket.heaviestTipSet()
|
||||||
|
|
||||||
|
if sm.recent.Synced(heaviest) {
|
||||||
|
// we have synced it or something heavier recently, skip it
|
||||||
|
continue deferredLoop
|
||||||
|
}
|
||||||
|
|
||||||
|
if sm.pend.RelatedToAny(heaviest) {
|
||||||
|
// this has converged to a pending sync, insert it to the pending queue
|
||||||
|
sm.pend.Insert(heaviest)
|
||||||
|
continue deferredLoop
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, ws := range sm.state {
|
||||||
|
if ws.ts.Equals(heaviest) || isHeavier(ws.ts, heaviest) {
|
||||||
|
// we have converged and are already syncing it or we are syncing on something heavier
|
||||||
|
// ignore it and pop the next deferred bucket
|
||||||
|
continue deferredLoop
|
||||||
|
}
|
||||||
|
|
||||||
|
if heaviest.Parents() == ws.ts.Key() {
|
||||||
|
// we have converged and we are syncing its parent; insert it to the pending queue
|
||||||
|
sm.pend.Insert(heaviest)
|
||||||
|
continue deferredLoop
|
||||||
|
}
|
||||||
|
|
||||||
|
// it's not related to any active or pending sync and this worker is free, so sync it!
|
||||||
|
return heaviest, true, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func isHeavier(a, b *types.TipSet) bool {
|
||||||
|
return a.ParentWeight().GreaterThan(b.ParentWeight())
|
||||||
|
}
|
||||||
|
|
||||||
|
// sync buffer -- this is a circular buffer of recently synced tipsets
|
||||||
|
type syncBuffer struct {
|
||||||
|
buf []*types.TipSet
|
||||||
|
next int
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSyncBuffer(size int) *syncBuffer {
|
||||||
|
return &syncBuffer{buf: make([]*types.TipSet, size)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sb *syncBuffer) Push(ts *types.TipSet) {
|
||||||
|
sb.buf[sb.next] = ts
|
||||||
|
sb.next++
|
||||||
|
sb.next %= len(sb.buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sb *syncBuffer) Synced(ts *types.TipSet) bool {
|
||||||
|
for _, rts := range sb.buf {
|
||||||
|
if rts != nil && (rts.Equals(ts) || isHeavier(rts, ts)) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// sync buckets and related utilities
|
||||||
type syncBucketSet struct {
|
type syncBucketSet struct {
|
||||||
buckets []*syncTargetBucket
|
buckets []*syncTargetBucket
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type syncTargetBucket struct {
|
||||||
|
tips []*types.TipSet
|
||||||
|
}
|
||||||
|
|
||||||
func newSyncTargetBucket(tipsets ...*types.TipSet) *syncTargetBucket {
|
func newSyncTargetBucket(tipsets ...*types.TipSet) *syncTargetBucket {
|
||||||
var stb syncTargetBucket
|
var stb syncTargetBucket
|
||||||
for _, ts := range tipsets {
|
for _, ts := range tipsets {
|
||||||
@ -250,10 +622,6 @@ func (sbs *syncBucketSet) Empty() bool {
|
|||||||
return len(sbs.buckets) == 0
|
return len(sbs.buckets) == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
type syncTargetBucket struct {
|
|
||||||
tips []*types.TipSet
|
|
||||||
}
|
|
||||||
|
|
||||||
func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool {
|
func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool {
|
||||||
for _, t := range stb.tips {
|
for _, t := range stb.tips {
|
||||||
if ts.Equals(t) {
|
if ts.Equals(t) {
|
||||||
@ -265,19 +633,43 @@ func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool {
|
|||||||
if ts.Parents() == t.Key() {
|
if ts.Parents() == t.Key() {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if coalesceForksParents && ts.Parents() == t.Parents() {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (stb *syncTargetBucket) add(ts *types.TipSet) {
|
func (stb *syncTargetBucket) add(ts *types.TipSet) {
|
||||||
|
for i, t := range stb.tips {
|
||||||
for _, t := range stb.tips {
|
|
||||||
if t.Equals(ts) {
|
if t.Equals(ts) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if coalesceTipsets && t.Height() == ts.Height() &&
|
||||||
|
types.CidArrsEqual(t.Blocks()[0].Parents, ts.Blocks()[0].Parents) {
|
||||||
|
miners := make(map[address.Address]struct{})
|
||||||
|
newTs := []*types.BlockHeader{}
|
||||||
|
for _, b := range t.Blocks() {
|
||||||
|
_, have := miners[b.Miner]
|
||||||
|
if !have {
|
||||||
|
newTs = append(newTs, b)
|
||||||
|
miners[b.Miner] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, b := range ts.Blocks() {
|
||||||
|
_, have := miners[b.Miner]
|
||||||
|
if !have {
|
||||||
|
newTs = append(newTs, b)
|
||||||
|
miners[b.Miner] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ts2, err := types.NewTipSet(newTs)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("error while trying to recombine a tipset in a bucket: %+v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
stb.tips[i] = ts2
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
stb.tips = append(stb.tips, ts)
|
stb.tips = append(stb.tips, ts)
|
||||||
@ -296,196 +688,3 @@ func (stb *syncTargetBucket) heaviestTipSet() *types.TipSet {
|
|||||||
}
|
}
|
||||||
return best
|
return best
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *syncManager) selectSyncTarget() (*types.TipSet, error) {
|
|
||||||
var buckets syncBucketSet
|
|
||||||
|
|
||||||
var peerHeads []*types.TipSet
|
|
||||||
for _, ts := range sm.peerHeads {
|
|
||||||
peerHeads = append(peerHeads, ts)
|
|
||||||
}
|
|
||||||
sort.Slice(peerHeads, func(i, j int) bool {
|
|
||||||
return peerHeads[i].Height() < peerHeads[j].Height()
|
|
||||||
})
|
|
||||||
|
|
||||||
for _, ts := range peerHeads {
|
|
||||||
buckets.Insert(ts)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(buckets.buckets) > 1 {
|
|
||||||
log.Warn("caution, multiple distinct chains seen during head selections")
|
|
||||||
// TODO: we *could* refuse to sync here without user intervention.
|
|
||||||
// For now, just select the best cluster
|
|
||||||
}
|
|
||||||
|
|
||||||
return buckets.Heaviest(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sm *syncManager) syncScheduler() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case ts, ok := <-sm.incomingTipSets:
|
|
||||||
if !ok {
|
|
||||||
log.Info("shutting down sync scheduler")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
sm.scheduleIncoming(ts)
|
|
||||||
case res := <-sm.syncResults:
|
|
||||||
sm.scheduleProcessResult(res)
|
|
||||||
case sm.workerChan <- sm.nextSyncTarget.heaviestTipSet():
|
|
||||||
sm.scheduleWorkSent()
|
|
||||||
case <-sm.stop:
|
|
||||||
log.Info("sync scheduler shutting down")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sm *syncManager) scheduleIncoming(ts *types.TipSet) {
|
|
||||||
log.Debug("scheduling incoming tipset sync: ", ts.Cids())
|
|
||||||
if sm.getBootstrapState() == BSStateSelected {
|
|
||||||
sm.setBootstrapState(BSStateScheduled)
|
|
||||||
sm.syncTargets <- ts
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var relatedToActiveSync bool
|
|
||||||
for _, acts := range sm.activeSyncs {
|
|
||||||
if ts.Equals(acts) {
|
|
||||||
// ignore, we are already syncing it
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if ts.Parents() == acts.Key() {
|
|
||||||
// sync this next, after that sync process finishes
|
|
||||||
relatedToActiveSync = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !relatedToActiveSync && sm.activeSyncTips.RelatedToAny(ts) {
|
|
||||||
relatedToActiveSync = true
|
|
||||||
}
|
|
||||||
|
|
||||||
// if this is related to an active sync process, immediately bucket it
|
|
||||||
// we don't want to start a parallel sync process that duplicates work
|
|
||||||
if relatedToActiveSync {
|
|
||||||
sm.activeSyncTips.Insert(ts)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if sm.getBootstrapState() == BSStateScheduled {
|
|
||||||
sm.syncQueue.Insert(ts)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if sm.nextSyncTarget != nil && sm.nextSyncTarget.sameChainAs(ts) {
|
|
||||||
sm.nextSyncTarget.add(ts)
|
|
||||||
} else {
|
|
||||||
sm.syncQueue.Insert(ts)
|
|
||||||
|
|
||||||
if sm.nextSyncTarget == nil {
|
|
||||||
sm.nextSyncTarget = sm.syncQueue.Pop()
|
|
||||||
sm.workerChan = sm.syncTargets
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sm *syncManager) scheduleProcessResult(res *syncResult) {
|
|
||||||
if res.success && sm.getBootstrapState() != BSStateComplete {
|
|
||||||
sm.setBootstrapState(BSStateComplete)
|
|
||||||
}
|
|
||||||
|
|
||||||
delete(sm.activeSyncs, res.ts.Key())
|
|
||||||
relbucket := sm.activeSyncTips.PopRelated(res.ts)
|
|
||||||
if relbucket != nil {
|
|
||||||
if res.success {
|
|
||||||
if sm.nextSyncTarget == nil {
|
|
||||||
sm.nextSyncTarget = relbucket
|
|
||||||
sm.workerChan = sm.syncTargets
|
|
||||||
} else {
|
|
||||||
for _, t := range relbucket.tips {
|
|
||||||
sm.syncQueue.Insert(t)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// TODO: this is the case where we try to sync a chain, and
|
|
||||||
// fail, and we have more blocks on top of that chain that
|
|
||||||
// have come in since. The question is, should we try to
|
|
||||||
// sync these? or just drop them?
|
|
||||||
log.Error("failed to sync chain but have new unconnected blocks from chain")
|
|
||||||
}
|
|
||||||
|
|
||||||
if sm.nextSyncTarget == nil && !sm.syncQueue.Empty() {
|
|
||||||
next := sm.syncQueue.Pop()
|
|
||||||
if next != nil {
|
|
||||||
sm.nextSyncTarget = next
|
|
||||||
sm.workerChan = sm.syncTargets
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sm *syncManager) scheduleWorkSent() {
|
|
||||||
hts := sm.nextSyncTarget.heaviestTipSet()
|
|
||||||
sm.activeSyncs[hts.Key()] = hts
|
|
||||||
|
|
||||||
if !sm.syncQueue.Empty() {
|
|
||||||
sm.nextSyncTarget = sm.syncQueue.Pop()
|
|
||||||
} else {
|
|
||||||
sm.nextSyncTarget = nil
|
|
||||||
sm.workerChan = nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sm *syncManager) syncWorker(id int) {
|
|
||||||
ss := sm.syncStates[id]
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case ts, ok := <-sm.syncTargets:
|
|
||||||
if !ok {
|
|
||||||
log.Info("sync manager worker shutting down")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := context.WithValue(context.TODO(), syncStateKey{}, ss)
|
|
||||||
err := sm.doSync(ctx, ts)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("sync error: %+v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
sm.syncResults <- &syncResult{
|
|
||||||
ts: ts,
|
|
||||||
success: err == nil,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sm *syncManager) syncedPeerCount() int {
|
|
||||||
var count int
|
|
||||||
for _, ts := range sm.peerHeads {
|
|
||||||
if ts.Height() > 0 {
|
|
||||||
count++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return count
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sm *syncManager) getBootstrapState() int {
|
|
||||||
sm.bssLk.Lock()
|
|
||||||
defer sm.bssLk.Unlock()
|
|
||||||
return sm.bootstrapState
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sm *syncManager) setBootstrapState(v int) {
|
|
||||||
sm.bssLk.Lock()
|
|
||||||
defer sm.bssLk.Unlock()
|
|
||||||
sm.bootstrapState = v
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sm *syncManager) IsBootstrapped() bool {
|
|
||||||
sm.bssLk.Lock()
|
|
||||||
defer sm.bssLk.Unlock()
|
|
||||||
return sm.bootstrapState == BSStateComplete
|
|
||||||
}
|
|
||||||
|
@ -10,6 +10,10 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/types/mock"
|
"github.com/filecoin-project/lotus/chain/types/mock"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
BootstrapPeerThreshold = 1
|
||||||
|
}
|
||||||
|
|
||||||
var genTs = mock.TipSet(mock.MkBlock(nil, 0, 0))
|
var genTs = mock.TipSet(mock.MkBlock(nil, 0, 0))
|
||||||
|
|
||||||
type syncOp struct {
|
type syncOp struct {
|
||||||
@ -28,7 +32,12 @@ func runSyncMgrTest(t *testing.T, tname string, thresh int, tf func(*testing.T,
|
|||||||
<-ch
|
<-ch
|
||||||
return nil
|
return nil
|
||||||
}).(*syncManager)
|
}).(*syncManager)
|
||||||
sm.bspThresh = thresh
|
|
||||||
|
oldBootstrapPeerThreshold := BootstrapPeerThreshold
|
||||||
|
BootstrapPeerThreshold = thresh
|
||||||
|
defer func() {
|
||||||
|
BootstrapPeerThreshold = oldBootstrapPeerThreshold
|
||||||
|
}()
|
||||||
|
|
||||||
sm.Start()
|
sm.Start()
|
||||||
defer sm.Stop()
|
defer sm.Stop()
|
||||||
@ -87,49 +96,59 @@ func TestSyncManagerEdgeCase(t *testing.T) {
|
|||||||
|
|
||||||
runSyncMgrTest(t, "edgeCase", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) {
|
runSyncMgrTest(t, "edgeCase", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) {
|
||||||
sm.SetPeerHead(ctx, "peer1", a)
|
sm.SetPeerHead(ctx, "peer1", a)
|
||||||
assertGetSyncOp(t, stc, a)
|
|
||||||
|
|
||||||
sm.SetPeerHead(ctx, "peer1", b1)
|
sm.SetPeerHead(ctx, "peer1", b1)
|
||||||
sm.SetPeerHead(ctx, "peer1", b2)
|
sm.SetPeerHead(ctx, "peer1", b2)
|
||||||
// b1 and b2 are being processed
|
|
||||||
|
|
||||||
b1op := <-stc
|
assertGetSyncOp(t, stc, a)
|
||||||
b2op := <-stc
|
|
||||||
if !b1op.ts.Equals(b1) {
|
// b1 and b2 are in queue after a; the sync manager should pick the heaviest one which is b2
|
||||||
b1op, b2op = b2op, b1op
|
bop := <-stc
|
||||||
|
if !bop.ts.Equals(b2) {
|
||||||
|
t.Fatalf("Expected tipset %s to sync, but got %s", b2, bop.ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
sm.SetPeerHead(ctx, "peer2", c2) // c2 is put into activeSyncTips at index 0
|
sm.SetPeerHead(ctx, "peer2", c2)
|
||||||
sm.SetPeerHead(ctx, "peer2", c1) // c1 is put into activeSyncTips at index 1
|
sm.SetPeerHead(ctx, "peer2", c1)
|
||||||
sm.SetPeerHead(ctx, "peer3", b2) // b2 is related to c2 and even though it is actively synced it is put into activeSyncTips index 0
|
sm.SetPeerHead(ctx, "peer3", b2)
|
||||||
sm.SetPeerHead(ctx, "peer1", a) // a is related to b2 and is put into activeSyncTips index 0
|
sm.SetPeerHead(ctx, "peer1", a)
|
||||||
|
|
||||||
b1op.done() // b1 completes first, is related to a, so it pops activeSyncTips index 0
|
bop.done()
|
||||||
// even though correct one is index 1
|
|
||||||
|
|
||||||
b2op.done()
|
// get the next sync target; it should be c1 as the heaviest tipset but added last (same weight as c2)
|
||||||
// b2 completes and is not related to c1, so it leaves activeSyncTips as it is
|
bop = <-stc
|
||||||
|
if !bop.ts.Equals(c1) {
|
||||||
|
t.Fatalf("Expected tipset %s to sync, but got %s", c1, bop.ts)
|
||||||
|
}
|
||||||
|
|
||||||
waitUntilAllWorkersAreDone(stc)
|
sm.SetPeerHead(ctx, "peer4", d1)
|
||||||
|
sm.SetPeerHead(ctx, "peer5", e1)
|
||||||
|
bop.done()
|
||||||
|
|
||||||
if len(sm.activeSyncTips.buckets) != 0 {
|
// get the last sync target; it should be e1
|
||||||
t.Errorf("activeSyncTips expected empty but got: %s", sm.activeSyncTips.String())
|
var last *types.TipSet
|
||||||
|
for i := 0; i < 10; {
|
||||||
|
select {
|
||||||
|
case bop = <-stc:
|
||||||
|
bop.done()
|
||||||
|
if last == nil || bop.ts.Height() > last.Height() {
|
||||||
|
last = bop.ts
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
i++
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !last.Equals(e1) {
|
||||||
|
t.Fatalf("Expected tipset %s to sync, but got %s", e1, last)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(sm.state) != 0 {
|
||||||
|
t.Errorf("active syncs expected empty but got: %d", len(sm.state))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func waitUntilAllWorkersAreDone(stc chan *syncOp) {
|
|
||||||
for i := 0; i < 10; {
|
|
||||||
select {
|
|
||||||
case so := <-stc:
|
|
||||||
so.done()
|
|
||||||
default:
|
|
||||||
i++
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSyncManager(t *testing.T) {
|
func TestSyncManager(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
|
@ -12,13 +12,14 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type SyncerStateSnapshot struct {
|
type SyncerStateSnapshot struct {
|
||||||
Target *types.TipSet
|
WorkerID uint64
|
||||||
Base *types.TipSet
|
Target *types.TipSet
|
||||||
Stage api.SyncStateStage
|
Base *types.TipSet
|
||||||
Height abi.ChainEpoch
|
Stage api.SyncStateStage
|
||||||
Message string
|
Height abi.ChainEpoch
|
||||||
Start time.Time
|
Message string
|
||||||
End time.Time
|
Start time.Time
|
||||||
|
End time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
type SyncerState struct {
|
type SyncerState struct {
|
||||||
|
18
cli/sync.go
18
cli/sync.go
@ -45,8 +45,8 @@ var syncStatusCmd = &cli.Command{
|
|||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("sync status:")
|
fmt.Println("sync status:")
|
||||||
for i, ss := range state.ActiveSyncs {
|
for _, ss := range state.ActiveSyncs {
|
||||||
fmt.Printf("worker %d:\n", i)
|
fmt.Printf("worker %d:\n", ss.WorkerID)
|
||||||
var base, target []cid.Cid
|
var base, target []cid.Cid
|
||||||
var heightDiff int64
|
var heightDiff int64
|
||||||
var theight abi.ChainEpoch
|
var theight abi.ChainEpoch
|
||||||
@ -263,12 +263,17 @@ func SyncWait(ctx context.Context, napi api.FullNode, watch bool) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(state.ActiveSyncs) == 0 {
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
head, err := napi.ChainHead(ctx)
|
head, err := napi.ChainHead(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
working := 0
|
working := -1
|
||||||
for i, ss := range state.ActiveSyncs {
|
for i, ss := range state.ActiveSyncs {
|
||||||
switch ss.Stage {
|
switch ss.Stage {
|
||||||
case api.StageSyncComplete:
|
case api.StageSyncComplete:
|
||||||
@ -279,7 +284,12 @@ func SyncWait(ctx context.Context, napi api.FullNode, watch bool) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if working == -1 {
|
||||||
|
working = len(state.ActiveSyncs) - 1
|
||||||
|
}
|
||||||
|
|
||||||
ss := state.ActiveSyncs[working]
|
ss := state.ActiveSyncs[working]
|
||||||
|
workerID := ss.WorkerID
|
||||||
|
|
||||||
var baseHeight abi.ChainEpoch
|
var baseHeight abi.ChainEpoch
|
||||||
var target []cid.Cid
|
var target []cid.Cid
|
||||||
@ -302,7 +312,7 @@ func SyncWait(ctx context.Context, napi api.FullNode, watch bool) error {
|
|||||||
fmt.Print("\r\x1b[2K\x1b[A")
|
fmt.Print("\r\x1b[2K\x1b[A")
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("Worker: %d; Base: %d; Target: %d (diff: %d)\n", working, baseHeight, theight, heightDiff)
|
fmt.Printf("Worker: %d; Base: %d; Target: %d (diff: %d)\n", workerID, baseHeight, theight, heightDiff)
|
||||||
fmt.Printf("State: %s; Current Epoch: %d; Todo: %d\n", ss.Stage, ss.Height, theight-ss.Height)
|
fmt.Printf("State: %s; Current Epoch: %d; Todo: %d\n", ss.Stage, ss.Height, theight-ss.Height)
|
||||||
lastLines = 2
|
lastLines = 2
|
||||||
|
|
||||||
|
@ -37,13 +37,14 @@ func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) {
|
|||||||
for i := range states {
|
for i := range states {
|
||||||
ss := &states[i]
|
ss := &states[i]
|
||||||
out.ActiveSyncs = append(out.ActiveSyncs, api.ActiveSync{
|
out.ActiveSyncs = append(out.ActiveSyncs, api.ActiveSync{
|
||||||
Base: ss.Base,
|
WorkerID: ss.WorkerID,
|
||||||
Target: ss.Target,
|
Base: ss.Base,
|
||||||
Stage: ss.Stage,
|
Target: ss.Target,
|
||||||
Height: ss.Height,
|
Stage: ss.Stage,
|
||||||
Start: ss.Start,
|
Height: ss.Height,
|
||||||
End: ss.End,
|
Start: ss.Start,
|
||||||
Message: ss.Message,
|
End: ss.End,
|
||||||
|
Message: ss.Message,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
return out, nil
|
return out, nil
|
||||||
|
@ -23,6 +23,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/api/client"
|
"github.com/filecoin-project/lotus/api/client"
|
||||||
"github.com/filecoin-project/lotus/api/test"
|
"github.com/filecoin-project/lotus/api/test"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
|
"github.com/filecoin-project/lotus/chain"
|
||||||
"github.com/filecoin-project/lotus/chain/actors"
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||||
"github.com/filecoin-project/lotus/chain/actors/policy"
|
"github.com/filecoin-project/lotus/chain/actors/policy"
|
||||||
@ -50,6 +51,10 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
chain.BootstrapPeerThreshold = 1
|
||||||
|
}
|
||||||
|
|
||||||
func CreateTestStorageNode(ctx context.Context, t *testing.T, waddr address.Address, act address.Address, pk crypto.PrivKey, tnd test.TestNode, mn mocknet.Mocknet, opts node.Option) test.TestStorageNode {
|
func CreateTestStorageNode(ctx context.Context, t *testing.T, waddr address.Address, act address.Address, pk crypto.PrivKey, tnd test.TestNode, mn mocknet.Mocknet, opts node.Option) test.TestStorageNode {
|
||||||
r := repo.NewMemory(nil)
|
r := repo.NewMemory(nil)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user