lotus/cmd/lotus-shed/export.go
Jorropo 6c01310728
chore: migrate to boxo
This migrates everything except the `go-car` librairy: https://github.com/ipfs/boxo/issues/218#issuecomment-1529922103

I didn't migrated everything in the previous release because all the boxo code wasn't compatible with the go-ipld-prime one due to a an in flight (/ aftermath) revert of github.com/ipfs/go-block-format. go-block-format has been unmigrated since slight bellow absolutely everything depends on it that would have required everything to be moved on boxo or everything to optin into using boxo which were all deal breakers for different groups.

This worked fine because lotus's codebase could live hapely on the first multirepo setup however boost is now trying to use boxo's code with lotus's (still on multirepo) setup: https://filecoinproject.slack.com/archives/C03AQ3QAUG1/p1685022344779649

The alternative would be for boost to write shim types which just forward calls and return with the different interface definitions.

Btw why is that an issue in the first place is because unlike what go's duck typing model suggest interfaces are not transparent https://github.com/golang/go/issues/58112, interfaces are strongly typed but they have implicit narrowing. The issue is if you return an interface from an interface Go does not have a function definition to insert the implicit conversion thus instead the type checker complains you are not returning the right type.

Stubbing types were reverted https://github.com/ipfs/boxo/issues/218#issuecomment-1478650351

Last time I only migrated `go-bitswap` to `boxo/bitswap` because of the security issues and because we never had the interface return an interface problem (we had concrete wrappers where the implicit conversion took place).
2023-06-19 14:45:05 -07:00

491 lines
11 KiB
Go

