Includes changes from: - -
491 lines
11 KiB
491 lines
11 KiB
package main
import (
offline ""
ipld ""
block ""
mh ""
lcli ""
var exportChainCmd = &cli.Command{
Name: "export",
Description: "Export chain from repo (requires node to be offline)",
Flags: []cli.Flag{
Name: "repo",
Value: "~/.lotus",
Name: "tipset",
Usage: "tipset to export from",
Name: "recent-stateroots",
Name: "full-state",
Name: "skip-old-msgs",
Subcommands: []*cli.Command{
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return lcli.ShowHelp(cctx, fmt.Errorf("must specify file name to write export to"))
ctx := context.TODO()
r, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return xerrors.Errorf("opening fs repo: %w", err)
exists, err := r.Exists()
if err != nil {
return err
if !exists {
return xerrors.Errorf("lotus repo doesn't exist")
lr, err := r.Lock(repo.FullNode)
if err != nil {
return err
defer lr.Close() //nolint:errcheck
fi, err := os.Create(cctx.Args().First())
if err != nil {
return xerrors.Errorf("opening the output file: %w", err)
defer fi.Close() //nolint:errcheck
bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
defer func() {
if c, ok := bs.(io.Closer); ok {
if err := c.Close(); err != nil {
log.Warnf("failed to close blockstore: %s", err)
mds, err := lr.Datastore(context.Background(), "/metadata")
if err != nil {
return err
cs := store.NewChainStore(bs, bs, mds, nil, nil)
defer cs.Close() //nolint:errcheck
if err := cs.Load(context.Background()); err != nil {
return err
nroots := abi.ChainEpoch(cctx.Int64("recent-stateroots"))
fullstate := cctx.Bool("full-state")
skipoldmsgs := cctx.Bool("skip-old-msgs")
ts, err := lcli.ParseTipSetRefOffline(ctx, cs, cctx.String("tipset"))
if err != nil {
return err
if fullstate {
nroots = ts.Height() + 1
if err := cs.Export(ctx, ts, nroots, skipoldmsgs, fi); err != nil {
return xerrors.Errorf("export failed: %w", err)
return nil
var exportRawCmd = &cli.Command{
Name: "raw",
Description: "Export raw blocks from repo (requires node to be offline)",
Flags: []cli.Flag{
Name: "repo",
Value: "~/.lotus",
Name: "car-size",
Value: "50M",
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return lcli.ShowHelp(cctx, fmt.Errorf("must specify file name to write export to"))
ctx := context.TODO()
r, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return xerrors.Errorf("opening fs repo: %w", err)
exists, err := r.Exists()
if err != nil {
return err
if !exists {
return xerrors.Errorf("lotus repo doesn't exist")
lr, err := r.LockRO(repo.FullNode)
if err != nil {
return err
defer lr.Close() //nolint:errcheck
out := cctx.Args().First()
err = os.Mkdir(out, 0755)
if err != nil {
return xerrors.Errorf("creating output dir: %w", err)
maxSz, err := humanize.ParseBytes(cctx.String("car-size"))
if err != nil {
return xerrors.Errorf("parse --car-size: %w", err)
cars := 0
carb := &rawCarb{
max: maxSz,
blocks: map[cid.Cid]block.Block{},
consume := func(c cid.Cid, b block.Block) error {
err = carb.consume(c, b)
switch err {
case nil:
case errFullCar:
root, err := carb.finalize()
if err != nil {
return xerrors.Errorf("carb finalize: %w", err)
if err := carb.writeCar(ctx, filepath.Join(out, fmt.Sprintf("", cars)), root); err != nil {
return xerrors.Errorf("writeCar: %w", err)
if cars > 10 {
return xerrors.Errorf("enough")
carb = &rawCarb{
max: maxSz,
blocks: map[cid.Cid]block.Block{},
go runtime.GC()
return xerrors.Errorf("carb consume: %w", err)
return nil
path := filepath.Join(lr.Path(), "datastore", "chain")
opts, err := repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, path, false)
if err != nil {
return err
opts.Logger = &badgerLog{
SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(),
skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(),
log.Infow("open db")
db, err := badger.Open(opts.Options)
if err != nil {
return fmt.Errorf("failed to open badger blockstore: %w", err)
defer db.Close() // nolint:errcheck
log.Infow("new stream")
var wlk sync.Mutex
str := db.NewStream()
str.NumGo = 16
str.LogPrefix = "bstream"
str.Send = func(list *pb.KVList) (err error) {
defer func() {
if err != nil {
log.Errorw("send error", "err", err)
for _, kv := range list.Kv {
if kv.Key == nil || kv.Value == nil {
if !strings.HasPrefix(string(kv.Key), "/blocks/") {
log.Infow("no blocks prefix", "key", string(kv.Key))
h, err := base32.RawStdEncoding.DecodeString(string(kv.Key[len("/blocks/"):]))
if err != nil {
return xerrors.Errorf("decode b32 ds key %x: %w", kv.Key, err)
c := cid.NewCidV1(cid.Raw, h)
b, err := block.NewBlockWithCid(kv.Value, c)
if err != nil {
return xerrors.Errorf("readblk: %w", err)
err = consume(c, b)
if err != nil {
return xerrors.Errorf("consume stream block: %w", err)
return nil
if err := str.Orchestrate(ctx); err != nil {
return xerrors.Errorf("orchestrate stream: %w", err)
log.Infow("write last")
root, err := carb.finalize()
if err != nil {
return xerrors.Errorf("carb finalize: %w", err)
if err := carb.writeCar(ctx, filepath.Join(out, fmt.Sprintf("", cars)), root); err != nil {
return xerrors.Errorf("writeCar: %w", err)
return nil
var errFullCar = errors.New("full")
const maxlinks = 16
type rawCarb struct {
max, cur uint64
nodes []*shedgen.CarbNode
blocks map[cid.Cid]block.Block
func (rc *rawCarb) Has(ctx context.Context, c cid.Cid) (bool, error) {
_, has := rc.blocks[c]
return has, nil
func (rc *rawCarb) Get(ctx context.Context, c cid.Cid) (block.Block, error) {
b, has := rc.blocks[c]
if !has {
return nil, ipld.ErrNotFound{Cid: c}
return b, nil
func (rc *rawCarb) GetSize(ctx context.Context, c cid.Cid) (int, error) {
b, has := rc.blocks[c]
if !has {
return 0, ipld.ErrNotFound{Cid: c}
return len(b.RawData()), nil
func (rc *rawCarb) checkNodes(maxl int) error {
if len(rc.nodes) == 0 {
log.Infow("add level", "l", 0)
rc.nodes = append(rc.nodes, new(shedgen.CarbNode))
for i := 0; i < len(rc.nodes); i++ {
if len(rc.nodes[i].Sub) <= maxl {
if len(rc.nodes) <= i+1 {
log.Infow("add level", "l", i+1)
rc.nodes = append(rc.nodes, new(shedgen.CarbNode))
var bb bytes.Buffer
if err := rc.nodes[i].MarshalCBOR(&bb); err != nil {
return err
c, err := cid.Prefix{
Version: 1,
Codec: cid.DagCBOR,
MhType: mh.SHA2_256,
MhLength: -1,
if err != nil {
return xerrors.Errorf("gen cid: %w", err)
b, err := block.NewBlockWithCid(bb.Bytes(), c)
if err != nil {
return xerrors.Errorf("new block: %w", err)
if i > 1 {
log.Infow("compact", "from", i, "to", i+1, "sub", c.String())
rc.nodes[i+1].Sub = append(rc.nodes[i+1].Sub, c)
rc.blocks[c] = b
rc.nodes[i] = new(shedgen.CarbNode)
rc.cur += uint64(bb.Len())
return nil
func (rc *rawCarb) consume(c cid.Cid, b block.Block) error {
if err := rc.checkNodes(maxlinks); err != nil {
return err
if rc.cur+uint64(len(b.RawData())) > rc.max {
return errFullCar
rc.cur += uint64(len(b.RawData()))
b, err := block.NewBlockWithCid(b.RawData(), c)
if err != nil {
return xerrors.Errorf("create raw block: %w", err)
rc.blocks[c] = b
rc.nodes[0].Sub = append(rc.nodes[0].Sub, c)
return nil
func (rc *rawCarb) finalize() (cid.Cid, error) {
if len(rc.nodes) == 0 {
rc.nodes = append(rc.nodes, new(shedgen.CarbNode))
for i := 0; i < len(rc.nodes); i++ {
var bb bytes.Buffer
if err := rc.nodes[i].MarshalCBOR(&bb); err != nil {
return cid.Undef, err
c, err := cid.Prefix{
Version: 1,
Codec: cid.DagCBOR,
MhType: mh.SHA2_256,
MhLength: -1,
if err != nil {
return cid.Undef, xerrors.Errorf("gen cid: %w", err)
b, err := block.NewBlockWithCid(bb.Bytes(), c)
if err != nil {
return cid.Undef, xerrors.Errorf("new block: %w", err)
log.Infow("fin", "level", i, "cid", c.String())
rc.blocks[c] = b
rc.nodes[i] = new(shedgen.CarbNode)
rc.cur += uint64(bb.Len())
if len(rc.nodes[i].Sub) <= 1 && i == len(rc.nodes)-1 {
return c, err
if len(rc.nodes) <= i+1 {
rc.nodes = append(rc.nodes, new(shedgen.CarbNode))
rc.nodes[i+1].Sub = append(rc.nodes[i+1].Sub, c)
return cid.Undef, xerrors.Errorf("failed to finalize")
func (rc *rawCarb) writeCar(ctx context.Context, path string, root cid.Cid) error {
f, err := os.Create(path)
if err != nil {
return xerrors.Errorf("create out car: %w", err)
bs := rc
ds := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
log.Infow("write car", "path", path, "root", root.String(), "blocks", len(rc.blocks))
return car.WriteCar(ctx, ds, []cid.Cid{root}, f)
var _ blockstore.Blockstore = &rawCarb{}
type badgerLog struct {
skip2 *zap.SugaredLogger
func (b *badgerLog) Warningf(format string, args ...interface{}) {
b.skip2.Warnf(format, args...)