plumb through new sync status logic

This commit is contained in:
whyrusleeping 2019-11-15 17:05:16 -08:00
parent 7aa76d21d1
commit 251ff41134
5 changed files with 78 additions and 38 deletions

View File

@ -253,7 +253,7 @@ type ReplayResults struct {
Error string
}
type SyncState struct {
type ActiveSync struct {
Base *types.TipSet
Target *types.TipSet
@ -261,6 +261,10 @@ type SyncState struct {
Height uint64
}
type SyncState struct {
ActiveSyncs []ActiveSync
}
type SyncStateStage int
const (

View File

@ -127,7 +127,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
}
syncer.Bsync.AddPeer(from)
syncer.syncmgr.SetPeerHead(from, fts.TipSet())
syncer.syncmgr.SetPeerHead(ctx, from, fts.TipSet())
/*
bestPweight := syncer.store.GetHeaviestTipSet().Blocks()[0].ParentWeight
@ -391,6 +391,7 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro
func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
ctx, span := trace.StartSpan(ctx, "chain.Sync")
defer span.End()
if span.IsRecordingEvents() {
span.AddAttributes(
trace.StringAttribute("tipset", fmt.Sprint(maybeHead.Cids())),
@ -398,9 +399,6 @@ func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
)
}
syncer.syncLock.Lock()
defer syncer.syncLock.Unlock()
if syncer.Genesis.Equals(maybeHead) || syncer.store.GetHeaviestTipSet().Equals(maybeHead) {
return nil
}
@ -1084,7 +1082,10 @@ func VerifyElectionProof(ctx context.Context, eproof []byte, rand []byte, worker
return nil
}
func (syncer *Syncer) State() SyncerState {
panic("NYI")
//return syncer.syncState.Snapshot()
func (syncer *Syncer) State() []SyncerState {
var out []SyncerState
for _, ss := range syncer.syncmgr.syncStates {
out = append(out, ss.Snapshot())
}
return out
}

View File

@ -24,9 +24,9 @@ type SyncManager struct {
syncTargets chan *types.TipSet
syncResults chan *syncResult
activeSyncs map[types.TipSetKey]*types.TipSet
syncStates []*SyncerState
syncState SyncerState
activeSyncs map[types.TipSetKey]*types.TipSet
doSync func(context.Context, *types.TipSet) error
@ -46,6 +46,7 @@ func NewSyncManager(sync SyncFunc) *SyncManager {
peerHeads: make(map[peer.ID]*types.TipSet),
syncTargets: make(chan *types.TipSet),
syncResults: make(chan *syncResult),
syncStates: make([]*SyncerState, syncWorkerCount),
incomingTipSets: make(chan *types.TipSet),
activeSyncs: make(map[types.TipSetKey]*types.TipSet),
doSync: sync,
@ -64,17 +65,15 @@ func (sm *SyncManager) Stop() {
close(sm.stop)
}
func (sm *SyncManager) SetPeerHead(p peer.ID, ts *types.TipSet) {
log.Info("set peer head!")
func (sm *SyncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) {
log.Info("set peer head!", ts.Height(), ts.Cids())
sm.lk.Lock()
defer sm.lk.Unlock()
sm.peerHeads[p] = ts
if !sm.bootstrapped {
log.Info("not bootstrapped")
spc := sm.syncedPeerCount()
if spc >= sm.bspThresh {
log.Info("go time!")
// Its go time!
target, err := sm.selectSyncTarget()
if err != nil {
@ -320,6 +319,8 @@ func (sm *SyncManager) syncScheduler() {
}
func (sm *SyncManager) syncWorker(id int) {
ss := &SyncerState{}
sm.syncStates[id] = ss
for {
select {
case ts, ok := <-sm.syncTargets:
@ -327,9 +328,10 @@ func (sm *SyncManager) syncWorker(id int) {
log.Info("sync manager worker shutting down")
return
}
log.Info("sync worker go time!", ts.Cids())
log.Info("sync worker go time!", ts.Height(), ts.Cids())
err := sm.doSync(context.TODO(), ts)
ctx := context.WithValue(context.TODO(), syncStateKey, ss)
err := sm.doSync(ctx, ts)
if err != nil {
log.Errorf("sync error: %+v", err)
}

View File

@ -31,24 +31,26 @@ var syncStatusCmd = &cli.Command{
defer closer()
ctx := ReqContext(cctx)
ss, err := api.SyncState(ctx)
state, err := api.SyncState(ctx)
if err != nil {
return err
}
var base, target []cid.Cid
if ss.Base != nil {
base = ss.Base.Cids()
}
if ss.Target != nil {
target = ss.Target.Cids()
}
fmt.Println("sync status:")
fmt.Printf("Base:\t%s\n", base)
fmt.Printf("Target:\t%s\n", target)
fmt.Printf("Stage: %s\n", chain.SyncStageString(ss.Stage))
fmt.Printf("Height: %d\n", ss.Height)
for i, ss := range state.ActiveSyncs {
fmt.Printf("worker %d:\n", i)
var base, target []cid.Cid
if ss.Base != nil {
base = ss.Base.Cids()
}
if ss.Target != nil {
target = ss.Target.Cids()
}
fmt.Printf("\tBase:\t%s\n", base)
fmt.Printf("\tTarget:\t%s\n", target)
fmt.Printf("\tStage: %s\n", chain.SyncStageString(ss.Stage))
fmt.Printf("\tHeight: %d\n", ss.Height)
}
return nil
},
}
@ -65,17 +67,42 @@ var syncWaitCmd = &cli.Command{
ctx := ReqContext(cctx)
for {
ss, err := napi.SyncState(ctx)
state, err := napi.SyncState(ctx)
if err != nil {
return err
}
var complete bool
working := -1
for i, ss := range state.ActiveSyncs {
switch ss.Stage {
case api.StageSyncComplete:
complete = true
default:
working = i
case api.StageIdle:
// not complete, not actively working
}
}
if complete && working != -1 {
fmt.Println("\nDone")
return nil
}
if working == -1 {
fmt.Println("Idle...")
continue
}
ss := state.ActiveSyncs[working]
var target []cid.Cid
if ss.Target != nil {
target = ss.Target.Cids()
}
fmt.Printf("\r\x1b[2KTarget: %s\tState: %s\tHeight: %d", target, chain.SyncStageString(ss.Stage), ss.Height)
fmt.Printf("\r\x1b[2KWorker %d: Target: %s\tState: %s\tHeight: %d", working, target, chain.SyncStageString(ss.Stage), ss.Height)
if ss.Stage == api.StageSyncComplete {
fmt.Println("\nDone")
return nil

View File

@ -20,13 +20,19 @@ type SyncAPI struct {
}
func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) {
ss := a.Syncer.State()
return &api.SyncState{
Base: ss.Base,
Target: ss.Target,
Stage: ss.Stage,
Height: ss.Height,
}, nil
states := a.Syncer.State()
out := &api.SyncState{}
for _, ss := range states {
out.ActiveSyncs = append(out.ActiveSyncs, api.ActiveSync{
Base: ss.Base,
Target: ss.Target,
Stage: ss.Stage,
Height: ss.Height,
})
}
return out, nil
}
func (a *SyncAPI) SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) error {