package main
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/pb"
"github.com/dustin/go-humanize"
"github.com/ipfs/boxo/blockservice"
offline "github.com/ipfs/boxo/exchange/offline"
"github.com/ipfs/boxo/ipld/merkledag"
block "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipld/go-car"
"github.com/multiformats/go-base32"
mh "github.com/multiformats/go-multihash"
"github.com/urfave/cli/v2"
"go.uber.org/zap"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/store"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/cmd/lotus-shed/shedgen"
"github.com/filecoin-project/lotus/node/repo"
)
var exportChainCmd = &cli.Command{
Name: "export",
Description: "Export chain from repo (requires node to be offline)",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo",
Value: "~/.lotus",
},
&cli.StringFlag{
Name: "tipset",
Usage: "tipset to export from",
},
&cli.Int64Flag{
Name: "recent-stateroots",
},
&cli.BoolFlag{
Name: "full-state",
},
&cli.BoolFlag{
Name: "skip-old-msgs",
},
},
Subcommands: []*cli.Command{
exportRawCmd,
},
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return lcli.ShowHelp(cctx, fmt.Errorf("must specify file name to write export to"))
}
ctx := context.TODO()
r, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return xerrors.Errorf("opening fs repo: %w", err)
}
exists, err := r.Exists()
if err != nil {
return err
}
if !exists {
return xerrors.Errorf("lotus repo doesn't exist")
}
lr, err := r.Lock(repo.FullNode)
if err != nil {
return err
}
defer lr.Close() //nolint:errcheck
fi, err := os.Create(cctx.Args().First())
if err != nil {
return xerrors.Errorf("opening the output file: %w", err)
}
defer fi.Close() //nolint:errcheck
bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
defer func() {
if c, ok := bs.(io.Closer); ok {
if err := c.Close(); err != nil {
log.Warnf("failed to close blockstore: %s", err)
}
}
}()
mds, err := lr.Datastore(context.Background(), "/metadata")
if err != nil {
return err
}
cs := store.NewChainStore(bs, bs, mds, nil, nil)
defer cs.Close() //nolint:errcheck
if err := cs.Load(context.Background()); err != nil {
return err
}
nroots := abi.ChainEpoch(cctx.Int64("recent-stateroots"))
fullstate := cctx.Bool("full-state")
skipoldmsgs := cctx.Bool("skip-old-msgs")
ts, err := lcli.ParseTipSetRefOffline(ctx, cs, cctx.String("tipset"))
if err != nil {
return err
}
if fullstate {
nroots = ts.Height() + 1
}
if err := cs.Export(ctx, ts, nroots, skipoldmsgs, fi); err != nil {
return xerrors.Errorf("export failed: %w", err)
}
return nil
},
}
var exportRawCmd = &cli.Command{
Name: "raw",
Description: "Export raw blocks from repo (requires node to be offline)",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo",
Value: "~/.lotus",
},
&cli.StringFlag{
Name: "car-size",
Value: "50M",
},
},
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return lcli.ShowHelp(cctx, fmt.Errorf("must specify file name to write export to"))
}
ctx := context.TODO()
r, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return xerrors.Errorf("opening fs repo: %w", err)
}
exists, err := r.Exists()
if err != nil {
return err
}
if !exists {
return xerrors.Errorf("lotus repo doesn't exist")
}
lr, err := r.LockRO(repo.FullNode)
if err != nil {
return err
}
defer lr.Close() //nolint:errcheck
out := cctx.Args().First()
err = os.Mkdir(out, 0755)
if err != nil {
return xerrors.Errorf("creating output dir: %w", err)
}
maxSz, err := humanize.ParseBytes(cctx.String("car-size"))
if err != nil {
return xerrors.Errorf("parse --car-size: %w", err)
}
cars := 0
carb := &rawCarb{
max: maxSz,
blocks: map[cid.Cid]block.Block{},
}
{
consume := func(c cid.Cid, b block.Block) error {
err = carb.consume(c, b)
switch err {
case nil:
case errFullCar:
root, err := carb.finalize()
if err != nil {
return xerrors.Errorf("carb finalize: %w", err)
}
if err := carb.writeCar(ctx, filepath.Join(out, fmt.Sprintf("chain%d.car", cars)), root); err != nil {
return xerrors.Errorf("writeCar: %w", err)
}
cars++
if cars > 10 {
return xerrors.Errorf("enough")
}
carb = &rawCarb{
max: maxSz,
blocks: map[cid.Cid]block.Block{},
}
log.Infow("gc")
go runtime.GC()
default:
return xerrors.Errorf("carb consume: %w", err)
}
return nil
}
{
path := filepath.Join(lr.Path(), "datastore", "chain")
opts, err := repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, path, false)
if err != nil {
return err
}
opts.Logger = &badgerLog{
SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(),
skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(),
}
log.Infow("open db")
db, err := badger.Open(opts.Options)
if err != nil {
return fmt.Errorf("failed to open badger blockstore: %w", err)
}
defer db.Close() // nolint:errcheck
log.Infow("new stream")
var wlk sync.Mutex
str := db.NewStream()
str.NumGo = 16
str.LogPrefix = "bstream"
str.Send = func(list *pb.KVList) (err error) {
defer func() {
if err != nil {
log.Errorw("send error", "err", err)
}
}()
for _, kv := range list.Kv {
if kv.Key == nil || kv.Value == nil {
continue
}
if !strings.HasPrefix(string(kv.Key), "/blocks/") {
log.Infow("no blocks prefix", "key", string(kv.Key))
continue
}
h, err := base32.RawStdEncoding.DecodeString(string(kv.Key[len("/blocks/"):]))
if err != nil {
return xerrors.Errorf("decode b32 ds key %x: %w", kv.Key, err)
}
c := cid.NewCidV1(cid.Raw, h)
b, err := block.NewBlockWithCid(kv.Value, c)
if err != nil {
return xerrors.Errorf("readblk: %w", err)
}
wlk.Lock()
err = consume(c, b)
wlk.Unlock()
if err != nil {
return xerrors.Errorf("consume stream block: %w", err)
}
}
return nil
}
if err := str.Orchestrate(ctx); err != nil {
return xerrors.Errorf("orchestrate stream: %w", err)
}
}
}
log.Infow("write last")
root, err := carb.finalize()
if err != nil {
return xerrors.Errorf("carb finalize: %w", err)
}
if err := carb.writeCar(ctx, filepath.Join(out, fmt.Sprintf("chain%d.car", cars)), root); err != nil {
return xerrors.Errorf("writeCar: %w", err)
}
return nil
},
}
var errFullCar = errors.New("full")
const maxlinks = 16
type rawCarb struct {
blockstore.Blockstore
max, cur uint64
nodes []*shedgen.CarbNode
blocks map[cid.Cid]block.Block
}
func (rc *rawCarb) Has(ctx context.Context, c cid.Cid) (bool, error) {
_, has := rc.blocks[c]
return has, nil
}
func (rc *rawCarb) Get(ctx context.Context, c cid.Cid) (block.Block, error) {
b, has := rc.blocks[c]
if !has {
return nil, ipld.ErrNotFound{Cid: c}
}
return b, nil
}
func (rc *rawCarb) GetSize(ctx context.Context, c cid.Cid) (int, error) {
b, has := rc.blocks[c]
if !has {
return 0, ipld.ErrNotFound{Cid: c}
}
return len(b.RawData()), nil
}
func (rc *rawCarb) checkNodes(maxl int) error {
if len(rc.nodes) == 0 {
log.Infow("add level", "l", 0)
rc.nodes = append(rc.nodes, new(shedgen.CarbNode))
}
for i := 0; i < len(rc.nodes); i++ {
if len(rc.nodes[i].Sub) <= maxl {
break
}
if len(rc.nodes) <= i+1 {
log.Infow("add level", "l", i+1)
rc.nodes = append(rc.nodes, new(shedgen.CarbNode))
}
var bb bytes.Buffer
if err := rc.nodes[i].MarshalCBOR(&bb); err != nil {
return err
}
c, err := cid.Prefix{
Version: 1,
Codec: cid.DagCBOR,
MhType: mh.SHA2_256,
MhLength: -1,
}.Sum(bb.Bytes())
if err != nil {
return xerrors.Errorf("gen cid: %w", err)
}
b, err := block.NewBlockWithCid(bb.Bytes(), c)
if err != nil {
return xerrors.Errorf("new block: %w", err)
}
if i > 1 {
log.Infow("compact", "from", i, "to", i+1, "sub", c.String())
}
rc.nodes[i+1].Sub = append(rc.nodes[i+1].Sub, c)
rc.blocks[c] = b
rc.nodes[i] = new(shedgen.CarbNode)
rc.cur += uint64(bb.Len())
}
return nil
}
func (rc *rawCarb) consume(c cid.Cid, b block.Block) error {
if err := rc.checkNodes(maxlinks); err != nil {
return err
}
if rc.cur+uint64(len(b.RawData())) > rc.max {
return errFullCar
}
rc.cur += uint64(len(b.RawData()))
b, err := block.NewBlockWithCid(b.RawData(), c)
if err != nil {
return xerrors.Errorf("create raw block: %w", err)
}
rc.blocks[c] = b
rc.nodes[0].Sub = append(rc.nodes[0].Sub, c)
return nil
}
func (rc *rawCarb) finalize() (cid.Cid, error) {
if len(rc.nodes) == 0 {
rc.nodes = append(rc.nodes, new(shedgen.CarbNode))
}
for i := 0; i < len(rc.nodes); i++ {
var bb bytes.Buffer
if err := rc.nodes[i].MarshalCBOR(&bb); err != nil {
return cid.Undef, err
}
c, err := cid.Prefix{
Version: 1,
Codec: cid.DagCBOR,
MhType: mh.SHA2_256,
MhLength: -1,
}.Sum(bb.Bytes())
if err != nil {
return cid.Undef, xerrors.Errorf("gen cid: %w", err)
}
b, err := block.NewBlockWithCid(bb.Bytes(), c)
if err != nil {
return cid.Undef, xerrors.Errorf("new block: %w", err)
}
log.Infow("fin", "level", i, "cid", c.String())
rc.blocks[c] = b
rc.nodes[i] = new(shedgen.CarbNode)
rc.cur += uint64(bb.Len())
if len(rc.nodes[i].Sub) <= 1 && i == len(rc.nodes)-1 {
return c, err
}
if len(rc.nodes) <= i+1 {
rc.nodes = append(rc.nodes, new(shedgen.CarbNode))
}
rc.nodes[i+1].Sub = append(rc.nodes[i+1].Sub, c)
}
return cid.Undef, xerrors.Errorf("failed to finalize")
}
func (rc *rawCarb) writeCar(ctx context.Context, path string, root cid.Cid) error {
f, err := os.Create(path)
if err != nil {
return xerrors.Errorf("create out car: %w", err)
}
bs := rc
ds := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
log.Infow("write car", "path", path, "root", root.String(), "blocks", len(rc.blocks))
return car.WriteCar(ctx, ds, []cid.Cid{root}, f)
}
var _ blockstore.Blockstore = &rawCarb{}
type badgerLog struct {
*zap.SugaredLogger
skip2 *zap.SugaredLogger
}
func (b *badgerLog) Warningf(format string, args ...interface{}) {
b.skip2.Warnf(format, args...)
}