Merge pull request #3463 from filecoin-project/feat/chain-state-export

allow exporting a number of recent chain state trees
This commit is contained in:
Łukasz Magiera 2020-09-02 17:21:14 +02:00 committed by GitHub
commit cfbbcd420f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 60 additions and 22 deletions

View File

@ -109,7 +109,10 @@ type FullNode interface {
ChainGetPath(ctx context.Context, from types.TipSetKey, to types.TipSetKey) ([]*HeadChange, error) ChainGetPath(ctx context.Context, from types.TipSetKey, to types.TipSetKey) ([]*HeadChange, error)
// ChainExport returns a stream of bytes with CAR dump of chain data. // ChainExport returns a stream of bytes with CAR dump of chain data.
ChainExport(context.Context, types.TipSetKey) (<-chan []byte, error) // The exported chain data includes the header chain from the given tipset
// back to genesis, the entire genesis state, and the most recent 'nroots'
// state trees.
ChainExport(ctx context.Context, nroots abi.ChainEpoch, tsk types.TipSetKey) (<-chan []byte, error)
// MethodGroup: Beacon // MethodGroup: Beacon
// The Beacon method group contains methods for interacting with the random beacon (DRAND) // The Beacon method group contains methods for interacting with the random beacon (DRAND)

View File

@ -86,7 +86,7 @@ type FullNodeStruct struct {
ChainGetNode func(ctx context.Context, p string) (*api.IpldObject, error) `perm:"read"` ChainGetNode func(ctx context.Context, p string) (*api.IpldObject, error) `perm:"read"`
ChainGetMessage func(context.Context, cid.Cid) (*types.Message, error) `perm:"read"` ChainGetMessage func(context.Context, cid.Cid) (*types.Message, error) `perm:"read"`
ChainGetPath func(context.Context, types.TipSetKey, types.TipSetKey) ([]*api.HeadChange, error) `perm:"read"` ChainGetPath func(context.Context, types.TipSetKey, types.TipSetKey) ([]*api.HeadChange, error) `perm:"read"`
ChainExport func(context.Context, types.TipSetKey) (<-chan []byte, error) `perm:"read"` ChainExport func(context.Context, abi.ChainEpoch, types.TipSetKey) (<-chan []byte, error) `perm:"read"`
BeaconGetEntry func(ctx context.Context, epoch abi.ChainEpoch) (*types.BeaconEntry, error) `perm:"read"` BeaconGetEntry func(ctx context.Context, epoch abi.ChainEpoch) (*types.BeaconEntry, error) `perm:"read"`
@ -654,8 +654,8 @@ func (c *FullNodeStruct) ChainGetPath(ctx context.Context, from types.TipSetKey,
return c.Internal.ChainGetPath(ctx, from, to) return c.Internal.ChainGetPath(ctx, from, to)
} }
func (c *FullNodeStruct) ChainExport(ctx context.Context, tsk types.TipSetKey) (<-chan []byte, error) { func (c *FullNodeStruct) ChainExport(ctx context.Context, nroots abi.ChainEpoch, tsk types.TipSetKey) (<-chan []byte, error) {
return c.Internal.ChainExport(ctx, tsk) return c.Internal.ChainExport(ctx, nroots, tsk)
} }
func (c *FullNodeStruct) BeaconGetEntry(ctx context.Context, epoch abi.ChainEpoch) (*types.BeaconEntry, error) { func (c *FullNodeStruct) BeaconGetEntry(ctx context.Context, epoch abi.ChainEpoch) (*types.BeaconEntry, error) {

View File

@ -1114,7 +1114,7 @@ func (cs *ChainStore) GetTipsetByHeight(ctx context.Context, h abi.ChainEpoch, t
return cs.LoadTipSet(lbts.Parents()) return cs.LoadTipSet(lbts.Parents())
} }
func recurseLinks(bs bstore.Blockstore, root cid.Cid, in []cid.Cid) ([]cid.Cid, error) { func recurseLinks(bs bstore.Blockstore, walked *cid.Set, root cid.Cid, in []cid.Cid) ([]cid.Cid, error) {
if root.Prefix().Codec != cid.DagCBOR { if root.Prefix().Codec != cid.DagCBOR {
return in, nil return in, nil
} }
@ -1131,9 +1131,14 @@ func recurseLinks(bs bstore.Blockstore, root cid.Cid, in []cid.Cid) ([]cid.Cid,
return return
} }
// traversed this already...
if !walked.Visit(c) {
return
}
in = append(in, c) in = append(in, c)
var err error var err error
in, err = recurseLinks(bs, c, in) in, err = recurseLinks(bs, walked, c, in)
if err != nil { if err != nil {
rerr = err rerr = err
} }
@ -1145,12 +1150,13 @@ func recurseLinks(bs bstore.Blockstore, root cid.Cid, in []cid.Cid) ([]cid.Cid,
return in, rerr return in, rerr
} }
func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, w io.Writer) error { func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, w io.Writer) error {
if ts == nil { if ts == nil {
ts = cs.GetHeaviestTipSet() ts = cs.GetHeaviestTipSet()
} }
seen := cid.NewSet() seen := cid.NewSet()
walked := cid.NewSet()
h := &car.CarHeader{ h := &car.CarHeader{
Roots: ts.Cids(), Roots: ts.Cids(),
@ -1182,7 +1188,7 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, w io.Writer)
return xerrors.Errorf("unmarshaling block header (cid=%s): %w", blk, err) return xerrors.Errorf("unmarshaling block header (cid=%s): %w", blk, err)
} }
cids, err := recurseLinks(cs.bs, b.Messages, []cid.Cid{b.Messages}) cids, err := recurseLinks(cs.bs, walked, b.Messages, []cid.Cid{b.Messages})
if err != nil { if err != nil {
return xerrors.Errorf("recursing messages failed: %w", err) return xerrors.Errorf("recursing messages failed: %w", err)
} }
@ -1198,8 +1204,8 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, w io.Writer)
out := cids out := cids
if b.Height == 0 { if b.Height == 0 || b.Height > ts.Height()-inclRecentRoots {
cids, err := recurseLinks(cs.bs, b.ParentStateRoot, []cid.Cid{b.ParentStateRoot}) cids, err := recurseLinks(cs.bs, walked, b.ParentStateRoot, []cid.Cid{b.ParentStateRoot})
if err != nil { if err != nil {
return xerrors.Errorf("recursing genesis state failed: %w", err) return xerrors.Errorf("recursing genesis state failed: %w", err)
} }

View File

@ -96,7 +96,7 @@ func TestChainExportImport(t *testing.T) {
} }
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
if err := cg.ChainStore().Export(context.TODO(), last, buf); err != nil { if err := cg.ChainStore().Export(context.TODO(), last, 0, buf); err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -41,7 +41,7 @@ func (cs *ChainStore) Weight(ctx context.Context, ts *types.TipSet) (types.BigIn
var st power.State var st power.State
if err := cst.Get(ctx, act.Head, &st); err != nil { if err := cst.Get(ctx, act.Head, &st); err != nil {
return types.NewInt(0), xerrors.Errorf("get power actor head: %w", err) return types.NewInt(0), xerrors.Errorf("get power actor head (%s, height=%d): %w", act.Head, ts.Height(), err)
} }
tpow = st.TotalQualityAdjPower // TODO: REVIEW: Is this correct? tpow = st.TotalQualityAdjPower // TODO: REVIEW: Is this correct?
} }

View File

@ -859,6 +859,10 @@ var chainExportCmd = &cli.Command{
&cli.StringFlag{ &cli.StringFlag{
Name: "tipset", Name: "tipset",
}, },
&cli.Int64Flag{
Name: "recent-stateroots",
Usage: "specify the number of recent state roots to include in the export",
},
}, },
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx) api, closer, err := GetFullNodeAPI(cctx)
@ -872,6 +876,11 @@ var chainExportCmd = &cli.Command{
return fmt.Errorf("must specify filename to export chain to") return fmt.Errorf("must specify filename to export chain to")
} }
rsrs := abi.ChainEpoch(cctx.Int64("recent-stateroots"))
if cctx.IsSet("recent-stateroots") && rsrs < build.Finality {
return fmt.Errorf("\"recent-stateroots\" has to be greater than %d", build.Finality)
}
fi, err := os.Create(cctx.Args().First()) fi, err := os.Create(cctx.Args().First())
if err != nil { if err != nil {
return err return err
@ -888,7 +897,7 @@ var chainExportCmd = &cli.Command{
return err return err
} }
stream, err := api.ChainExport(ctx, ts.Key()) stream, err := api.ChainExport(ctx, rsrs, ts.Key())
if err != nil { if err != nil {
return err return err
} }

View File

@ -100,7 +100,11 @@ var DaemonCmd = &cli.Command{
}, },
&cli.StringFlag{ &cli.StringFlag{
Name: "import-chain", Name: "import-chain",
Usage: "on first run, load chain from given file", Usage: "on first run, load chain from given file and validate",
},
&cli.StringFlag{
Name: "import-snapshot",
Usage: "import chain state from a given chain export file",
}, },
&cli.BoolFlag{ &cli.BoolFlag{
Name: "halt-after-import", Name: "halt-after-import",
@ -191,13 +195,23 @@ var DaemonCmd = &cli.Command{
} }
chainfile := cctx.String("import-chain") chainfile := cctx.String("import-chain")
if chainfile != "" { snapshot := cctx.String("import-snapshot")
if chainfile != "" || snapshot != "" {
if chainfile != "" && snapshot != "" {
return fmt.Errorf("cannot specify both 'import-snapshot' and 'import-chain'")
}
var issnapshot bool
if chainfile == "" {
chainfile = snapshot
issnapshot = true
}
chainfile, err := homedir.Expand(chainfile) chainfile, err := homedir.Expand(chainfile)
if err != nil { if err != nil {
return err return err
} }
if err := ImportChain(r, chainfile); err != nil { if err := ImportChain(r, chainfile, issnapshot); err != nil {
return err return err
} }
if cctx.Bool("halt-after-import") { if cctx.Bool("halt-after-import") {
@ -312,7 +326,7 @@ func importKey(ctx context.Context, api api.FullNode, f string) error {
return nil return nil
} }
func ImportChain(r repo.Repo, fname string) error { func ImportChain(r repo.Repo, fname string, snapshot bool) error {
fi, err := os.Open(fname) fi, err := os.Open(fname)
if err != nil { if err != nil {
return err return err
@ -357,9 +371,11 @@ func ImportChain(r repo.Repo, fname string) error {
stm := stmgr.NewStateManager(cst) stm := stmgr.NewStateManager(cst)
log.Infof("validating imported chain...") if !snapshot {
if err := stm.ValidateChain(context.TODO(), ts); err != nil { log.Infof("validating imported chain...")
return xerrors.Errorf("chain validation failed: %w", err) if err := stm.ValidateChain(context.TODO(), ts); err != nil {
return xerrors.Errorf("chain validation failed: %w", err)
}
} }
log.Info("accepting %s as new head", ts.Cids()) log.Info("accepting %s as new head", ts.Cids())

View File

@ -268,6 +268,9 @@ blockchain, but that do not require any form of state computation.
### ChainExport ### ChainExport
ChainExport returns a stream of bytes with CAR dump of chain data. ChainExport returns a stream of bytes with CAR dump of chain data.
The exported chain data includes the header chain from the given tipset
back to genesis, the entire genesis state, and the most recent 'nroots'
state trees.
Perms: read Perms: read
@ -275,6 +278,7 @@ Perms: read
Inputs: Inputs:
```json ```json
[ [
10101,
[ [
{ {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"

View File

@ -494,7 +494,7 @@ func (a *ChainAPI) ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Mess
return cm.VMMessage(), nil return cm.VMMessage(), nil
} }
func (a *ChainAPI) ChainExport(ctx context.Context, tsk types.TipSetKey) (<-chan []byte, error) { func (a *ChainAPI) ChainExport(ctx context.Context, nroots abi.ChainEpoch, tsk types.TipSetKey) (<-chan []byte, error) {
ts, err := a.Chain.GetTipSetFromKey(tsk) ts, err := a.Chain.GetTipSetFromKey(tsk)
if err != nil { if err != nil {
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err) return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
@ -503,7 +503,7 @@ func (a *ChainAPI) ChainExport(ctx context.Context, tsk types.TipSetKey) (<-chan
out := make(chan []byte) out := make(chan []byte)
go func() { go func() {
defer w.Close() //nolint:errcheck // it is a pipe defer w.Close() //nolint:errcheck // it is a pipe
if err := a.Chain.Export(ctx, ts, w); err != nil { if err := a.Chain.Export(ctx, ts, nroots, w); err != nil {
log.Errorf("chain export call failed: %s", err) log.Errorf("chain export call failed: %s", err)
return return
} }