shed: blockstore/vlog to car export cmds
This commit is contained in:
parent
5416ce5443
commit
84dbb229b6
@ -18,6 +18,10 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func (cs *ChainStore) UnionStore() bstore.Blockstore {
|
||||||
|
return bstore.Union(cs.stateBlockstore, cs.chainBlockstore)
|
||||||
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, w io.Writer) error {
|
func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, w io.Writer) error {
|
||||||
h := &car.CarHeader{
|
h := &car.CarHeader{
|
||||||
Roots: ts.Cids(),
|
Roots: ts.Cids(),
|
||||||
@ -28,7 +32,7 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
|
|||||||
return xerrors.Errorf("failed to write car header: %s", err)
|
return xerrors.Errorf("failed to write car header: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
unionBs := bstore.Union(cs.stateBlockstore, cs.chainBlockstore)
|
unionBs := cs.UnionStore()
|
||||||
return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, true, func(c cid.Cid) error {
|
return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, true, func(c cid.Cid) error {
|
||||||
blk, err := unionBs.Get(ctx, c)
|
blk, err := unionBs.Get(ctx, c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
350
cmd/lotus-shed/datastore-vlog.go
Normal file
350
cmd/lotus-shed/datastore-vlog.go
Normal file
@ -0,0 +1,350 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"encoding/binary"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"hash"
|
||||||
|
"hash/crc32"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/dgraph-io/badger/v2/y"
|
||||||
|
block "github.com/ipfs/go-block-format"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
"github.com/multiformats/go-base32"
|
||||||
|
"github.com/urfave/cli/v2"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
)
|
||||||
|
|
||||||
|
var datastoreVlog2CarCmd = &cli.Command{
|
||||||
|
Name: "vlog2car",
|
||||||
|
Usage: "convert badger blockstore .vlog to .car",
|
||||||
|
Flags: []cli.Flag{
|
||||||
|
&cli.PathFlag{
|
||||||
|
Name: "vlog",
|
||||||
|
Usage: "vlog file",
|
||||||
|
Required: true,
|
||||||
|
},
|
||||||
|
&cli.PathFlag{
|
||||||
|
Name: "car",
|
||||||
|
Usage: "out car file name (no .car)",
|
||||||
|
Required: true,
|
||||||
|
},
|
||||||
|
&cli.StringFlag{
|
||||||
|
Name: "key-prefix",
|
||||||
|
Usage: "datastore prefix",
|
||||||
|
Value: "/blocks/",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Action: func(cctx *cli.Context) error {
|
||||||
|
ctx := cctx.Context
|
||||||
|
|
||||||
|
maxSz := uint64(1 << 20)
|
||||||
|
|
||||||
|
carb := &rawCarb{
|
||||||
|
max: maxSz,
|
||||||
|
blocks: map[cid.Cid]block.Block{},
|
||||||
|
}
|
||||||
|
cars := 0
|
||||||
|
|
||||||
|
pref := cctx.String("key-prefix")
|
||||||
|
plen := len(pref)
|
||||||
|
|
||||||
|
{
|
||||||
|
// NOTE: Some bits of code in this code block come from https://github.com/dgraph-io/badger, which is licensed
|
||||||
|
// under Apache 2.0; See https://github.com/dgraph-io/badger/blob/master/LICENSE
|
||||||
|
|
||||||
|
vf, err := os.Open(cctx.Path("vlog"))
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("open vlog file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := vf.Seek(20, io.SeekStart); err != nil {
|
||||||
|
return xerrors.Errorf("seek past vlog start: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
reader := bufio.NewReader(vf)
|
||||||
|
read := &safeRead{
|
||||||
|
k: make([]byte, 10),
|
||||||
|
v: make([]byte, 10),
|
||||||
|
recordOffset: 20,
|
||||||
|
}
|
||||||
|
|
||||||
|
loop:
|
||||||
|
for {
|
||||||
|
e, err := read.Entry(reader)
|
||||||
|
switch {
|
||||||
|
case err == io.EOF:
|
||||||
|
break loop
|
||||||
|
case err == io.ErrUnexpectedEOF || err == errTruncate:
|
||||||
|
break loop
|
||||||
|
case err != nil:
|
||||||
|
return xerrors.Errorf("entry read error: %w", err)
|
||||||
|
case e == nil:
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if e.meta&0x40 > 0 {
|
||||||
|
e.Key = e.Key[:len(e.Key)-8]
|
||||||
|
} else if e.meta > 0 {
|
||||||
|
if e.meta&0x3f > 0 {
|
||||||
|
log.Infof("unk meta m:%x; k:%x, v:%60x", e.meta, e.Key, e.Value)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
if plen > 0 && !strings.HasPrefix(string(e.Key), pref) {
|
||||||
|
log.Infow("no blocks prefix", "key", string(e.Key))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
h, err := base32.RawStdEncoding.DecodeString(string(e.Key[plen:]))
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("decode b32 ds key %x: %w", e.Key, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c := cid.NewCidV1(cid.Raw, h)
|
||||||
|
|
||||||
|
b, err := block.NewBlockWithCid(e.Value, c)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("readblk: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = carb.consume(c, b)
|
||||||
|
switch err {
|
||||||
|
case nil:
|
||||||
|
case fullCar:
|
||||||
|
root, err := carb.finalize()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("carb finalize: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := carb.writeCar(ctx, fmt.Sprintf("%s%d.car", cctx.Path("car"), cars), root); err != nil {
|
||||||
|
return xerrors.Errorf("writeCar: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cars++
|
||||||
|
|
||||||
|
carb = &rawCarb{
|
||||||
|
max: maxSz,
|
||||||
|
blocks: map[cid.Cid]block.Block{},
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
return xerrors.Errorf("carb consume: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := vf.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
root, err := carb.finalize()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("carb finalize: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := carb.writeCar(ctx, fmt.Sprintf("%s%d.car", cctx.Path("car"), cars), root); err != nil {
|
||||||
|
return xerrors.Errorf("writeCar: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOTE: Code below comes (with slight modifications) from https://github.com/dgraph-io/badger/blob/master/value.go
|
||||||
|
// Apache 2.0; See https://github.com/dgraph-io/badger/blob/master/LICENSE
|
||||||
|
|
||||||
|
var errTruncate = errors.New("do truncate")
|
||||||
|
|
||||||
|
// hashReader implements io.Reader, io.ByteReader interfaces. It also keeps track of the number
|
||||||
|
// bytes read. The hashReader writes to h (hash) what it reads from r.
|
||||||
|
type hashReader struct {
|
||||||
|
r io.Reader
|
||||||
|
h hash.Hash32
|
||||||
|
bytesRead int // Number of bytes read.
|
||||||
|
}
|
||||||
|
|
||||||
|
func newHashReader(r io.Reader) *hashReader {
|
||||||
|
hash := crc32.New(y.CastagnoliCrcTable)
|
||||||
|
return &hashReader{
|
||||||
|
r: r,
|
||||||
|
h: hash,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read reads len(p) bytes from the reader. Returns the number of bytes read, error on failure.
|
||||||
|
func (t *hashReader) Read(p []byte) (int, error) {
|
||||||
|
n, err := t.r.Read(p)
|
||||||
|
if err != nil {
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
t.bytesRead += n
|
||||||
|
return t.h.Write(p[:n])
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadByte reads exactly one byte from the reader. Returns error on failure.
|
||||||
|
func (t *hashReader) ReadByte() (byte, error) {
|
||||||
|
b := make([]byte, 1)
|
||||||
|
_, err := t.Read(b)
|
||||||
|
return b[0], err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sum32 returns the sum32 of the underlying hash.
|
||||||
|
func (t *hashReader) Sum32() uint32 {
|
||||||
|
return t.h.Sum32()
|
||||||
|
}
|
||||||
|
|
||||||
|
type safeRead struct {
|
||||||
|
k []byte
|
||||||
|
v []byte
|
||||||
|
|
||||||
|
recordOffset uint32
|
||||||
|
}
|
||||||
|
|
||||||
|
// Entry provides Key, Value, UserMeta and ExpiresAt. This struct can be used by
|
||||||
|
// the user to set data.
|
||||||
|
type Entry struct {
|
||||||
|
Key []byte
|
||||||
|
Value []byte
|
||||||
|
UserMeta byte
|
||||||
|
ExpiresAt uint64 // time.Unix
|
||||||
|
meta byte
|
||||||
|
version uint64
|
||||||
|
|
||||||
|
// Fields maintained internally.
|
||||||
|
offset uint32
|
||||||
|
skipVlog bool
|
||||||
|
hlen int // Length of the header.
|
||||||
|
}
|
||||||
|
|
||||||
|
// Entry reads an entry from the provided reader. It also validates the checksum for every entry
|
||||||
|
// read. Returns error on failure.
|
||||||
|
func (r *safeRead) Entry(reader io.Reader) (*Entry, error) {
|
||||||
|
tee := newHashReader(reader)
|
||||||
|
var h header
|
||||||
|
hlen, err := h.DecodeFrom(tee)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if h.klen > uint32(1<<16) { // Key length must be below uint16.
|
||||||
|
return nil, errTruncate
|
||||||
|
}
|
||||||
|
kl := int(h.klen)
|
||||||
|
if cap(r.k) < kl {
|
||||||
|
r.k = make([]byte, 2*kl)
|
||||||
|
}
|
||||||
|
vl := int(h.vlen)
|
||||||
|
if cap(r.v) < vl {
|
||||||
|
r.v = make([]byte, 2*vl)
|
||||||
|
}
|
||||||
|
|
||||||
|
e := &Entry{}
|
||||||
|
e.offset = r.recordOffset
|
||||||
|
e.hlen = hlen
|
||||||
|
buf := make([]byte, h.klen+h.vlen)
|
||||||
|
if _, err := io.ReadFull(tee, buf[:]); err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
err = errTruncate
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
e.Key = buf[:h.klen]
|
||||||
|
e.Value = buf[h.klen:]
|
||||||
|
var crcBuf [crc32.Size]byte
|
||||||
|
if _, err := io.ReadFull(reader, crcBuf[:]); err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
err = errTruncate
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
crc := y.BytesToU32(crcBuf[:])
|
||||||
|
if crc != tee.Sum32() {
|
||||||
|
return nil, errTruncate
|
||||||
|
}
|
||||||
|
e.meta = h.meta
|
||||||
|
e.UserMeta = h.userMeta
|
||||||
|
e.ExpiresAt = h.expiresAt
|
||||||
|
return e, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// header is used in value log as a header before Entry.
|
||||||
|
type header struct {
|
||||||
|
klen uint32
|
||||||
|
vlen uint32
|
||||||
|
expiresAt uint64
|
||||||
|
meta byte
|
||||||
|
userMeta byte
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
// Maximum possible size of the header. The maximum size of header struct will be 18 but the
|
||||||
|
// maximum size of varint encoded header will be 21.
|
||||||
|
maxHeaderSize = 21
|
||||||
|
)
|
||||||
|
|
||||||
|
// Encode encodes the header into []byte. The provided []byte should be atleast 5 bytes. The
|
||||||
|
// function will panic if out []byte isn't large enough to hold all the values.
|
||||||
|
// The encoded header looks like
|
||||||
|
// +------+----------+------------+--------------+-----------+
|
||||||
|
// | Meta | UserMeta | Key Length | Value Length | ExpiresAt |
|
||||||
|
// +------+----------+------------+--------------+-----------+
|
||||||
|
func (h header) Encode(out []byte) int {
|
||||||
|
out[0], out[1] = h.meta, h.userMeta
|
||||||
|
index := 2
|
||||||
|
index += binary.PutUvarint(out[index:], uint64(h.klen))
|
||||||
|
index += binary.PutUvarint(out[index:], uint64(h.vlen))
|
||||||
|
index += binary.PutUvarint(out[index:], h.expiresAt)
|
||||||
|
return index
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decode decodes the given header from the provided byte slice.
|
||||||
|
// Returns the number of bytes read.
|
||||||
|
func (h *header) Decode(buf []byte) int {
|
||||||
|
h.meta, h.userMeta = buf[0], buf[1]
|
||||||
|
index := 2
|
||||||
|
klen, count := binary.Uvarint(buf[index:])
|
||||||
|
h.klen = uint32(klen)
|
||||||
|
index += count
|
||||||
|
vlen, count := binary.Uvarint(buf[index:])
|
||||||
|
h.vlen = uint32(vlen)
|
||||||
|
index += count
|
||||||
|
h.expiresAt, count = binary.Uvarint(buf[index:])
|
||||||
|
return index + count
|
||||||
|
}
|
||||||
|
|
||||||
|
// DecodeFrom reads the header from the hashReader.
|
||||||
|
// Returns the number of bytes read.
|
||||||
|
func (h *header) DecodeFrom(reader *hashReader) (int, error) {
|
||||||
|
var err error
|
||||||
|
h.meta, err = reader.ReadByte()
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
h.userMeta, err = reader.ReadByte()
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
klen, err := binary.ReadUvarint(reader)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
h.klen = uint32(klen)
|
||||||
|
vlen, err := binary.ReadUvarint(reader)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
h.vlen = uint32(vlen)
|
||||||
|
h.expiresAt, err = binary.ReadUvarint(reader)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return reader.bytesRead, nil
|
||||||
|
}
|
@ -32,6 +32,7 @@ var datastoreCmd = &cli.Command{
|
|||||||
datastoreListCmd,
|
datastoreListCmd,
|
||||||
datastoreGetCmd,
|
datastoreGetCmd,
|
||||||
datastoreRewriteCmd,
|
datastoreRewriteCmd,
|
||||||
|
datastoreVlog2CarCmd,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,19 +1,38 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"runtime"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/dgraph-io/badger/v2"
|
||||||
|
"github.com/dgraph-io/badger/v2/pb"
|
||||||
|
"github.com/dustin/go-humanize"
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
block "github.com/ipfs/go-block-format"
|
||||||
|
"github.com/ipfs/go-blockservice"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
offline "github.com/ipfs/go-ipfs-exchange-offline"
|
||||||
|
"github.com/ipfs/go-merkledag"
|
||||||
|
"github.com/ipld/go-car"
|
||||||
|
"github.com/multiformats/go-base32"
|
||||||
|
mh "github.com/multiformats/go-multihash"
|
||||||
"github.com/urfave/cli/v2"
|
"github.com/urfave/cli/v2"
|
||||||
|
"go.uber.org/zap"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/lotus/blockstore"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
lcli "github.com/filecoin-project/lotus/cli"
|
lcli "github.com/filecoin-project/lotus/cli"
|
||||||
|
"github.com/filecoin-project/lotus/cmd/lotus-shed/shedgen"
|
||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -39,6 +58,9 @@ var exportChainCmd = &cli.Command{
|
|||||||
Name: "skip-old-msgs",
|
Name: "skip-old-msgs",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
Subcommands: []*cli.Command{
|
||||||
|
exportRawCmd,
|
||||||
|
},
|
||||||
Action: func(cctx *cli.Context) error {
|
Action: func(cctx *cli.Context) error {
|
||||||
if !cctx.Args().Present() {
|
if !cctx.Args().Present() {
|
||||||
return lcli.ShowHelp(cctx, fmt.Errorf("must specify file name to write export to"))
|
return lcli.ShowHelp(cctx, fmt.Errorf("must specify file name to write export to"))
|
||||||
@ -130,3 +152,351 @@ var exportChainCmd = &cli.Command{
|
|||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var exportRawCmd = &cli.Command{
|
||||||
|
Name: "raw",
|
||||||
|
Description: "Export raw blocks from repo (requires node to be offline)",
|
||||||
|
Flags: []cli.Flag{
|
||||||
|
&cli.StringFlag{
|
||||||
|
Name: "repo",
|
||||||
|
Value: "~/.lotus",
|
||||||
|
},
|
||||||
|
&cli.StringFlag{
|
||||||
|
Name: "car-size",
|
||||||
|
Value: "50M",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Action: func(cctx *cli.Context) error {
|
||||||
|
if !cctx.Args().Present() {
|
||||||
|
return lcli.ShowHelp(cctx, fmt.Errorf("must specify file name to write export to"))
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.TODO()
|
||||||
|
|
||||||
|
r, err := repo.NewFS(cctx.String("repo"))
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("opening fs repo: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
exists, err := r.Exists()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !exists {
|
||||||
|
return xerrors.Errorf("lotus repo doesn't exist")
|
||||||
|
}
|
||||||
|
|
||||||
|
lr, err := r.LockRO(repo.FullNode)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer lr.Close() //nolint:errcheck
|
||||||
|
|
||||||
|
out := cctx.Args().First()
|
||||||
|
err = os.Mkdir(out, 0755)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("creating output dir: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
maxSz, err := humanize.ParseBytes(cctx.String("car-size"))
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("parse --car-size: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cars := 0
|
||||||
|
|
||||||
|
carb := &rawCarb{
|
||||||
|
max: maxSz,
|
||||||
|
blocks: map[cid.Cid]block.Block{},
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
consume := func(c cid.Cid, b block.Block) error {
|
||||||
|
err = carb.consume(c, b)
|
||||||
|
switch err {
|
||||||
|
case nil:
|
||||||
|
case fullCar:
|
||||||
|
root, err := carb.finalize()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("carb finalize: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := carb.writeCar(ctx, filepath.Join(out, fmt.Sprintf("chain%d.car", cars)), root); err != nil {
|
||||||
|
return xerrors.Errorf("writeCar: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cars++
|
||||||
|
|
||||||
|
if cars > 10 {
|
||||||
|
return xerrors.Errorf("enough")
|
||||||
|
}
|
||||||
|
|
||||||
|
carb = &rawCarb{
|
||||||
|
max: maxSz,
|
||||||
|
blocks: map[cid.Cid]block.Block{},
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infow("gc")
|
||||||
|
go runtime.GC()
|
||||||
|
|
||||||
|
default:
|
||||||
|
return xerrors.Errorf("carb consume: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
path := filepath.Join(lr.Path(), "datastore", "chain")
|
||||||
|
opts, err := repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, path, false)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
opts.Logger = &badgerLog{
|
||||||
|
SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(),
|
||||||
|
skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(),
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infow("open db")
|
||||||
|
|
||||||
|
db, err := badger.Open(opts.Options)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to open badger blockstore: %w", err)
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
log.Infow("new stream")
|
||||||
|
|
||||||
|
var wlk sync.Mutex
|
||||||
|
|
||||||
|
str := db.NewStream()
|
||||||
|
str.NumGo = 16
|
||||||
|
str.LogPrefix = "bstream"
|
||||||
|
str.Send = func(list *pb.KVList) (err error) {
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
log.Errorw("send error", "err", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for _, kv := range list.Kv {
|
||||||
|
if kv.Key == nil || kv.Value == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !strings.HasPrefix(string(kv.Key), "/blocks/") {
|
||||||
|
log.Infow("no blocks prefix", "key", string(kv.Key))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
h, err := base32.RawStdEncoding.DecodeString(string(kv.Key[len("/blocks/"):]))
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("decode b32 ds key %x: %w", kv.Key, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c := cid.NewCidV1(cid.Raw, h)
|
||||||
|
|
||||||
|
b, err := block.NewBlockWithCid(kv.Value, c)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("readblk: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
wlk.Lock()
|
||||||
|
err = consume(c, b)
|
||||||
|
wlk.Unlock()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("consume stream block: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := str.Orchestrate(ctx); err != nil {
|
||||||
|
return xerrors.Errorf("orchestrate stream: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infow("write last")
|
||||||
|
|
||||||
|
root, err := carb.finalize()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("carb finalize: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := carb.writeCar(ctx, filepath.Join(out, fmt.Sprintf("chain%d.car", cars)), root); err != nil {
|
||||||
|
return xerrors.Errorf("writeCar: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var fullCar = errors.New("full")
|
||||||
|
|
||||||
|
const maxlinks = 16
|
||||||
|
|
||||||
|
type rawCarb struct {
|
||||||
|
blockstore.Blockstore
|
||||||
|
|
||||||
|
max, cur uint64
|
||||||
|
|
||||||
|
nodes []*shedgen.CarbNode
|
||||||
|
|
||||||
|
blocks map[cid.Cid]block.Block
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rc *rawCarb) Has(ctx context.Context, c cid.Cid) (bool, error) {
|
||||||
|
_, has := rc.blocks[c]
|
||||||
|
return has, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rc *rawCarb) Get(ctx context.Context, c cid.Cid) (block.Block, error) {
|
||||||
|
b, has := rc.blocks[c]
|
||||||
|
if !has {
|
||||||
|
return nil, blockstore.ErrNotFound
|
||||||
|
}
|
||||||
|
return b, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rc *rawCarb) GetSize(ctx context.Context, c cid.Cid) (int, error) {
|
||||||
|
b, has := rc.blocks[c]
|
||||||
|
if !has {
|
||||||
|
return 0, blockstore.ErrNotFound
|
||||||
|
}
|
||||||
|
return len(b.RawData()), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rc *rawCarb) checkNodes(maxl int) error {
|
||||||
|
if len(rc.nodes) == 0 {
|
||||||
|
log.Infow("add level", "l", 0)
|
||||||
|
rc.nodes = append(rc.nodes, new(shedgen.CarbNode))
|
||||||
|
}
|
||||||
|
for i := 0; i < len(rc.nodes); i++ {
|
||||||
|
if len(rc.nodes[i].Sub) <= maxl {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if len(rc.nodes) <= i+1 {
|
||||||
|
log.Infow("add level", "l", i+1)
|
||||||
|
rc.nodes = append(rc.nodes, new(shedgen.CarbNode))
|
||||||
|
}
|
||||||
|
|
||||||
|
var bb bytes.Buffer
|
||||||
|
if err := rc.nodes[i].MarshalCBOR(&bb); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
c, err := cid.Prefix{
|
||||||
|
Version: 1,
|
||||||
|
Codec: cid.DagCBOR,
|
||||||
|
MhType: mh.SHA2_256,
|
||||||
|
MhLength: -1,
|
||||||
|
}.Sum(bb.Bytes())
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("gen cid: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := block.NewBlockWithCid(bb.Bytes(), c)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("new block: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if i > 1 {
|
||||||
|
log.Infow("compact", "from", i, "to", i+1, "sub", c.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
rc.nodes[i+1].Sub = append(rc.nodes[i+1].Sub, c)
|
||||||
|
rc.blocks[c] = b
|
||||||
|
rc.nodes[i] = new(shedgen.CarbNode)
|
||||||
|
rc.cur += uint64(bb.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rc *rawCarb) consume(c cid.Cid, b block.Block) error {
|
||||||
|
if err := rc.checkNodes(maxlinks); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if rc.cur+uint64(len(b.RawData())) > rc.max {
|
||||||
|
return fullCar
|
||||||
|
}
|
||||||
|
|
||||||
|
rc.cur += uint64(len(b.RawData()))
|
||||||
|
|
||||||
|
b, err := block.NewBlockWithCid(b.RawData(), c)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("create raw block: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rc.blocks[c] = b
|
||||||
|
rc.nodes[0].Sub = append(rc.nodes[0].Sub, c)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rc *rawCarb) finalize() (cid.Cid, error) {
|
||||||
|
if len(rc.nodes) == 0 {
|
||||||
|
rc.nodes = append(rc.nodes, new(shedgen.CarbNode))
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < len(rc.nodes); i++ {
|
||||||
|
var bb bytes.Buffer
|
||||||
|
if err := rc.nodes[i].MarshalCBOR(&bb); err != nil {
|
||||||
|
return cid.Undef, err
|
||||||
|
}
|
||||||
|
c, err := cid.Prefix{
|
||||||
|
Version: 1,
|
||||||
|
Codec: cid.DagCBOR,
|
||||||
|
MhType: mh.SHA2_256,
|
||||||
|
MhLength: -1,
|
||||||
|
}.Sum(bb.Bytes())
|
||||||
|
if err != nil {
|
||||||
|
return cid.Undef, xerrors.Errorf("gen cid: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := block.NewBlockWithCid(bb.Bytes(), c)
|
||||||
|
if err != nil {
|
||||||
|
return cid.Undef, xerrors.Errorf("new block: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infow("fin", "level", i, "cid", c.String())
|
||||||
|
|
||||||
|
rc.blocks[c] = b
|
||||||
|
rc.nodes[i] = new(shedgen.CarbNode)
|
||||||
|
rc.cur += uint64(bb.Len())
|
||||||
|
|
||||||
|
if len(rc.nodes[i].Sub) <= 1 && i == len(rc.nodes)-1 {
|
||||||
|
return c, err
|
||||||
|
}
|
||||||
|
if len(rc.nodes) <= i+1 {
|
||||||
|
rc.nodes = append(rc.nodes, new(shedgen.CarbNode))
|
||||||
|
}
|
||||||
|
rc.nodes[i+1].Sub = append(rc.nodes[i+1].Sub, c)
|
||||||
|
}
|
||||||
|
return cid.Undef, xerrors.Errorf("failed to finalize")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rc *rawCarb) writeCar(ctx context.Context, path string, root cid.Cid) error {
|
||||||
|
f, err := os.Create(path)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("create out car: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
bs := rc
|
||||||
|
ds := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
|
||||||
|
|
||||||
|
log.Infow("write car", "path", path, "root", root.String(), "blocks", len(rc.blocks))
|
||||||
|
|
||||||
|
return car.WriteCar(ctx, ds, []cid.Cid{root}, f)
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ blockstore.Blockstore = &rawCarb{}
|
||||||
|
|
||||||
|
type badgerLog struct {
|
||||||
|
*zap.SugaredLogger
|
||||||
|
skip2 *zap.SugaredLogger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *badgerLog) Warningf(format string, args ...interface{}) {
|
||||||
|
b.skip2.Warnf(format, args...)
|
||||||
|
}
|
||||||
|
128
cmd/lotus-shed/shedgen/cbor_gen.go
Normal file
128
cmd/lotus-shed/shedgen/cbor_gen.go
Normal file
@ -0,0 +1,128 @@
|
|||||||
|
// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT.
|
||||||
|
|
||||||
|
package shedgen
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"math"
|
||||||
|
"sort"
|
||||||
|
|
||||||
|
cid "github.com/ipfs/go-cid"
|
||||||
|
cbg "github.com/whyrusleeping/cbor-gen"
|
||||||
|
xerrors "golang.org/x/xerrors"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ = xerrors.Errorf
|
||||||
|
var _ = cid.Undef
|
||||||
|
var _ = math.E
|
||||||
|
var _ = sort.Sort
|
||||||
|
|
||||||
|
func (t *CarbNode) MarshalCBOR(w io.Writer) error {
|
||||||
|
if t == nil {
|
||||||
|
_, err := w.Write(cbg.CborNull)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := w.Write([]byte{161}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
scratch := make([]byte, 9)
|
||||||
|
|
||||||
|
// t.Sub ([]cid.Cid) (slice)
|
||||||
|
if len("Sub") > cbg.MaxLength {
|
||||||
|
return xerrors.Errorf("Value in field \"Sub\" was too long")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("Sub"))); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := io.WriteString(w, string("Sub")); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(t.Sub) > cbg.MaxLength {
|
||||||
|
return xerrors.Errorf("Slice value in field t.Sub was too long")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajArray, uint64(len(t.Sub))); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, v := range t.Sub {
|
||||||
|
if err := cbg.WriteCidBuf(scratch, w, v); err != nil {
|
||||||
|
return xerrors.Errorf("failed writing cid field t.Sub: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *CarbNode) UnmarshalCBOR(r io.Reader) error {
|
||||||
|
*t = CarbNode{}
|
||||||
|
|
||||||
|
br := cbg.GetPeeker(r)
|
||||||
|
scratch := make([]byte, 8)
|
||||||
|
|
||||||
|
maj, extra, err := cbg.CborReadHeaderBuf(br, scratch)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if maj != cbg.MajMap {
|
||||||
|
return fmt.Errorf("cbor input should be of type map")
|
||||||
|
}
|
||||||
|
|
||||||
|
if extra > cbg.MaxLength {
|
||||||
|
return fmt.Errorf("CarbNode: map struct too large (%d)", extra)
|
||||||
|
}
|
||||||
|
|
||||||
|
var name string
|
||||||
|
n := extra
|
||||||
|
|
||||||
|
for i := uint64(0); i < n; i++ {
|
||||||
|
|
||||||
|
{
|
||||||
|
sval, err := cbg.ReadStringBuf(br, scratch)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
name = string(sval)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch name {
|
||||||
|
// t.Sub ([]cid.Cid) (slice)
|
||||||
|
case "Sub":
|
||||||
|
|
||||||
|
maj, extra, err = cbg.CborReadHeaderBuf(br, scratch)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if extra > cbg.MaxLength {
|
||||||
|
return fmt.Errorf("t.Sub: array too large (%d)", extra)
|
||||||
|
}
|
||||||
|
|
||||||
|
if maj != cbg.MajArray {
|
||||||
|
return fmt.Errorf("expected cbor array")
|
||||||
|
}
|
||||||
|
|
||||||
|
if extra > 0 {
|
||||||
|
t.Sub = make([]cid.Cid, extra)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < int(extra); i++ {
|
||||||
|
|
||||||
|
c, err := cbg.ReadCid(br)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("reading cid field t.Sub failed: %w", err)
|
||||||
|
}
|
||||||
|
t.Sub[i] = c
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
// Field doesn't exist on this type, so ignore it
|
||||||
|
cbg.ScanForLinks(r, func(cid.Cid) {})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
7
cmd/lotus-shed/shedgen/rawexport.go
Normal file
7
cmd/lotus-shed/shedgen/rawexport.go
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
package shedgen
|
||||||
|
|
||||||
|
import "github.com/ipfs/go-cid"
|
||||||
|
|
||||||
|
type CarbNode struct {
|
||||||
|
Sub []cid.Cid
|
||||||
|
}
|
@ -10,6 +10,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/exchange"
|
"github.com/filecoin-project/lotus/chain/exchange"
|
||||||
"github.com/filecoin-project/lotus/chain/market"
|
"github.com/filecoin-project/lotus/chain/market"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/cmd/lotus-shed/shedgen"
|
||||||
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
|
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
||||||
"github.com/filecoin-project/lotus/node/hello"
|
"github.com/filecoin-project/lotus/node/hello"
|
||||||
@ -106,4 +107,11 @@ func main() {
|
|||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
err = gen.WriteMapEncodersToFile("./cmd/lotus-shed/shedgen/cbor_gen.go", "shedgen",
|
||||||
|
shedgen.CarbNode{},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user