287 lines
6.7 KiB
Go
287 lines
6.7 KiB
Go
package store
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"io"
|
|
|
|
blocks "github.com/ipfs/go-block-format"
|
|
"github.com/ipfs/go-cid"
|
|
"github.com/ipld/go-car"
|
|
carutil "github.com/ipld/go-car/util"
|
|
carv2 "github.com/ipld/go-car/v2"
|
|
mh "github.com/multiformats/go-multihash"
|
|
cbg "github.com/whyrusleeping/cbor-gen"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/go-state-types/abi"
|
|
|
|
bstore "github.com/filecoin-project/lotus/blockstore"
|
|
"github.com/filecoin-project/lotus/build"
|
|
"github.com/filecoin-project/lotus/chain/actors/builtin"
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
)
|
|
|
|
const TipsetkeyBackfillRange = 2 * build.Finality
|
|
|
|
func (cs *ChainStore) UnionStore() bstore.Blockstore {
|
|
return bstore.Union(cs.stateBlockstore, cs.chainBlockstore)
|
|
}
|
|
|
|
func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, w io.Writer) error {
|
|
h := &car.CarHeader{
|
|
Roots: ts.Cids(),
|
|
Version: 1,
|
|
}
|
|
|
|
if err := car.WriteHeader(h, w); err != nil {
|
|
return xerrors.Errorf("failed to write car header: %s", err)
|
|
}
|
|
|
|
unionBs := cs.UnionStore()
|
|
return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, true, func(c cid.Cid) error {
|
|
blk, err := unionBs.Get(ctx, c)
|
|
if err != nil {
|
|
return xerrors.Errorf("writing object to car, bs.Get: %w", err)
|
|
}
|
|
|
|
if err := carutil.LdWrite(w, c.Bytes(), blk.RawData()); err != nil {
|
|
return xerrors.Errorf("failed to write block to car output: %w", err)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, error) {
|
|
// TODO: writing only to the state blockstore is incorrect.
|
|
// At this time, both the state and chain blockstores are backed by the
|
|
// universal store. When we physically segregate the stores, we will need
|
|
// to route state objects to the state blockstore, and chain objects to
|
|
// the chain blockstore.
|
|
|
|
br, err := carv2.NewBlockReader(r)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("loadcar failed: %w", err)
|
|
}
|
|
|
|
s := cs.StateBlockstore()
|
|
|
|
parallelPuts := 5
|
|
putThrottle := make(chan error, parallelPuts)
|
|
for i := 0; i < parallelPuts; i++ {
|
|
putThrottle <- nil
|
|
}
|
|
|
|
var buf []blocks.Block
|
|
for {
|
|
blk, err := br.Next()
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
if len(buf) > 0 {
|
|
if err := s.PutMany(ctx, buf); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
break
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
buf = append(buf, blk)
|
|
|
|
if len(buf) > 1000 {
|
|
if lastErr := <-putThrottle; lastErr != nil { // consume one error to have the right to add one
|
|
return nil, lastErr
|
|
}
|
|
|
|
go func(buf []blocks.Block) {
|
|
putThrottle <- s.PutMany(ctx, buf)
|
|
}(buf)
|
|
buf = nil
|
|
}
|
|
}
|
|
|
|
// check errors
|
|
for i := 0; i < parallelPuts; i++ {
|
|
if lastErr := <-putThrottle; lastErr != nil {
|
|
return nil, lastErr
|
|
}
|
|
}
|
|
|
|
root, err := cs.LoadTipSet(ctx, types.NewTipSetKey(br.Roots...))
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("failed to load root tipset from chainfile: %w", err)
|
|
}
|
|
|
|
ts := root
|
|
for i := 0; i < int(TipsetkeyBackfillRange); i++ {
|
|
err = cs.PersistTipset(ctx, ts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
parentTsKey := ts.Parents()
|
|
ts, err = cs.LoadTipSet(ctx, parentTsKey)
|
|
if ts == nil || err != nil {
|
|
log.Warnf("Only able to load the last %d tipsets", i)
|
|
break
|
|
}
|
|
}
|
|
|
|
return root, nil
|
|
}
|
|
|
|
func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs, skipMsgReceipts bool, cb func(cid.Cid) error) error {
|
|
if ts == nil {
|
|
ts = cs.GetHeaviestTipSet()
|
|
}
|
|
|
|
seen := cid.NewSet()
|
|
walked := cid.NewSet()
|
|
|
|
blocksToWalk := ts.Cids()
|
|
currentMinHeight := ts.Height()
|
|
|
|
walkChain := func(blk cid.Cid) error {
|
|
if !seen.Visit(blk) {
|
|
return nil
|
|
}
|
|
|
|
if err := cb(blk); err != nil {
|
|
return err
|
|
}
|
|
|
|
data, err := cs.chainBlockstore.Get(ctx, blk)
|
|
if err != nil {
|
|
return xerrors.Errorf("getting block: %w", err)
|
|
}
|
|
|
|
var b types.BlockHeader
|
|
if err := b.UnmarshalCBOR(bytes.NewBuffer(data.RawData())); err != nil {
|
|
return xerrors.Errorf("unmarshaling block header (cid=%s): %w", blk, err)
|
|
}
|
|
|
|
if currentMinHeight > b.Height {
|
|
currentMinHeight = b.Height
|
|
if currentMinHeight%builtin.EpochsInDay == 0 {
|
|
log.Infow("export", "height", currentMinHeight)
|
|
}
|
|
}
|
|
|
|
var cids []cid.Cid
|
|
if !skipOldMsgs || b.Height > ts.Height()-inclRecentRoots {
|
|
if walked.Visit(b.Messages) {
|
|
mcids, err := recurseLinks(ctx, cs.chainBlockstore, walked, b.Messages, []cid.Cid{b.Messages})
|
|
if err != nil {
|
|
return xerrors.Errorf("recursing messages failed: %w", err)
|
|
}
|
|
cids = mcids
|
|
}
|
|
}
|
|
|
|
if b.Height > 0 {
|
|
for _, p := range b.Parents {
|
|
blocksToWalk = append(blocksToWalk, p)
|
|
}
|
|
} else {
|
|
// include the genesis block
|
|
cids = append(cids, b.Parents...)
|
|
}
|
|
|
|
out := cids
|
|
|
|
if b.Height == 0 || b.Height > ts.Height()-inclRecentRoots {
|
|
if walked.Visit(b.ParentStateRoot) {
|
|
cids, err := recurseLinks(ctx, cs.stateBlockstore, walked, b.ParentStateRoot, []cid.Cid{b.ParentStateRoot})
|
|
if err != nil {
|
|
return xerrors.Errorf("recursing genesis state failed: %w", err)
|
|
}
|
|
|
|
out = append(out, cids...)
|
|
}
|
|
|
|
if !skipMsgReceipts && walked.Visit(b.ParentMessageReceipts) {
|
|
out = append(out, b.ParentMessageReceipts)
|
|
}
|
|
}
|
|
|
|
for _, c := range out {
|
|
if seen.Visit(c) {
|
|
prefix := c.Prefix()
|
|
|
|
// Don't include identity CIDs.
|
|
if prefix.MhType == mh.IDENTITY {
|
|
continue
|
|
}
|
|
|
|
// We only include raw and dagcbor, for now.
|
|
// Raw for "code" CIDs.
|
|
switch prefix.Codec {
|
|
case cid.Raw, cid.DagCBOR:
|
|
default:
|
|
continue
|
|
}
|
|
|
|
if err := cb(c); err != nil {
|
|
return err
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
log.Infow("export started")
|
|
exportStart := build.Clock.Now()
|
|
|
|
for len(blocksToWalk) > 0 {
|
|
next := blocksToWalk[0]
|
|
blocksToWalk = blocksToWalk[1:]
|
|
if err := walkChain(next); err != nil {
|
|
return xerrors.Errorf("walk chain failed: %w", err)
|
|
}
|
|
}
|
|
|
|
log.Infow("export finished", "duration", build.Clock.Now().Sub(exportStart).Seconds())
|
|
|
|
return nil
|
|
}
|
|
|
|
func recurseLinks(ctx context.Context, bs bstore.Blockstore, walked *cid.Set, root cid.Cid, in []cid.Cid) ([]cid.Cid, error) {
|
|
if root.Prefix().Codec != cid.DagCBOR {
|
|
return in, nil
|
|
}
|
|
|
|
data, err := bs.Get(ctx, root)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("recurse links get (%s) failed: %w", root, err)
|
|
}
|
|
|
|
var rerr error
|
|
err = cbg.ScanForLinks(bytes.NewReader(data.RawData()), func(c cid.Cid) {
|
|
if rerr != nil {
|
|
// No error return on ScanForLinks :(
|
|
return
|
|
}
|
|
|
|
// traversed this already...
|
|
if !walked.Visit(c) {
|
|
return
|
|
}
|
|
|
|
in = append(in, c)
|
|
var err error
|
|
in, err = recurseLinks(ctx, bs, walked, c, in)
|
|
if err != nil {
|
|
rerr = err
|
|
}
|
|
})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("scanning for links failed: %w", err)
|
|
}
|
|
|
|
return in, rerr
|
|
}
|