Merge pull request #248 from filecoin-project/feat/sync-state-cmd

Add a command to inspect sync state progress
This commit is contained in:
Łukasz Magiera 2019-09-30 18:04:59 -06:00 committed by GitHub
commit bc7faec5d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 190 additions and 0 deletions

View File

@ -57,6 +57,9 @@ type FullNode interface {
ChainGetBlockReceipts(context.Context, cid.Cid) ([]*types.MessageReceipt, error)
ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error)
// syncer
SyncState(context.Context) (*SyncState, error)
// messages
MpoolPending(context.Context, *types.TipSet) ([]*types.SignedMessage, error)
@ -292,3 +295,21 @@ type ReplayResults struct {
Receipt *types.MessageReceipt
Error string
}
type SyncState struct {
Base *types.TipSet
Target *types.TipSet
Stage SyncStateStage
Height uint64
}
type SyncStateStage int
const (
StageIdle = SyncStateStage(iota)
StageHeaders
StagePersistHeaders
StageMessages
StageSyncComplete
)

View File

@ -47,6 +47,8 @@ type FullNodeStruct struct {
ChainGetBlockReceipts func(context.Context, cid.Cid) ([]*types.MessageReceipt, error) `perm:"read"`
ChainGetTipSetByHeight func(context.Context, uint64, *types.TipSet) (*types.TipSet, error) `perm:"read"`
SyncState func(context.Context) (*SyncState, error) `perm:"read"`
MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"`
MpoolPush func(context.Context, *types.SignedMessage) error `perm:"write"`
MpoolPushMessage func(context.Context, *types.Message) (*types.SignedMessage, error) `perm:"sign"`
@ -284,6 +286,10 @@ func (c *FullNodeStruct) ChainNotify(ctx context.Context) (<-chan []*store.HeadC
return c.Internal.ChainNotify(ctx)
}
func (c *FullNodeStruct) SyncState(ctx context.Context) (*SyncState, error) {
return c.Internal.SyncState(ctx)
}
func (c *FullNodeStruct) StateMinerSectors(ctx context.Context, addr address.Address) ([]*SectorInfo, error) {
return c.Internal.StateMinerSectors(ctx, addr)
}

View File

@ -6,6 +6,7 @@ import (
"sync"
"time"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address"
@ -50,6 +51,8 @@ type Syncer struct {
self peer.ID
syncState SyncerState
// peer heads
// Note: clear cache on disconnects
peerHeads map[peer.ID]*types.TipSet
@ -516,6 +519,8 @@ func (syncer *Syncer) collectHeaders(from *types.TipSet, to *types.TipSet) ([]*t
at = ts.Parents()
}
syncer.syncState.SetHeight(blockSet[len(blockSet)-1].Height())
loop:
for blockSet[len(blockSet)-1].Height() > untilHeight {
// NB: GetBlocks validates that the blocks are in-fact the ones we
@ -544,6 +549,7 @@ loop:
blockSet = append(blockSet, b)
}
syncer.syncState.SetHeight(blks[len(blockSet)-1].Height())
at = blks[len(blks)-1].Parents()
}
@ -565,6 +571,7 @@ loop:
}
func (syncer *Syncer) syncMessagesAndCheckState(headers []*types.TipSet) error {
syncer.syncState.SetHeight(0)
return syncer.iterFullTipsets(headers, func(fts *store.FullTipSet) error {
log.Debugf("validating tipset (heigt=%d, size=%d)", fts.TipSet().Height(), len(fts.TipSet().Cids()))
if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil {
@ -572,6 +579,8 @@ func (syncer *Syncer) syncMessagesAndCheckState(headers []*types.TipSet) error {
return xerrors.Errorf("message processing failed: %w", err)
}
syncer.syncState.SetHeight(fts.TipSet().Height())
return nil
})
}
@ -668,11 +677,15 @@ func persistMessages(bs bstore.Blockstore, bst *BSTipSet) error {
}
func (syncer *Syncer) collectChain(fts *store.FullTipSet) error {
syncer.syncState.Init(syncer.store.GetHeaviestTipSet(), fts.TipSet())
headers, err := syncer.collectHeaders(fts.TipSet(), syncer.store.GetHeaviestTipSet())
if err != nil {
return err
}
syncer.syncState.SetStage(api.StagePersistHeaders)
for _, ts := range headers {
for _, b := range ts.Blocks() {
if err := syncer.store.PersistBlockHeader(b); err != nil {
@ -681,10 +694,14 @@ func (syncer *Syncer) collectChain(fts *store.FullTipSet) error {
}
}
syncer.syncState.SetStage(api.StageMessages)
if err := syncer.syncMessagesAndCheckState(headers); err != nil {
return xerrors.Errorf("collectChain syncMessages: %w", err)
}
syncer.syncState.SetStage(api.StageSyncComplete)
return nil
}
@ -700,3 +717,7 @@ func VerifyElectionProof(ctx context.Context, eproof []byte, rand []byte, worker
return nil
}
func (syncer *Syncer) State() SyncerState {
return syncer.syncState.Snapshot()
}

64
chain/syncstate.go Normal file
View File

@ -0,0 +1,64 @@
package chain
import (
"fmt"
"sync"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain/types"
)
func SyncStageString(v api.SyncStateStage) string {
switch v {
case api.StageHeaders:
return "header sync"
case api.StagePersistHeaders:
return "persisting headers"
case api.StageMessages:
return "message sync"
case api.StageSyncComplete:
return "complete"
default:
return fmt.Sprintf("<unknown: %d>", v)
}
}
type SyncerState struct {
lk sync.Mutex
Target *types.TipSet
Base *types.TipSet
Stage api.SyncStateStage
Height uint64
}
func (ss *SyncerState) SetStage(v api.SyncStateStage) {
ss.lk.Lock()
defer ss.lk.Unlock()
ss.Stage = v
}
func (ss *SyncerState) Init(base, target *types.TipSet) {
ss.lk.Lock()
defer ss.lk.Unlock()
ss.Target = target
ss.Base = base
ss.Stage = api.StageHeaders
ss.Height = 0
}
func (ss *SyncerState) SetHeight(h uint64) {
ss.lk.Lock()
defer ss.lk.Unlock()
ss.Height = h
}
func (ss *SyncerState) Snapshot() SyncerState {
ss.lk.Lock()
defer ss.lk.Unlock()
return SyncerState{
Base: ss.Base,
Target: ss.Target,
Stage: ss.Stage,
Height: ss.Height,
}
}

View File

@ -119,6 +119,7 @@ var Commands = []*cli.Command{
paychCmd,
sendCmd,
stateCmd,
syncCmd,
versionCmd,
walletCmd,
}

50
cli/sync.go Normal file
View File

@ -0,0 +1,50 @@
package cli
import (
"fmt"
"gopkg.in/urfave/cli.v2"
"github.com/filecoin-project/go-lotus/chain"
cid "github.com/ipfs/go-cid"
)
var syncCmd = &cli.Command{
Name: "sync",
Usage: "Inspect or interact with the chain syncer",
Subcommands: []*cli.Command{
syncStatusCmd,
},
}
var syncStatusCmd = &cli.Command{
Name: "status",
Usage: "check sync status",
Action: func(cctx *cli.Context) error {
api, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
ctx := ReqContext(cctx)
ss, err := api.SyncState(ctx)
if err != nil {
return err
}
var base, target []cid.Cid
if ss.Base != nil {
base = ss.Base.Cids()
}
if ss.Target != nil {
target = ss.Target.Cids()
}
fmt.Println("sync status:")
fmt.Printf("Base:\t%s\n", base)
fmt.Printf("Target:\t%s\n", target)
fmt.Printf("Stage: %s\n", chain.SyncStageString(ss.Stage))
fmt.Printf("Height: %d\n", ss.Height)
return nil
},
}

View File

@ -2,6 +2,7 @@ package impl
import (
"context"
"github.com/filecoin-project/go-lotus/node/impl/client"
"github.com/filecoin-project/go-lotus/node/impl/paych"
@ -23,6 +24,7 @@ type FullNodeAPI struct {
paych.PaychAPI
full.StateAPI
full.WalletAPI
full.SyncAPI
Miner *miner.Miner
}

25
node/impl/full/sync.go Normal file
View File

@ -0,0 +1,25 @@
package full
import (
"context"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain"
"go.uber.org/fx"
)
type SyncAPI struct {
fx.In
Syncer *chain.Syncer
}
func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) {
ss := a.Syncer.State()
return &api.SyncState{
Base: ss.Base,
Target: ss.Target,
Stage: ss.Stage,
Height: ss.Height,
}, nil
}