Merge pull request #725 from filecoin-project/feat/sync-status-improvements
Sync manager improvements
This commit is contained in:
commit
dbd5933af7
@ -2,6 +2,7 @@ package api
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"github.com/ipfs/go-filestore"
|
"github.com/ipfs/go-filestore"
|
||||||
@ -263,6 +264,10 @@ type ActiveSync struct {
|
|||||||
|
|
||||||
Stage SyncStateStage
|
Stage SyncStateStage
|
||||||
Height uint64
|
Height uint64
|
||||||
|
|
||||||
|
Start time.Time
|
||||||
|
End time.Time
|
||||||
|
Message string
|
||||||
}
|
}
|
||||||
|
|
||||||
type SyncState struct {
|
type SyncState struct {
|
||||||
@ -277,6 +282,7 @@ const (
|
|||||||
StagePersistHeaders
|
StagePersistHeaders
|
||||||
StageMessages
|
StageMessages
|
||||||
StageSyncComplete
|
StageSyncComplete
|
||||||
|
StageSyncErrored
|
||||||
)
|
)
|
||||||
|
|
||||||
type MpoolChange int
|
type MpoolChange int
|
||||||
|
@ -144,7 +144,11 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
|
|||||||
bestPweight := syncer.store.GetHeaviestTipSet().Blocks()[0].ParentWeight
|
bestPweight := syncer.store.GetHeaviestTipSet().Blocks()[0].ParentWeight
|
||||||
targetWeight := fts.TipSet().Blocks()[0].ParentWeight
|
targetWeight := fts.TipSet().Blocks()[0].ParentWeight
|
||||||
if targetWeight.LessThan(bestPweight) {
|
if targetWeight.LessThan(bestPweight) {
|
||||||
log.Warn("incoming tipset does not appear to be better than our best chain, ignoring for now")
|
var miners []string
|
||||||
|
for _, blk := range fts.TipSet().Blocks() {
|
||||||
|
miners = append(miners, blk.Miner.String())
|
||||||
|
}
|
||||||
|
log.Warnf("incoming tipset from %s does not appear to be better than our best chain, ignoring for now", miners)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1115,6 +1119,7 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error
|
|||||||
|
|
||||||
headers, err := syncer.collectHeaders(ctx, ts, syncer.store.GetHeaviestTipSet())
|
headers, err := syncer.collectHeaders(ctx, ts, syncer.store.GetHeaviestTipSet())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
ss.Error(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1131,14 +1136,18 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error
|
|||||||
toPersist = append(toPersist, ts.Blocks()...)
|
toPersist = append(toPersist, ts.Blocks()...)
|
||||||
}
|
}
|
||||||
if err := syncer.store.PersistBlockHeaders(toPersist...); err != nil {
|
if err := syncer.store.PersistBlockHeaders(toPersist...); err != nil {
|
||||||
return xerrors.Errorf("failed to persist synced blocks to the chainstore: %w", err)
|
err = xerrors.Errorf("failed to persist synced blocks to the chainstore: %w", err)
|
||||||
|
ss.Error(err)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
toPersist = nil
|
toPersist = nil
|
||||||
|
|
||||||
ss.SetStage(api.StageMessages)
|
ss.SetStage(api.StageMessages)
|
||||||
|
|
||||||
if err := syncer.syncMessagesAndCheckState(ctx, headers); err != nil {
|
if err := syncer.syncMessagesAndCheckState(ctx, headers); err != nil {
|
||||||
return xerrors.Errorf("collectChain syncMessages: %w", err)
|
err = xerrors.Errorf("collectChain syncMessages: %w", err)
|
||||||
|
ss.Error(err)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
ss.SetStage(api.StageSyncComplete)
|
ss.SetStage(api.StageSyncComplete)
|
||||||
|
@ -3,6 +3,7 @@ package chain
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
@ -18,17 +19,22 @@ func SyncStageString(v api.SyncStateStage) string {
|
|||||||
return "message sync"
|
return "message sync"
|
||||||
case api.StageSyncComplete:
|
case api.StageSyncComplete:
|
||||||
return "complete"
|
return "complete"
|
||||||
|
case api.StageSyncErrored:
|
||||||
|
return "error"
|
||||||
default:
|
default:
|
||||||
return fmt.Sprintf("<unknown: %d>", v)
|
return fmt.Sprintf("<unknown: %d>", v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type SyncerState struct {
|
type SyncerState struct {
|
||||||
lk sync.Mutex
|
lk sync.Mutex
|
||||||
Target *types.TipSet
|
Target *types.TipSet
|
||||||
Base *types.TipSet
|
Base *types.TipSet
|
||||||
Stage api.SyncStateStage
|
Stage api.SyncStateStage
|
||||||
Height uint64
|
Height uint64
|
||||||
|
Message string
|
||||||
|
Start time.Time
|
||||||
|
End time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *SyncerState) SetStage(v api.SyncStateStage) {
|
func (ss *SyncerState) SetStage(v api.SyncStateStage) {
|
||||||
@ -39,6 +45,9 @@ func (ss *SyncerState) SetStage(v api.SyncStateStage) {
|
|||||||
ss.lk.Lock()
|
ss.lk.Lock()
|
||||||
defer ss.lk.Unlock()
|
defer ss.lk.Unlock()
|
||||||
ss.Stage = v
|
ss.Stage = v
|
||||||
|
if v == api.StageSyncComplete {
|
||||||
|
ss.End = time.Now()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *SyncerState) Init(base, target *types.TipSet) {
|
func (ss *SyncerState) Init(base, target *types.TipSet) {
|
||||||
@ -52,6 +61,9 @@ func (ss *SyncerState) Init(base, target *types.TipSet) {
|
|||||||
ss.Base = base
|
ss.Base = base
|
||||||
ss.Stage = api.StageHeaders
|
ss.Stage = api.StageHeaders
|
||||||
ss.Height = 0
|
ss.Height = 0
|
||||||
|
ss.Message = ""
|
||||||
|
ss.Start = time.Now()
|
||||||
|
ss.End = time.Time{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *SyncerState) SetHeight(h uint64) {
|
func (ss *SyncerState) SetHeight(h uint64) {
|
||||||
@ -64,13 +76,28 @@ func (ss *SyncerState) SetHeight(h uint64) {
|
|||||||
ss.Height = h
|
ss.Height = h
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ss *SyncerState) Error(err error) {
|
||||||
|
if ss == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ss.lk.Lock()
|
||||||
|
defer ss.lk.Unlock()
|
||||||
|
ss.Message = err.Error()
|
||||||
|
ss.Stage = api.StageSyncErrored
|
||||||
|
ss.End = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
func (ss *SyncerState) Snapshot() SyncerState {
|
func (ss *SyncerState) Snapshot() SyncerState {
|
||||||
ss.lk.Lock()
|
ss.lk.Lock()
|
||||||
defer ss.lk.Unlock()
|
defer ss.lk.Unlock()
|
||||||
return SyncerState{
|
return SyncerState{
|
||||||
Base: ss.Base,
|
Base: ss.Base,
|
||||||
Target: ss.Target,
|
Target: ss.Target,
|
||||||
Stage: ss.Stage,
|
Stage: ss.Stage,
|
||||||
Height: ss.Height,
|
Height: ss.Height,
|
||||||
|
Message: ss.Message,
|
||||||
|
Start: ss.Start,
|
||||||
|
End: ss.End,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
18
cli/sync.go
18
cli/sync.go
@ -26,14 +26,14 @@ var syncStatusCmd = &cli.Command{
|
|||||||
Name: "status",
|
Name: "status",
|
||||||
Usage: "check sync status",
|
Usage: "check sync status",
|
||||||
Action: func(cctx *cli.Context) error {
|
Action: func(cctx *cli.Context) error {
|
||||||
api, closer, err := GetFullNodeAPI(cctx)
|
apic, closer, err := GetFullNodeAPI(cctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer closer()
|
defer closer()
|
||||||
ctx := ReqContext(cctx)
|
ctx := ReqContext(cctx)
|
||||||
|
|
||||||
state, err := api.SyncState(ctx)
|
state, err := apic.SyncState(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -43,6 +43,7 @@ var syncStatusCmd = &cli.Command{
|
|||||||
fmt.Printf("worker %d:\n", i)
|
fmt.Printf("worker %d:\n", i)
|
||||||
var base, target []cid.Cid
|
var base, target []cid.Cid
|
||||||
var heightDiff int64
|
var heightDiff int64
|
||||||
|
var theight uint64
|
||||||
if ss.Base != nil {
|
if ss.Base != nil {
|
||||||
base = ss.Base.Cids()
|
base = ss.Base.Cids()
|
||||||
heightDiff = int64(ss.Base.Height())
|
heightDiff = int64(ss.Base.Height())
|
||||||
@ -50,14 +51,25 @@ var syncStatusCmd = &cli.Command{
|
|||||||
if ss.Target != nil {
|
if ss.Target != nil {
|
||||||
target = ss.Target.Cids()
|
target = ss.Target.Cids()
|
||||||
heightDiff = int64(ss.Target.Height()) - heightDiff
|
heightDiff = int64(ss.Target.Height()) - heightDiff
|
||||||
|
theight = ss.Target.Height()
|
||||||
} else {
|
} else {
|
||||||
heightDiff = 0
|
heightDiff = 0
|
||||||
}
|
}
|
||||||
fmt.Printf("\tBase:\t%s\n", base)
|
fmt.Printf("\tBase:\t%s\n", base)
|
||||||
fmt.Printf("\tTarget:\t%s\n", target)
|
fmt.Printf("\tTarget:\t%s (%d)\n", target, theight)
|
||||||
fmt.Printf("\tHeight diff:\t%d\n", heightDiff)
|
fmt.Printf("\tHeight diff:\t%d\n", heightDiff)
|
||||||
fmt.Printf("\tStage: %s\n", chain.SyncStageString(ss.Stage))
|
fmt.Printf("\tStage: %s\n", chain.SyncStageString(ss.Stage))
|
||||||
fmt.Printf("\tHeight: %d\n", ss.Height)
|
fmt.Printf("\tHeight: %d\n", ss.Height)
|
||||||
|
if ss.End.IsZero() {
|
||||||
|
if !ss.Start.IsZero() {
|
||||||
|
fmt.Printf("\tElapsed: %s\n", time.Since(ss.Start))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
fmt.Printf("\tElapsed: %s\n", ss.End.Sub(ss.Start))
|
||||||
|
}
|
||||||
|
if ss.Stage == api.StageSyncErrored {
|
||||||
|
fmt.Printf("\tError: %s\n", ss.Message)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
|
@ -26,10 +26,13 @@ func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) {
|
|||||||
|
|
||||||
for _, ss := range states {
|
for _, ss := range states {
|
||||||
out.ActiveSyncs = append(out.ActiveSyncs, api.ActiveSync{
|
out.ActiveSyncs = append(out.ActiveSyncs, api.ActiveSync{
|
||||||
Base: ss.Base,
|
Base: ss.Base,
|
||||||
Target: ss.Target,
|
Target: ss.Target,
|
||||||
Stage: ss.Stage,
|
Stage: ss.Stage,
|
||||||
Height: ss.Height,
|
Height: ss.Height,
|
||||||
|
Start: ss.Start,
|
||||||
|
End: ss.End,
|
||||||
|
Message: ss.Message,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
return out, nil
|
return out, nil
|
||||||
|
Loading…
Reference in New Issue
Block a user