diff --git a/api/api.go b/api/api.go index 13d4de47d..330920bf0 100644 --- a/api/api.go +++ b/api/api.go @@ -57,6 +57,9 @@ type FullNode interface { ChainGetBlockReceipts(context.Context, cid.Cid) ([]*types.MessageReceipt, error) ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) + // syncer + SyncState(context.Context) (*SyncState, error) + // messages MpoolPending(context.Context, *types.TipSet) ([]*types.SignedMessage, error) @@ -292,3 +295,21 @@ type ReplayResults struct { Receipt *types.MessageReceipt Error string } + +type SyncState struct { + Base *types.TipSet + Target *types.TipSet + + Stage SyncStateStage + Height uint64 +} + +type SyncStateStage int + +const ( + StageIdle = SyncStateStage(iota) + StageHeaders + StagePersistHeaders + StageMessages + StageSyncComplete +) diff --git a/api/struct.go b/api/struct.go index 78a08bb62..b4d548f7f 100644 --- a/api/struct.go +++ b/api/struct.go @@ -47,6 +47,8 @@ type FullNodeStruct struct { ChainGetBlockReceipts func(context.Context, cid.Cid) ([]*types.MessageReceipt, error) `perm:"read"` ChainGetTipSetByHeight func(context.Context, uint64, *types.TipSet) (*types.TipSet, error) `perm:"read"` + SyncState func(context.Context) (*SyncState, error) `perm:"read"` + MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"` MpoolPush func(context.Context, *types.SignedMessage) error `perm:"write"` MpoolPushMessage func(context.Context, *types.Message) (*types.SignedMessage, error) `perm:"sign"` @@ -284,6 +286,10 @@ func (c *FullNodeStruct) ChainNotify(ctx context.Context) (<-chan []*store.HeadC return c.Internal.ChainNotify(ctx) } +func (c *FullNodeStruct) SyncState(ctx context.Context) (*SyncState, error) { + return c.Internal.SyncState(ctx) +} + func (c *FullNodeStruct) StateMinerSectors(ctx context.Context, addr address.Address) ([]*SectorInfo, error) { return c.Internal.StateMinerSectors(ctx, addr) } diff --git a/chain/sync.go b/chain/sync.go index 04ad22006..b83709387 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/build" "github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/address" @@ -50,6 +51,8 @@ type Syncer struct { self peer.ID + syncState SyncerState + // peer heads // Note: clear cache on disconnects peerHeads map[peer.ID]*types.TipSet @@ -516,6 +519,8 @@ func (syncer *Syncer) collectHeaders(from *types.TipSet, to *types.TipSet) ([]*t at = ts.Parents() } + syncer.syncState.SetHeight(blockSet[len(blockSet)-1].Height()) + loop: for blockSet[len(blockSet)-1].Height() > untilHeight { // NB: GetBlocks validates that the blocks are in-fact the ones we @@ -544,6 +549,7 @@ loop: blockSet = append(blockSet, b) } + syncer.syncState.SetHeight(blks[len(blockSet)-1].Height()) at = blks[len(blks)-1].Parents() } @@ -565,6 +571,7 @@ loop: } func (syncer *Syncer) syncMessagesAndCheckState(headers []*types.TipSet) error { + syncer.syncState.SetHeight(0) return syncer.iterFullTipsets(headers, func(fts *store.FullTipSet) error { log.Debugf("validating tipset (heigt=%d, size=%d)", fts.TipSet().Height(), len(fts.TipSet().Cids())) if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil { @@ -572,6 +579,8 @@ func (syncer *Syncer) syncMessagesAndCheckState(headers []*types.TipSet) error { return xerrors.Errorf("message processing failed: %w", err) } + syncer.syncState.SetHeight(fts.TipSet().Height()) + return nil }) } @@ -668,11 +677,15 @@ func persistMessages(bs bstore.Blockstore, bst *BSTipSet) error { } func (syncer *Syncer) collectChain(fts *store.FullTipSet) error { + syncer.syncState.Init(syncer.store.GetHeaviestTipSet(), fts.TipSet()) + headers, err := syncer.collectHeaders(fts.TipSet(), syncer.store.GetHeaviestTipSet()) if err != nil { return err } + syncer.syncState.SetStage(api.StagePersistHeaders) + for _, ts := range headers { for _, b := range ts.Blocks() { if err := syncer.store.PersistBlockHeader(b); err != nil { @@ -681,10 +694,14 @@ func (syncer *Syncer) collectChain(fts *store.FullTipSet) error { } } + syncer.syncState.SetStage(api.StageMessages) + if err := syncer.syncMessagesAndCheckState(headers); err != nil { return xerrors.Errorf("collectChain syncMessages: %w", err) } + syncer.syncState.SetStage(api.StageSyncComplete) + return nil } @@ -700,3 +717,7 @@ func VerifyElectionProof(ctx context.Context, eproof []byte, rand []byte, worker return nil } + +func (syncer *Syncer) State() SyncerState { + return syncer.syncState.Snapshot() +} diff --git a/chain/syncstate.go b/chain/syncstate.go new file mode 100644 index 000000000..e7452dd8e --- /dev/null +++ b/chain/syncstate.go @@ -0,0 +1,64 @@ +package chain + +import ( + "fmt" + "sync" + + "github.com/filecoin-project/go-lotus/api" + "github.com/filecoin-project/go-lotus/chain/types" +) + +func SyncStageString(v api.SyncStateStage) string { + switch v { + case api.StageHeaders: + return "header sync" + case api.StagePersistHeaders: + return "persisting headers" + case api.StageMessages: + return "message sync" + case api.StageSyncComplete: + return "complete" + default: + return fmt.Sprintf("", v) + } +} + +type SyncerState struct { + lk sync.Mutex + Target *types.TipSet + Base *types.TipSet + Stage api.SyncStateStage + Height uint64 +} + +func (ss *SyncerState) SetStage(v api.SyncStateStage) { + ss.lk.Lock() + defer ss.lk.Unlock() + ss.Stage = v +} + +func (ss *SyncerState) Init(base, target *types.TipSet) { + ss.lk.Lock() + defer ss.lk.Unlock() + ss.Target = target + ss.Base = base + ss.Stage = api.StageHeaders + ss.Height = 0 +} + +func (ss *SyncerState) SetHeight(h uint64) { + ss.lk.Lock() + defer ss.lk.Unlock() + ss.Height = h +} + +func (ss *SyncerState) Snapshot() SyncerState { + ss.lk.Lock() + defer ss.lk.Unlock() + return SyncerState{ + Base: ss.Base, + Target: ss.Target, + Stage: ss.Stage, + Height: ss.Height, + } +} diff --git a/cli/cmd.go b/cli/cmd.go index 631d0d49d..872c65bb4 100644 --- a/cli/cmd.go +++ b/cli/cmd.go @@ -119,6 +119,7 @@ var Commands = []*cli.Command{ paychCmd, sendCmd, stateCmd, + syncCmd, versionCmd, walletCmd, } diff --git a/cli/sync.go b/cli/sync.go new file mode 100644 index 000000000..8007c6e0c --- /dev/null +++ b/cli/sync.go @@ -0,0 +1,50 @@ +package cli + +import ( + "fmt" + + "gopkg.in/urfave/cli.v2" + + "github.com/filecoin-project/go-lotus/chain" + cid "github.com/ipfs/go-cid" +) + +var syncCmd = &cli.Command{ + Name: "sync", + Usage: "Inspect or interact with the chain syncer", + Subcommands: []*cli.Command{ + syncStatusCmd, + }, +} + +var syncStatusCmd = &cli.Command{ + Name: "status", + Usage: "check sync status", + Action: func(cctx *cli.Context) error { + api, err := GetFullNodeAPI(cctx) + if err != nil { + return err + } + ctx := ReqContext(cctx) + + ss, err := api.SyncState(ctx) + if err != nil { + return err + } + + var base, target []cid.Cid + if ss.Base != nil { + base = ss.Base.Cids() + } + if ss.Target != nil { + target = ss.Target.Cids() + } + + fmt.Println("sync status:") + fmt.Printf("Base:\t%s\n", base) + fmt.Printf("Target:\t%s\n", target) + fmt.Printf("Stage: %s\n", chain.SyncStageString(ss.Stage)) + fmt.Printf("Height: %d\n", ss.Height) + return nil + }, +} diff --git a/node/impl/full.go b/node/impl/full.go index 7092f42b8..851ff9eec 100644 --- a/node/impl/full.go +++ b/node/impl/full.go @@ -2,6 +2,7 @@ package impl import ( "context" + "github.com/filecoin-project/go-lotus/node/impl/client" "github.com/filecoin-project/go-lotus/node/impl/paych" @@ -23,6 +24,7 @@ type FullNodeAPI struct { paych.PaychAPI full.StateAPI full.WalletAPI + full.SyncAPI Miner *miner.Miner } diff --git a/node/impl/full/sync.go b/node/impl/full/sync.go new file mode 100644 index 000000000..55eb473e7 --- /dev/null +++ b/node/impl/full/sync.go @@ -0,0 +1,25 @@ +package full + +import ( + "context" + + "github.com/filecoin-project/go-lotus/api" + "github.com/filecoin-project/go-lotus/chain" + "go.uber.org/fx" +) + +type SyncAPI struct { + fx.In + + Syncer *chain.Syncer +} + +func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) { + ss := a.Syncer.State() + return &api.SyncState{ + Base: ss.Base, + Target: ss.Target, + Stage: ss.Stage, + Height: ss.Height, + }, nil +}