From 84dbb229b65a7da17703f4ec6544eacbcd77d61b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 8 Mar 2022 22:48:08 +0100 Subject: [PATCH 1/2] shed: blockstore/vlog to car export cmds --- chain/store/snapshot.go | 6 +- cmd/lotus-shed/datastore-vlog.go | 350 ++++++++++++++++++++++++++ cmd/lotus-shed/datastore.go | 1 + cmd/lotus-shed/export.go | 374 +++++++++++++++++++++++++++- cmd/lotus-shed/shedgen/cbor_gen.go | 128 ++++++++++ cmd/lotus-shed/shedgen/rawexport.go | 7 + gen/main.go | 8 + 7 files changed, 871 insertions(+), 3 deletions(-) create mode 100644 cmd/lotus-shed/datastore-vlog.go create mode 100644 cmd/lotus-shed/shedgen/cbor_gen.go create mode 100644 cmd/lotus-shed/shedgen/rawexport.go diff --git a/chain/store/snapshot.go b/chain/store/snapshot.go index 61fa8bdc8..b9630bcbd 100644 --- a/chain/store/snapshot.go +++ b/chain/store/snapshot.go @@ -18,6 +18,10 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) +func (cs *ChainStore) UnionStore() bstore.Blockstore { + return bstore.Union(cs.stateBlockstore, cs.chainBlockstore) +} + func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, w io.Writer) error { h := &car.CarHeader{ Roots: ts.Cids(), @@ -28,7 +32,7 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo return xerrors.Errorf("failed to write car header: %s", err) } - unionBs := bstore.Union(cs.stateBlockstore, cs.chainBlockstore) + unionBs := cs.UnionStore() return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, true, func(c cid.Cid) error { blk, err := unionBs.Get(ctx, c) if err != nil { diff --git a/cmd/lotus-shed/datastore-vlog.go b/cmd/lotus-shed/datastore-vlog.go new file mode 100644 index 000000000..4a848b24f --- /dev/null +++ b/cmd/lotus-shed/datastore-vlog.go @@ -0,0 +1,350 @@ +package main + +import ( + "bufio" + "encoding/binary" + "errors" + "fmt" + "hash" + "hash/crc32" + "io" + "os" + "strings" + + "github.com/dgraph-io/badger/v2/y" + block "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "github.com/multiformats/go-base32" + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" +) + +var datastoreVlog2CarCmd = &cli.Command{ + Name: "vlog2car", + Usage: "convert badger blockstore .vlog to .car", + Flags: []cli.Flag{ + &cli.PathFlag{ + Name: "vlog", + Usage: "vlog file", + Required: true, + }, + &cli.PathFlag{ + Name: "car", + Usage: "out car file name (no .car)", + Required: true, + }, + &cli.StringFlag{ + Name: "key-prefix", + Usage: "datastore prefix", + Value: "/blocks/", + }, + }, + Action: func(cctx *cli.Context) error { + ctx := cctx.Context + + maxSz := uint64(1 << 20) + + carb := &rawCarb{ + max: maxSz, + blocks: map[cid.Cid]block.Block{}, + } + cars := 0 + + pref := cctx.String("key-prefix") + plen := len(pref) + + { + // NOTE: Some bits of code in this code block come from https://github.com/dgraph-io/badger, which is licensed + // under Apache 2.0; See https://github.com/dgraph-io/badger/blob/master/LICENSE + + vf, err := os.Open(cctx.Path("vlog")) + if err != nil { + return xerrors.Errorf("open vlog file: %w", err) + } + + if _, err := vf.Seek(20, io.SeekStart); err != nil { + return xerrors.Errorf("seek past vlog start: %w", err) + } + + reader := bufio.NewReader(vf) + read := &safeRead{ + k: make([]byte, 10), + v: make([]byte, 10), + recordOffset: 20, + } + + loop: + for { + e, err := read.Entry(reader) + switch { + case err == io.EOF: + break loop + case err == io.ErrUnexpectedEOF || err == errTruncate: + break loop + case err != nil: + return xerrors.Errorf("entry read error: %w", err) + case e == nil: + continue + } + + if e.meta&0x40 > 0 { + e.Key = e.Key[:len(e.Key)-8] + } else if e.meta > 0 { + if e.meta&0x3f > 0 { + log.Infof("unk meta m:%x; k:%x, v:%60x", e.meta, e.Key, e.Value) + } + continue + } + + { + if plen > 0 && !strings.HasPrefix(string(e.Key), pref) { + log.Infow("no blocks prefix", "key", string(e.Key)) + continue + } + + h, err := base32.RawStdEncoding.DecodeString(string(e.Key[plen:])) + if err != nil { + return xerrors.Errorf("decode b32 ds key %x: %w", e.Key, err) + } + + c := cid.NewCidV1(cid.Raw, h) + + b, err := block.NewBlockWithCid(e.Value, c) + if err != nil { + return xerrors.Errorf("readblk: %w", err) + } + + err = carb.consume(c, b) + switch err { + case nil: + case fullCar: + root, err := carb.finalize() + if err != nil { + return xerrors.Errorf("carb finalize: %w", err) + } + + if err := carb.writeCar(ctx, fmt.Sprintf("%s%d.car", cctx.Path("car"), cars), root); err != nil { + return xerrors.Errorf("writeCar: %w", err) + } + + cars++ + + carb = &rawCarb{ + max: maxSz, + blocks: map[cid.Cid]block.Block{}, + } + + default: + return xerrors.Errorf("carb consume: %w", err) + } + } + } + + if err := vf.Close(); err != nil { + return err + } + } + + root, err := carb.finalize() + if err != nil { + return xerrors.Errorf("carb finalize: %w", err) + } + + if err := carb.writeCar(ctx, fmt.Sprintf("%s%d.car", cctx.Path("car"), cars), root); err != nil { + return xerrors.Errorf("writeCar: %w", err) + } + + return nil + + }, +} + +// NOTE: Code below comes (with slight modifications) from https://github.com/dgraph-io/badger/blob/master/value.go +// Apache 2.0; See https://github.com/dgraph-io/badger/blob/master/LICENSE + +var errTruncate = errors.New("do truncate") + +// hashReader implements io.Reader, io.ByteReader interfaces. It also keeps track of the number +// bytes read. The hashReader writes to h (hash) what it reads from r. +type hashReader struct { + r io.Reader + h hash.Hash32 + bytesRead int // Number of bytes read. +} + +func newHashReader(r io.Reader) *hashReader { + hash := crc32.New(y.CastagnoliCrcTable) + return &hashReader{ + r: r, + h: hash, + } +} + +// Read reads len(p) bytes from the reader. Returns the number of bytes read, error on failure. +func (t *hashReader) Read(p []byte) (int, error) { + n, err := t.r.Read(p) + if err != nil { + return n, err + } + t.bytesRead += n + return t.h.Write(p[:n]) +} + +// ReadByte reads exactly one byte from the reader. Returns error on failure. +func (t *hashReader) ReadByte() (byte, error) { + b := make([]byte, 1) + _, err := t.Read(b) + return b[0], err +} + +// Sum32 returns the sum32 of the underlying hash. +func (t *hashReader) Sum32() uint32 { + return t.h.Sum32() +} + +type safeRead struct { + k []byte + v []byte + + recordOffset uint32 +} + +// Entry provides Key, Value, UserMeta and ExpiresAt. This struct can be used by +// the user to set data. +type Entry struct { + Key []byte + Value []byte + UserMeta byte + ExpiresAt uint64 // time.Unix + meta byte + version uint64 + + // Fields maintained internally. + offset uint32 + skipVlog bool + hlen int // Length of the header. +} + +// Entry reads an entry from the provided reader. It also validates the checksum for every entry +// read. Returns error on failure. +func (r *safeRead) Entry(reader io.Reader) (*Entry, error) { + tee := newHashReader(reader) + var h header + hlen, err := h.DecodeFrom(tee) + if err != nil { + return nil, err + } + if h.klen > uint32(1<<16) { // Key length must be below uint16. + return nil, errTruncate + } + kl := int(h.klen) + if cap(r.k) < kl { + r.k = make([]byte, 2*kl) + } + vl := int(h.vlen) + if cap(r.v) < vl { + r.v = make([]byte, 2*vl) + } + + e := &Entry{} + e.offset = r.recordOffset + e.hlen = hlen + buf := make([]byte, h.klen+h.vlen) + if _, err := io.ReadFull(tee, buf[:]); err != nil { + if err == io.EOF { + err = errTruncate + } + return nil, err + } + e.Key = buf[:h.klen] + e.Value = buf[h.klen:] + var crcBuf [crc32.Size]byte + if _, err := io.ReadFull(reader, crcBuf[:]); err != nil { + if err == io.EOF { + err = errTruncate + } + return nil, err + } + crc := y.BytesToU32(crcBuf[:]) + if crc != tee.Sum32() { + return nil, errTruncate + } + e.meta = h.meta + e.UserMeta = h.userMeta + e.ExpiresAt = h.expiresAt + return e, nil +} + +// header is used in value log as a header before Entry. +type header struct { + klen uint32 + vlen uint32 + expiresAt uint64 + meta byte + userMeta byte +} + +const ( + // Maximum possible size of the header. The maximum size of header struct will be 18 but the + // maximum size of varint encoded header will be 21. + maxHeaderSize = 21 +) + +// Encode encodes the header into []byte. The provided []byte should be atleast 5 bytes. The +// function will panic if out []byte isn't large enough to hold all the values. +// The encoded header looks like +// +------+----------+------------+--------------+-----------+ +// | Meta | UserMeta | Key Length | Value Length | ExpiresAt | +// +------+----------+------------+--------------+-----------+ +func (h header) Encode(out []byte) int { + out[0], out[1] = h.meta, h.userMeta + index := 2 + index += binary.PutUvarint(out[index:], uint64(h.klen)) + index += binary.PutUvarint(out[index:], uint64(h.vlen)) + index += binary.PutUvarint(out[index:], h.expiresAt) + return index +} + +// Decode decodes the given header from the provided byte slice. +// Returns the number of bytes read. +func (h *header) Decode(buf []byte) int { + h.meta, h.userMeta = buf[0], buf[1] + index := 2 + klen, count := binary.Uvarint(buf[index:]) + h.klen = uint32(klen) + index += count + vlen, count := binary.Uvarint(buf[index:]) + h.vlen = uint32(vlen) + index += count + h.expiresAt, count = binary.Uvarint(buf[index:]) + return index + count +} + +// DecodeFrom reads the header from the hashReader. +// Returns the number of bytes read. +func (h *header) DecodeFrom(reader *hashReader) (int, error) { + var err error + h.meta, err = reader.ReadByte() + if err != nil { + return 0, err + } + h.userMeta, err = reader.ReadByte() + if err != nil { + return 0, err + } + klen, err := binary.ReadUvarint(reader) + if err != nil { + return 0, err + } + h.klen = uint32(klen) + vlen, err := binary.ReadUvarint(reader) + if err != nil { + return 0, err + } + h.vlen = uint32(vlen) + h.expiresAt, err = binary.ReadUvarint(reader) + if err != nil { + return 0, err + } + return reader.bytesRead, nil +} diff --git a/cmd/lotus-shed/datastore.go b/cmd/lotus-shed/datastore.go index ff740a772..698e63324 100644 --- a/cmd/lotus-shed/datastore.go +++ b/cmd/lotus-shed/datastore.go @@ -32,6 +32,7 @@ var datastoreCmd = &cli.Command{ datastoreListCmd, datastoreGetCmd, datastoreRewriteCmd, + datastoreVlog2CarCmd, }, } diff --git a/cmd/lotus-shed/export.go b/cmd/lotus-shed/export.go index 3851e4922..9583c0ac5 100644 --- a/cmd/lotus-shed/export.go +++ b/cmd/lotus-shed/export.go @@ -1,19 +1,38 @@ package main import ( + "bytes" "context" + "errors" "fmt" "io" "os" + "path/filepath" + "runtime" + "strings" + "sync" + "github.com/dgraph-io/badger/v2" + "github.com/dgraph-io/badger/v2/pb" + "github.com/dustin/go-humanize" + "github.com/filecoin-project/go-state-types/abi" + block "github.com/ipfs/go-block-format" + "github.com/ipfs/go-blockservice" + "github.com/ipfs/go-cid" + offline "github.com/ipfs/go-ipfs-exchange-offline" + "github.com/ipfs/go-merkledag" + "github.com/ipld/go-car" + "github.com/multiformats/go-base32" + mh "github.com/multiformats/go-multihash" "github.com/urfave/cli/v2" + "go.uber.org/zap" "golang.org/x/xerrors" - "github.com/filecoin-project/go-state-types/abi" - + "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" lcli "github.com/filecoin-project/lotus/cli" + "github.com/filecoin-project/lotus/cmd/lotus-shed/shedgen" "github.com/filecoin-project/lotus/node/repo" ) @@ -39,6 +58,9 @@ var exportChainCmd = &cli.Command{ Name: "skip-old-msgs", }, }, + Subcommands: []*cli.Command{ + exportRawCmd, + }, Action: func(cctx *cli.Context) error { if !cctx.Args().Present() { return lcli.ShowHelp(cctx, fmt.Errorf("must specify file name to write export to")) @@ -130,3 +152,351 @@ var exportChainCmd = &cli.Command{ return nil }, } + +var exportRawCmd = &cli.Command{ + Name: "raw", + Description: "Export raw blocks from repo (requires node to be offline)", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "repo", + Value: "~/.lotus", + }, + &cli.StringFlag{ + Name: "car-size", + Value: "50M", + }, + }, + Action: func(cctx *cli.Context) error { + if !cctx.Args().Present() { + return lcli.ShowHelp(cctx, fmt.Errorf("must specify file name to write export to")) + } + + ctx := context.TODO() + + r, err := repo.NewFS(cctx.String("repo")) + if err != nil { + return xerrors.Errorf("opening fs repo: %w", err) + } + + exists, err := r.Exists() + if err != nil { + return err + } + if !exists { + return xerrors.Errorf("lotus repo doesn't exist") + } + + lr, err := r.LockRO(repo.FullNode) + if err != nil { + return err + } + defer lr.Close() //nolint:errcheck + + out := cctx.Args().First() + err = os.Mkdir(out, 0755) + if err != nil { + return xerrors.Errorf("creating output dir: %w", err) + } + + maxSz, err := humanize.ParseBytes(cctx.String("car-size")) + if err != nil { + return xerrors.Errorf("parse --car-size: %w", err) + } + + cars := 0 + + carb := &rawCarb{ + max: maxSz, + blocks: map[cid.Cid]block.Block{}, + } + + { + consume := func(c cid.Cid, b block.Block) error { + err = carb.consume(c, b) + switch err { + case nil: + case fullCar: + root, err := carb.finalize() + if err != nil { + return xerrors.Errorf("carb finalize: %w", err) + } + + if err := carb.writeCar(ctx, filepath.Join(out, fmt.Sprintf("chain%d.car", cars)), root); err != nil { + return xerrors.Errorf("writeCar: %w", err) + } + + cars++ + + if cars > 10 { + return xerrors.Errorf("enough") + } + + carb = &rawCarb{ + max: maxSz, + blocks: map[cid.Cid]block.Block{}, + } + + log.Infow("gc") + go runtime.GC() + + default: + return xerrors.Errorf("carb consume: %w", err) + } + return nil + } + + { + path := filepath.Join(lr.Path(), "datastore", "chain") + opts, err := repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, path, false) + if err != nil { + return err + } + + opts.Logger = &badgerLog{ + SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(), + skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(), + } + + log.Infow("open db") + + db, err := badger.Open(opts.Options) + if err != nil { + return fmt.Errorf("failed to open badger blockstore: %w", err) + } + defer db.Close() + + log.Infow("new stream") + + var wlk sync.Mutex + + str := db.NewStream() + str.NumGo = 16 + str.LogPrefix = "bstream" + str.Send = func(list *pb.KVList) (err error) { + defer func() { + if err != nil { + log.Errorw("send error", "err", err) + } + }() + + for _, kv := range list.Kv { + if kv.Key == nil || kv.Value == nil { + continue + } + if !strings.HasPrefix(string(kv.Key), "/blocks/") { + log.Infow("no blocks prefix", "key", string(kv.Key)) + continue + } + + h, err := base32.RawStdEncoding.DecodeString(string(kv.Key[len("/blocks/"):])) + if err != nil { + return xerrors.Errorf("decode b32 ds key %x: %w", kv.Key, err) + } + + c := cid.NewCidV1(cid.Raw, h) + + b, err := block.NewBlockWithCid(kv.Value, c) + if err != nil { + return xerrors.Errorf("readblk: %w", err) + } + + wlk.Lock() + err = consume(c, b) + wlk.Unlock() + if err != nil { + return xerrors.Errorf("consume stream block: %w", err) + } + } + + return nil + } + + if err := str.Orchestrate(ctx); err != nil { + return xerrors.Errorf("orchestrate stream: %w", err) + } + } + } + + log.Infow("write last") + + root, err := carb.finalize() + if err != nil { + return xerrors.Errorf("carb finalize: %w", err) + } + + if err := carb.writeCar(ctx, filepath.Join(out, fmt.Sprintf("chain%d.car", cars)), root); err != nil { + return xerrors.Errorf("writeCar: %w", err) + } + + return nil + }, +} + +var fullCar = errors.New("full") + +const maxlinks = 16 + +type rawCarb struct { + blockstore.Blockstore + + max, cur uint64 + + nodes []*shedgen.CarbNode + + blocks map[cid.Cid]block.Block +} + +func (rc *rawCarb) Has(ctx context.Context, c cid.Cid) (bool, error) { + _, has := rc.blocks[c] + return has, nil +} + +func (rc *rawCarb) Get(ctx context.Context, c cid.Cid) (block.Block, error) { + b, has := rc.blocks[c] + if !has { + return nil, blockstore.ErrNotFound + } + return b, nil +} + +func (rc *rawCarb) GetSize(ctx context.Context, c cid.Cid) (int, error) { + b, has := rc.blocks[c] + if !has { + return 0, blockstore.ErrNotFound + } + return len(b.RawData()), nil +} + +func (rc *rawCarb) checkNodes(maxl int) error { + if len(rc.nodes) == 0 { + log.Infow("add level", "l", 0) + rc.nodes = append(rc.nodes, new(shedgen.CarbNode)) + } + for i := 0; i < len(rc.nodes); i++ { + if len(rc.nodes[i].Sub) <= maxl { + break + } + if len(rc.nodes) <= i+1 { + log.Infow("add level", "l", i+1) + rc.nodes = append(rc.nodes, new(shedgen.CarbNode)) + } + + var bb bytes.Buffer + if err := rc.nodes[i].MarshalCBOR(&bb); err != nil { + return err + } + c, err := cid.Prefix{ + Version: 1, + Codec: cid.DagCBOR, + MhType: mh.SHA2_256, + MhLength: -1, + }.Sum(bb.Bytes()) + if err != nil { + return xerrors.Errorf("gen cid: %w", err) + } + + b, err := block.NewBlockWithCid(bb.Bytes(), c) + if err != nil { + return xerrors.Errorf("new block: %w", err) + } + + if i > 1 { + log.Infow("compact", "from", i, "to", i+1, "sub", c.String()) + } + + rc.nodes[i+1].Sub = append(rc.nodes[i+1].Sub, c) + rc.blocks[c] = b + rc.nodes[i] = new(shedgen.CarbNode) + rc.cur += uint64(bb.Len()) + } + + return nil +} + +func (rc *rawCarb) consume(c cid.Cid, b block.Block) error { + if err := rc.checkNodes(maxlinks); err != nil { + return err + } + if rc.cur+uint64(len(b.RawData())) > rc.max { + return fullCar + } + + rc.cur += uint64(len(b.RawData())) + + b, err := block.NewBlockWithCid(b.RawData(), c) + if err != nil { + return xerrors.Errorf("create raw block: %w", err) + } + + rc.blocks[c] = b + rc.nodes[0].Sub = append(rc.nodes[0].Sub, c) + + return nil +} + +func (rc *rawCarb) finalize() (cid.Cid, error) { + if len(rc.nodes) == 0 { + rc.nodes = append(rc.nodes, new(shedgen.CarbNode)) + } + + for i := 0; i < len(rc.nodes); i++ { + var bb bytes.Buffer + if err := rc.nodes[i].MarshalCBOR(&bb); err != nil { + return cid.Undef, err + } + c, err := cid.Prefix{ + Version: 1, + Codec: cid.DagCBOR, + MhType: mh.SHA2_256, + MhLength: -1, + }.Sum(bb.Bytes()) + if err != nil { + return cid.Undef, xerrors.Errorf("gen cid: %w", err) + } + + b, err := block.NewBlockWithCid(bb.Bytes(), c) + if err != nil { + return cid.Undef, xerrors.Errorf("new block: %w", err) + } + + log.Infow("fin", "level", i, "cid", c.String()) + + rc.blocks[c] = b + rc.nodes[i] = new(shedgen.CarbNode) + rc.cur += uint64(bb.Len()) + + if len(rc.nodes[i].Sub) <= 1 && i == len(rc.nodes)-1 { + return c, err + } + if len(rc.nodes) <= i+1 { + rc.nodes = append(rc.nodes, new(shedgen.CarbNode)) + } + rc.nodes[i+1].Sub = append(rc.nodes[i+1].Sub, c) + } + return cid.Undef, xerrors.Errorf("failed to finalize") +} + +func (rc *rawCarb) writeCar(ctx context.Context, path string, root cid.Cid) error { + f, err := os.Create(path) + if err != nil { + return xerrors.Errorf("create out car: %w", err) + } + + bs := rc + ds := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) + + log.Infow("write car", "path", path, "root", root.String(), "blocks", len(rc.blocks)) + + return car.WriteCar(ctx, ds, []cid.Cid{root}, f) +} + +var _ blockstore.Blockstore = &rawCarb{} + +type badgerLog struct { + *zap.SugaredLogger + skip2 *zap.SugaredLogger +} + +func (b *badgerLog) Warningf(format string, args ...interface{}) { + b.skip2.Warnf(format, args...) +} diff --git a/cmd/lotus-shed/shedgen/cbor_gen.go b/cmd/lotus-shed/shedgen/cbor_gen.go new file mode 100644 index 000000000..37ed95539 --- /dev/null +++ b/cmd/lotus-shed/shedgen/cbor_gen.go @@ -0,0 +1,128 @@ +// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. + +package shedgen + +import ( + "fmt" + "io" + "math" + "sort" + + cid "github.com/ipfs/go-cid" + cbg "github.com/whyrusleeping/cbor-gen" + xerrors "golang.org/x/xerrors" +) + +var _ = xerrors.Errorf +var _ = cid.Undef +var _ = math.E +var _ = sort.Sort + +func (t *CarbNode) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{161}); err != nil { + return err + } + + scratch := make([]byte, 9) + + // t.Sub ([]cid.Cid) (slice) + if len("Sub") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"Sub\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("Sub"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("Sub")); err != nil { + return err + } + + if len(t.Sub) > cbg.MaxLength { + return xerrors.Errorf("Slice value in field t.Sub was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajArray, uint64(len(t.Sub))); err != nil { + return err + } + for _, v := range t.Sub { + if err := cbg.WriteCidBuf(scratch, w, v); err != nil { + return xerrors.Errorf("failed writing cid field t.Sub: %w", err) + } + } + return nil +} + +func (t *CarbNode) UnmarshalCBOR(r io.Reader) error { + *t = CarbNode{} + + br := cbg.GetPeeker(r) + scratch := make([]byte, 8) + + maj, extra, err := cbg.CborReadHeaderBuf(br, scratch) + if err != nil { + return err + } + if maj != cbg.MajMap { + return fmt.Errorf("cbor input should be of type map") + } + + if extra > cbg.MaxLength { + return fmt.Errorf("CarbNode: map struct too large (%d)", extra) + } + + var name string + n := extra + + for i := uint64(0); i < n; i++ { + + { + sval, err := cbg.ReadStringBuf(br, scratch) + if err != nil { + return err + } + + name = string(sval) + } + + switch name { + // t.Sub ([]cid.Cid) (slice) + case "Sub": + + maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) + if err != nil { + return err + } + + if extra > cbg.MaxLength { + return fmt.Errorf("t.Sub: array too large (%d)", extra) + } + + if maj != cbg.MajArray { + return fmt.Errorf("expected cbor array") + } + + if extra > 0 { + t.Sub = make([]cid.Cid, extra) + } + + for i := 0; i < int(extra); i++ { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("reading cid field t.Sub failed: %w", err) + } + t.Sub[i] = c + } + + default: + // Field doesn't exist on this type, so ignore it + cbg.ScanForLinks(r, func(cid.Cid) {}) + } + } + + return nil +} diff --git a/cmd/lotus-shed/shedgen/rawexport.go b/cmd/lotus-shed/shedgen/rawexport.go new file mode 100644 index 000000000..ca430c5e6 --- /dev/null +++ b/cmd/lotus-shed/shedgen/rawexport.go @@ -0,0 +1,7 @@ +package shedgen + +import "github.com/ipfs/go-cid" + +type CarbNode struct { + Sub []cid.Cid +} diff --git a/gen/main.go b/gen/main.go index 0018b241d..f7b96c537 100644 --- a/gen/main.go +++ b/gen/main.go @@ -10,6 +10,7 @@ import ( "github.com/filecoin-project/lotus/chain/exchange" "github.com/filecoin-project/lotus/chain/market" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/cmd/lotus-shed/shedgen" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" "github.com/filecoin-project/lotus/node/hello" @@ -106,4 +107,11 @@ func main() { fmt.Println(err) os.Exit(1) } + err = gen.WriteMapEncodersToFile("./cmd/lotus-shed/shedgen/cbor_gen.go", "shedgen", + shedgen.CarbNode{}, + ) + if err != nil { + fmt.Println(err) + os.Exit(1) + } } From ffabb015ef8b2a79504d2138ecb7f337aae7efa2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 9 Mar 2022 10:26:02 +0100 Subject: [PATCH 2/2] shed: fix lint --- cmd/lotus-shed/datastore-vlog.go | 14 +++----------- cmd/lotus-shed/export.go | 8 ++++---- 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/cmd/lotus-shed/datastore-vlog.go b/cmd/lotus-shed/datastore-vlog.go index 4a848b24f..7f0c708a5 100644 --- a/cmd/lotus-shed/datastore-vlog.go +++ b/cmd/lotus-shed/datastore-vlog.go @@ -117,7 +117,7 @@ var datastoreVlog2CarCmd = &cli.Command{ err = carb.consume(c, b) switch err { case nil: - case fullCar: + case errFullCar: root, err := carb.finalize() if err != nil { return xerrors.Errorf("carb finalize: %w", err) @@ -217,12 +217,10 @@ type Entry struct { UserMeta byte ExpiresAt uint64 // time.Unix meta byte - version uint64 // Fields maintained internally. - offset uint32 - skipVlog bool - hlen int // Length of the header. + offset uint32 + hlen int // Length of the header. } // Entry reads an entry from the provided reader. It also validates the checksum for every entry @@ -284,12 +282,6 @@ type header struct { userMeta byte } -const ( - // Maximum possible size of the header. The maximum size of header struct will be 18 but the - // maximum size of varint encoded header will be 21. - maxHeaderSize = 21 -) - // Encode encodes the header into []byte. The provided []byte should be atleast 5 bytes. The // function will panic if out []byte isn't large enough to hold all the values. // The encoded header looks like diff --git a/cmd/lotus-shed/export.go b/cmd/lotus-shed/export.go index 9583c0ac5..fec4d575a 100644 --- a/cmd/lotus-shed/export.go +++ b/cmd/lotus-shed/export.go @@ -215,7 +215,7 @@ var exportRawCmd = &cli.Command{ err = carb.consume(c, b) switch err { case nil: - case fullCar: + case errFullCar: root, err := carb.finalize() if err != nil { return xerrors.Errorf("carb finalize: %w", err) @@ -263,7 +263,7 @@ var exportRawCmd = &cli.Command{ if err != nil { return fmt.Errorf("failed to open badger blockstore: %w", err) } - defer db.Close() + defer db.Close() // nolint:errcheck log.Infow("new stream") @@ -332,7 +332,7 @@ var exportRawCmd = &cli.Command{ }, } -var fullCar = errors.New("full") +var errFullCar = errors.New("full") const maxlinks = 16 @@ -418,7 +418,7 @@ func (rc *rawCarb) consume(c cid.Cid, b block.Block) error { return err } if rc.cur+uint64(len(b.RawData())) > rc.max { - return fullCar + return errFullCar } rc.cur += uint64(len(b.RawData()))