Merge pull request #3754 from filecoin-project/feat/shed-export

Add faster and slimmer option to chain export via lotus-shed
This commit is contained in:
Łukasz Magiera 2020-09-11 11:10:41 +02:00 committed by GitHub
commit 55e4183825
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 171 additions and 17 deletions

View File

@ -118,7 +118,8 @@ type FullNode interface {
// The exported chain data includes the header chain from the given tipset
// back to genesis, the entire genesis state, and the most recent 'nroots'
// state trees.
ChainExport(ctx context.Context, nroots abi.ChainEpoch, tsk types.TipSetKey) (<-chan []byte, error)
// If oldmsgskip is set, messages from before the requested roots are also not included.
ChainExport(ctx context.Context, nroots abi.ChainEpoch, oldmsgskip bool, tsk types.TipSetKey) (<-chan []byte, error)
// MethodGroup: Beacon
// The Beacon method group contains methods for interacting with the random beacon (DRAND)

View File

@ -95,7 +95,7 @@ type FullNodeStruct struct {
ChainGetNode func(ctx context.Context, p string) (*api.IpldObject, error) `perm:"read"`
ChainGetMessage func(context.Context, cid.Cid) (*types.Message, error) `perm:"read"`
ChainGetPath func(context.Context, types.TipSetKey, types.TipSetKey) ([]*api.HeadChange, error) `perm:"read"`
ChainExport func(context.Context, abi.ChainEpoch, types.TipSetKey) (<-chan []byte, error) `perm:"read"`
ChainExport func(context.Context, abi.ChainEpoch, bool, types.TipSetKey) (<-chan []byte, error) `perm:"read"`
BeaconGetEntry func(ctx context.Context, epoch abi.ChainEpoch) (*types.BeaconEntry, error) `perm:"read"`
@ -692,8 +692,8 @@ func (c *FullNodeStruct) ChainGetPath(ctx context.Context, from types.TipSetKey,
return c.Internal.ChainGetPath(ctx, from, to)
}
func (c *FullNodeStruct) ChainExport(ctx context.Context, nroots abi.ChainEpoch, tsk types.TipSetKey) (<-chan []byte, error) {
return c.Internal.ChainExport(ctx, nroots, tsk)
func (c *FullNodeStruct) ChainExport(ctx context.Context, nroots abi.ChainEpoch, iom bool, tsk types.TipSetKey) (<-chan []byte, error) {
return c.Internal.ChainExport(ctx, nroots, iom, tsk)
}
func (c *FullNodeStruct) BeaconGetEntry(ctx context.Context, epoch abi.ChainEpoch) (*types.BeaconEntry, error) {

View File

@ -1159,7 +1159,7 @@ func recurseLinks(bs bstore.Blockstore, walked *cid.Set, root cid.Cid, in []cid.
return in, rerr
}
func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, w io.Writer) error {
func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, w io.Writer) error {
if ts == nil {
ts = cs.GetHeaviestTipSet()
}
@ -1197,9 +1197,13 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
return xerrors.Errorf("unmarshaling block header (cid=%s): %w", blk, err)
}
cids, err := recurseLinks(cs.bs, walked, b.Messages, []cid.Cid{b.Messages})
if err != nil {
return xerrors.Errorf("recursing messages failed: %w", err)
var cids []cid.Cid
if !skipOldMsgs || b.Height > ts.Height()-inclRecentRoots {
mcids, err := recurseLinks(cs.bs, walked, b.Messages, []cid.Cid{b.Messages})
if err != nil {
return xerrors.Errorf("recursing messages failed: %w", err)
}
cids = mcids
}
if b.Height > 0 {

View File

@ -96,7 +96,7 @@ func TestChainExportImport(t *testing.T) {
}
buf := new(bytes.Buffer)
if err := cg.ChainStore().Export(context.TODO(), last, 0, buf); err != nil {
if err := cg.ChainStore().Export(context.TODO(), last, 0, false, buf); err != nil {
t.Fatal(err)
}

View File

@ -844,6 +844,9 @@ var chainExportCmd = &cli.Command{
Name: "recent-stateroots",
Usage: "specify the number of recent state roots to include in the export",
},
&cli.BoolFlag{
Name: "skip-old-msgs",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
@ -878,7 +881,13 @@ var chainExportCmd = &cli.Command{
return err
}
stream, err := api.ChainExport(ctx, rsrs, ts.Key())
skipold := cctx.Bool("skip-old-msgs")
if rsrs == 0 && skipold {
return fmt.Errorf("must pass recent stateroots along with skip-old-msgs")
}
stream, err := api.ChainExport(ctx, rsrs, skipold, ts.Key())
if err != nil {
return err
}

View File

@ -27,6 +27,7 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/exitcode"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/exported"
@ -122,7 +123,7 @@ var stateMinerInfo = &cli.Command{
},
}
func parseTipSetString(ts string) ([]cid.Cid, error) {
func ParseTipSetString(ts string) ([]cid.Cid, error) {
strs := strings.Split(ts, ",")
var cids []cid.Cid
@ -160,7 +161,7 @@ func ParseTipSetRef(ctx context.Context, api api.FullNode, tss string) (*types.T
return api.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(h), types.EmptyTSK)
}
cids, err := parseTipSetString(tss)
cids, err := ParseTipSetString(tss)
if err != nil {
return nil, err
}
@ -1384,7 +1385,7 @@ var stateCallCmd = &cli.Command{
}
if ret.MsgRct.ExitCode != 0 {
return fmt.Errorf("invocation failed (exit: %d): %s", ret.MsgRct.ExitCode, ret.Error)
return fmt.Errorf("invocation failed (exit: %d, gasUsed: %d): %s", ret.MsgRct.ExitCode, ret.MsgRct.GasUsed, ret.Error)
}
s, err := formatOutput(cctx.String("ret"), ret.MsgRct.Return)
@ -1392,6 +1393,7 @@ var stateCallCmd = &cli.Command{
return fmt.Errorf("failed to format output: %s", err)
}
fmt.Printf("gas used: %d\n", ret.MsgRct.GasUsed)
fmt.Printf("return: %s\n", s)
return nil
@ -1465,11 +1467,11 @@ func parseParamsForMethod(act cid.Cid, method uint64, args []string) ([]byte, er
f := methods[method]
rf := reflect.TypeOf(f)
if rf.NumIn() != 3 {
if rf.NumIn() != 2 {
return nil, fmt.Errorf("expected referenced method to have three arguments")
}
paramObj := rf.In(2).Elem()
paramObj := rf.In(1).Elem()
if paramObj.NumField() != len(args) {
return nil, fmt.Errorf("not enough arguments given to call that method (expecting %d)", paramObj.NumField())
}
@ -1489,6 +1491,18 @@ func parseParamsForMethod(act cid.Cid, method uint64, args []string) ([]byte, er
return nil, err
}
p.Elem().Field(i).Set(reflect.ValueOf(val))
case reflect.TypeOf(abi.ChainEpoch(0)):
val, err := strconv.ParseInt(args[i], 10, 64)
if err != nil {
return nil, err
}
p.Elem().Field(i).Set(reflect.ValueOf(abi.ChainEpoch(val)))
case reflect.TypeOf(big.Int{}):
val, err := big.FromString(args[i])
if err != nil {
return nil, err
}
p.Elem().Field(i).Set(reflect.ValueOf(val))
case reflect.TypeOf(peer.ID("")):
pid, err := peer.Decode(args[i])
if err != nil {

123
cmd/lotus-shed/export.go Normal file
View File

@ -0,0 +1,123 @@
package main
import (
"context"
"fmt"
"os"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/blockstore"
"github.com/filecoin-project/lotus/node/repo"
)
var exportChainCmd = &cli.Command{
Name: "export",
Description: "Export chain from repo (requires node to be offline)",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo",
Value: "~/.lotus",
},
&cli.StringFlag{
Name: "tipset",
Usage: "tipset to export from",
},
&cli.Int64Flag{
Name: "recent-stateroots",
},
&cli.BoolFlag{
Name: "full-state",
},
&cli.BoolFlag{
Name: "skip-old-msgs",
},
},
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return lcli.ShowHelp(cctx, fmt.Errorf("must specify file name to write export to"))
}
ctx := context.TODO()
r, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return xerrors.Errorf("opening fs repo: %w", err)
}
exists, err := r.Exists()
if err != nil {
return err
}
if !exists {
return xerrors.Errorf("lotus repo doesn't exist")
}
lr, err := r.Lock(repo.FullNode)
if err != nil {
return err
}
defer lr.Close() //nolint:errcheck
fi, err := os.Create(cctx.Args().First())
if err != nil {
return xerrors.Errorf("opening the output file: %w", err)
}
defer fi.Close() //nolint:errcheck
ds, err := lr.Datastore("/chain")
if err != nil {
return err
}
mds, err := lr.Datastore("/metadata")
if err != nil {
return err
}
bs := blockstore.NewBlockstore(ds)
cs := store.NewChainStore(bs, mds, nil)
if err := cs.Load(); err != nil {
return err
}
nroots := abi.ChainEpoch(cctx.Int64("recent-stateroots"))
fullstate := cctx.Bool("full-state")
skipoldmsgs := cctx.Bool("skip-old-msgs")
var ts *types.TipSet
if tss := cctx.String("tipset"); tss != "" {
cids, err := lcli.ParseTipSetString(tss)
if err != nil {
return xerrors.Errorf("failed to parse tipset (%q): %w", tss, err)
}
tsk := types.NewTipSetKey(cids...)
selts, err := cs.LoadTipSet(tsk)
if err != nil {
return xerrors.Errorf("loading tipset: %w", err)
}
ts = selts
} else {
ts = cs.GetHeaviestTipSet()
}
if fullstate {
nroots = ts.Height() + 1
}
if err := cs.Export(ctx, ts, nroots, skipoldmsgs, fi); err != nil {
return xerrors.Errorf("export failed: %w", err)
}
return nil
},
}

View File

@ -34,6 +34,7 @@ func main() {
genesisVerifyCmd,
mathCmd,
mpoolStatsCmd,
exportChainCmd,
}
app := &cli.App{

View File

@ -284,6 +284,7 @@ ChainExport returns a stream of bytes with CAR dump of chain data.
The exported chain data includes the header chain from the given tipset
back to genesis, the entire genesis state, and the most recent 'nroots'
state trees.
If oldmsgskip is set, messages from before the requested roots are also not included.
Perms: read
@ -292,6 +293,7 @@ Inputs:
```json
[
10101,
true,
[
{
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"

View File

@ -495,7 +495,7 @@ func (a *ChainAPI) ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Mess
return cm.VMMessage(), nil
}
func (a *ChainAPI) ChainExport(ctx context.Context, nroots abi.ChainEpoch, tsk types.TipSetKey) (<-chan []byte, error) {
func (a *ChainAPI) ChainExport(ctx context.Context, nroots abi.ChainEpoch, skipoldmsgs bool, tsk types.TipSetKey) (<-chan []byte, error) {
ts, err := a.Chain.GetTipSetFromKey(tsk)
if err != nil {
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
@ -508,7 +508,7 @@ func (a *ChainAPI) ChainExport(ctx context.Context, nroots abi.ChainEpoch, tsk t
bw := bufio.NewWriterSize(w, 1<<20)
defer bw.Flush() //nolint:errcheck // it is a write to a pipe
if err := a.Chain.Export(ctx, ts, nroots, bw); err != nil {
if err := a.Chain.Export(ctx, ts, nroots, skipoldmsgs, bw); err != nil {
log.Errorf("chain export call failed: %s", err)
return
}