761 lines
18 KiB
Go
761 lines
18 KiB
Go
package store
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
|
|
blocks "github.com/ipfs/go-block-format"
|
|
"github.com/ipfs/go-cid"
|
|
format "github.com/ipfs/go-ipld-format"
|
|
"github.com/ipld/go-car"
|
|
carutil "github.com/ipld/go-car/util"
|
|
carv2 "github.com/ipld/go-car/v2"
|
|
"github.com/multiformats/go-multicodec"
|
|
cbg "github.com/whyrusleeping/cbor-gen"
|
|
"go.uber.org/atomic"
|
|
"golang.org/x/sync/errgroup"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/go-state-types/abi"
|
|
|
|
bstore "github.com/filecoin-project/lotus/blockstore"
|
|
"github.com/filecoin-project/lotus/build"
|
|
"github.com/filecoin-project/lotus/chain/actors/builtin"
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
)
|
|
|
|
// TipsetkeyBackfillRange bounds how many tipsets, counted back from the
// imported root, Import loads and hands to PersistTipsets.
const TipsetkeyBackfillRange = 2 * build.Finality
|
|
|
|
func (cs *ChainStore) UnionStore() bstore.Blockstore {
|
|
return bstore.Union(cs.stateBlockstore, cs.chainBlockstore)
|
|
}
|
|
|
|
func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, w io.Writer) error {
|
|
h := &car.CarHeader{
|
|
Roots: ts.Cids(),
|
|
Version: 1,
|
|
}
|
|
|
|
if err := car.WriteHeader(h, w); err != nil {
|
|
return xerrors.Errorf("failed to write car header: %s", err)
|
|
}
|
|
|
|
unionBs := cs.UnionStore()
|
|
return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, true, func(c cid.Cid) error {
|
|
blk, err := unionBs.Get(ctx, c)
|
|
if err != nil {
|
|
return xerrors.Errorf("writing object to car, bs.Get: %w", err)
|
|
}
|
|
|
|
if err := carutil.LdWrite(w, c.Bytes(), blk.RawData()); err != nil {
|
|
return xerrors.Errorf("failed to write block to car output: %w", err)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, error) {
|
|
// TODO: writing only to the state blockstore is incorrect.
|
|
// At this time, both the state and chain blockstores are backed by the
|
|
// universal store. When we physically segregate the stores, we will need
|
|
// to route state objects to the state blockstore, and chain objects to
|
|
// the chain blockstore.
|
|
|
|
br, err := carv2.NewBlockReader(r)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("loadcar failed: %w", err)
|
|
}
|
|
|
|
s := cs.StateBlockstore()
|
|
|
|
parallelPuts := 5
|
|
putThrottle := make(chan error, parallelPuts)
|
|
for i := 0; i < parallelPuts; i++ {
|
|
putThrottle <- nil
|
|
}
|
|
|
|
var buf []blocks.Block
|
|
for {
|
|
blk, err := br.Next()
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
if len(buf) > 0 {
|
|
if err := s.PutMany(ctx, buf); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
break
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
buf = append(buf, blk)
|
|
|
|
if len(buf) > 1000 {
|
|
if lastErr := <-putThrottle; lastErr != nil { // consume one error to have the right to add one
|
|
return nil, lastErr
|
|
}
|
|
|
|
go func(buf []blocks.Block) {
|
|
putThrottle <- s.PutMany(ctx, buf)
|
|
}(buf)
|
|
buf = nil
|
|
}
|
|
}
|
|
|
|
// check errors
|
|
for i := 0; i < parallelPuts; i++ {
|
|
if lastErr := <-putThrottle; lastErr != nil {
|
|
return nil, lastErr
|
|
}
|
|
}
|
|
|
|
root, err := cs.LoadTipSet(ctx, types.NewTipSetKey(br.Roots...))
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("failed to load root tipset from chainfile: %w", err)
|
|
}
|
|
|
|
ts := root
|
|
tssToPersist := make([]*types.TipSet, 0, TipsetkeyBackfillRange)
|
|
for i := 0; i < int(TipsetkeyBackfillRange); i++ {
|
|
tssToPersist = append(tssToPersist, ts)
|
|
parentTsKey := ts.Parents()
|
|
ts, err = cs.LoadTipSet(ctx, parentTsKey)
|
|
if ts == nil || err != nil {
|
|
log.Warnf("Only able to load the last %d tipsets", i)
|
|
break
|
|
}
|
|
}
|
|
|
|
if err := cs.PersistTipsets(ctx, tssToPersist); err != nil {
|
|
return nil, xerrors.Errorf("failed to persist tipsets: %w", err)
|
|
}
|
|
|
|
return root, nil
|
|
}
|
|
|
|
// walkSchedTaskType identifies the kind of work carried by a walkTask.
type walkSchedTaskType int

// finishTask is the sentinel task type used to tell workers to wind down.
const finishTask walkSchedTaskType = -1

// Regular task types are numbered consecutively starting at 1.
const (
	blockTask walkSchedTaskType = iota + 1
	messageTask
	receiptTask
	stateTask
	dagTask
)

// String returns the lower-case name of the task type. It panics when t is
// not one of the declared task types.
func (t walkSchedTaskType) String() string {
	var name string
	switch t {
	case finishTask:
		name = "finish"
	case blockTask:
		name = "block"
	case messageTask:
		name = "message"
	case receiptTask:
		name = "receipt"
	case stateTask:
		name = "state"
	case dagTask:
		name = "dag"
	default:
		panic(fmt.Sprintf("unknow task %d", t))
	}
	return name
}
|
|
|
|
type walkTask struct {
|
|
c cid.Cid
|
|
taskType walkSchedTaskType
|
|
topLevelTaskType walkSchedTaskType
|
|
blockCid cid.Cid
|
|
epoch abi.ChainEpoch
|
|
}
|
|
|
|
// an ever growing FIFO
|
|
type taskFifo struct {
|
|
in chan walkTask
|
|
out chan walkTask
|
|
fifo []walkTask
|
|
}
|
|
|
|
type taskResult struct {
|
|
c cid.Cid
|
|
b blocks.Block
|
|
}
|
|
|
|
func newTaskFifo(bufferLen int) *taskFifo {
|
|
f := taskFifo{
|
|
in: make(chan walkTask, bufferLen),
|
|
out: make(chan walkTask, bufferLen),
|
|
fifo: make([]walkTask, 0),
|
|
}
|
|
|
|
go f.run()
|
|
|
|
return &f
|
|
}
|
|
|
|
func (f *taskFifo) Close() error {
|
|
close(f.in)
|
|
return nil
|
|
}
|
|
|
|
func (f *taskFifo) run() {
|
|
for {
|
|
if len(f.fifo) > 0 {
|
|
// we have items in slice
|
|
// try to put next out or read something in.
|
|
// blocks if nothing works.
|
|
next := f.fifo[0]
|
|
select {
|
|
case f.out <- next:
|
|
f.fifo = f.fifo[1:]
|
|
case elem, ok := <-f.in:
|
|
if !ok {
|
|
// drain and close out.
|
|
for _, elem := range f.fifo {
|
|
f.out <- elem
|
|
}
|
|
close(f.out)
|
|
return
|
|
}
|
|
f.fifo = append(f.fifo, elem)
|
|
}
|
|
} else {
|
|
// no elements in fifo to put out.
|
|
// Try to read in and block.
|
|
// When done, try to put out or add to fifo.
|
|
select {
|
|
case elem, ok := <-f.in:
|
|
if !ok {
|
|
close(f.out)
|
|
return
|
|
}
|
|
select {
|
|
case f.out <- elem:
|
|
default:
|
|
f.fifo = append(f.fifo, elem)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
type walkSchedulerConfig struct {
|
|
numWorkers int
|
|
|
|
head *types.TipSet // Tipset to start walking from.
|
|
tail *types.TipSet // Tipset to end at.
|
|
includeMessages bool
|
|
includeReceipts bool
|
|
includeState bool
|
|
}
|
|
|
|
type walkScheduler struct {
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
|
|
store bstore.Blockstore
|
|
cfg walkSchedulerConfig
|
|
writer io.Writer
|
|
|
|
workerTasks *taskFifo
|
|
totalTasks atomic.Int64
|
|
results chan taskResult
|
|
writeErrorChan chan error
|
|
|
|
// tracks number of inflight tasks
|
|
//taskWg sync.WaitGroup
|
|
// launches workers and collects errors if any occur
|
|
workers *errgroup.Group
|
|
// set of CIDs already exported
|
|
seen sync.Map
|
|
}
|
|
|
|
func newWalkScheduler(ctx context.Context, store bstore.Blockstore, cfg walkSchedulerConfig, w io.Writer) (*walkScheduler, error) {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
workers, ctx := errgroup.WithContext(ctx)
|
|
s := &walkScheduler{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
store: store,
|
|
cfg: cfg,
|
|
writer: w,
|
|
results: make(chan taskResult, cfg.numWorkers*64),
|
|
workerTasks: newTaskFifo(cfg.numWorkers * 64),
|
|
writeErrorChan: make(chan error, 1),
|
|
workers: workers,
|
|
}
|
|
|
|
go func() {
|
|
defer close(s.writeErrorChan)
|
|
for r := range s.results {
|
|
// Write
|
|
if err := carutil.LdWrite(s.writer, r.c.Bytes(), r.b.RawData()); err != nil {
|
|
// abort operations
|
|
cancel()
|
|
s.writeErrorChan <- err
|
|
}
|
|
}
|
|
}()
|
|
|
|
// workers
|
|
for i := 0; i < cfg.numWorkers; i++ {
|
|
f := func(n int) func() error {
|
|
return func() error {
|
|
return s.workerFunc(n)
|
|
}
|
|
}(i)
|
|
s.workers.Go(f)
|
|
}
|
|
|
|
s.totalTasks.Add(int64(len(cfg.head.Blocks())))
|
|
for _, b := range cfg.head.Blocks() {
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Errorw("context done while sending root tasks", ctx.Err())
|
|
cancel() // kill workers
|
|
return nil, ctx.Err()
|
|
case s.workerTasks.in <- walkTask{
|
|
c: b.Cid(),
|
|
taskType: blockTask,
|
|
topLevelTaskType: blockTask,
|
|
blockCid: b.Cid(),
|
|
epoch: cfg.head.Height(),
|
|
}:
|
|
}
|
|
}
|
|
|
|
return s, nil
|
|
}
|
|
|
|
func (s *walkScheduler) Wait() error {
|
|
err := s.workers.Wait()
|
|
// all workers done. One would have reached genesis and notified the
|
|
// rest to exit. Yet, there might be some pending tasks in the queue,
|
|
// so we need to run a "single worker".
|
|
if err != nil {
|
|
log.Errorw("export workers finished with error", "error", err)
|
|
}
|
|
|
|
for {
|
|
if n := s.totalTasks.Load(); n == 0 {
|
|
break // finally fully done
|
|
}
|
|
select {
|
|
case task := <-s.workerTasks.out:
|
|
s.totalTasks.Add(-1)
|
|
if err != nil {
|
|
continue // just drain if errors happened.
|
|
}
|
|
err = s.processTask(task, 0)
|
|
}
|
|
}
|
|
close(s.results)
|
|
errWrite := <-s.writeErrorChan
|
|
if errWrite != nil {
|
|
log.Errorw("error writing to CAR file", "error", err)
|
|
return errWrite
|
|
}
|
|
s.workerTasks.Close() //nolint:errcheck
|
|
return err
|
|
}
|
|
|
|
func (s *walkScheduler) enqueueIfNew(task walkTask) {
|
|
if multicodec.Code(task.c.Prefix().MhType) == multicodec.Identity {
|
|
//log.Infow("ignored", "cid", todo.c.String())
|
|
return
|
|
}
|
|
|
|
// This lets through RAW, CBOR, and DagCBOR blocks, the only types that we end up writing to
|
|
// the exported CAR.
|
|
switch multicodec.Code(task.c.Prefix().Codec) {
|
|
case multicodec.Cbor, multicodec.DagCbor, multicodec.Raw:
|
|
default:
|
|
//log.Infow("ignored", "cid", todo.c.String())
|
|
return
|
|
}
|
|
if _, loaded := s.seen.LoadOrStore(task.c, struct{}{}); loaded {
|
|
// we already had it on the map
|
|
return
|
|
}
|
|
|
|
log.Debugw("enqueue", "type", task.taskType.String(), "cid", task.c.String())
|
|
s.totalTasks.Add(1)
|
|
s.workerTasks.in <- task
|
|
}
|
|
|
|
func (s *walkScheduler) sendFinish(workerN int) error {
|
|
log.Infow("worker finished work", "worker", workerN)
|
|
s.totalTasks.Add(1)
|
|
s.workerTasks.in <- walkTask{
|
|
taskType: finishTask,
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *walkScheduler) workerFunc(workerN int) error {
|
|
log.Infow("starting worker", "worker", workerN)
|
|
for t := range s.workerTasks.out {
|
|
s.totalTasks.Add(-1)
|
|
select {
|
|
case <-s.ctx.Done():
|
|
return s.ctx.Err()
|
|
default:
|
|
// A worker reached genesis, so we wind down and let others do
|
|
// the same. Exit.
|
|
if t.taskType == finishTask {
|
|
return s.sendFinish(workerN)
|
|
}
|
|
}
|
|
|
|
err := s.processTask(t, workerN)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// continue
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *walkScheduler) processTask(t walkTask, workerN int) error {
|
|
if t.taskType == finishTask {
|
|
return nil
|
|
}
|
|
|
|
blk, err := s.store.Get(s.ctx, t.c)
|
|
if errors.Is(err, format.ErrNotFound{}) && t.topLevelTaskType == receiptTask {
|
|
log.Debugw("ignoring not-found block in Receipts",
|
|
"block", t.blockCid,
|
|
"epoch", t.epoch,
|
|
"cid", t.c)
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return xerrors.Errorf(
|
|
"blockstore.Get(%s). Task: %s. Block: %s (%s). Epoch: %d. Err: %w",
|
|
t.c, t.taskType, t.topLevelTaskType, t.blockCid, t.epoch, err)
|
|
}
|
|
|
|
s.results <- taskResult{
|
|
c: t.c,
|
|
b: blk,
|
|
}
|
|
|
|
// We exported the ipld block. If it wasn't a CBOR block, there's nothing
|
|
// else to do and we can bail out early as it won't have any links
|
|
// etc.
|
|
if multicodec.Code(t.c.Prefix().Codec) != multicodec.DagCbor ||
|
|
multicodec.Code(t.c.Prefix().MhType) == multicodec.Identity {
|
|
return nil
|
|
}
|
|
|
|
rawData := blk.RawData()
|
|
|
|
// extract relevant dags to walk from the block
|
|
if t.taskType == blockTask {
|
|
var b types.BlockHeader
|
|
if err := b.UnmarshalCBOR(bytes.NewBuffer(rawData)); err != nil {
|
|
return xerrors.Errorf("unmarshalling block header (cid=%s): %w", blk, err)
|
|
}
|
|
if b.Height%1_000 == 0 {
|
|
log.Infow("block export", "height", b.Height)
|
|
}
|
|
if b.Height == 0 {
|
|
log.Info("exporting genesis block")
|
|
for i := range b.Parents {
|
|
s.enqueueIfNew(walkTask{
|
|
c: b.Parents[i],
|
|
taskType: dagTask,
|
|
topLevelTaskType: blockTask,
|
|
blockCid: b.Parents[i],
|
|
epoch: 0,
|
|
})
|
|
}
|
|
s.enqueueIfNew(walkTask{
|
|
c: b.ParentStateRoot,
|
|
taskType: stateTask,
|
|
topLevelTaskType: stateTask,
|
|
blockCid: t.c,
|
|
epoch: 0,
|
|
})
|
|
|
|
return s.sendFinish(workerN)
|
|
}
|
|
// enqueue block parents
|
|
for i := range b.Parents {
|
|
s.enqueueIfNew(walkTask{
|
|
c: b.Parents[i],
|
|
taskType: blockTask,
|
|
topLevelTaskType: blockTask,
|
|
blockCid: b.Parents[i],
|
|
epoch: b.Height,
|
|
})
|
|
}
|
|
if s.cfg.tail.Height() >= b.Height {
|
|
log.Debugw("tail reached: only blocks will be exported from now until genesis", "cid", t.c.String())
|
|
return nil
|
|
}
|
|
|
|
if s.cfg.includeMessages {
|
|
// enqueue block messages
|
|
s.enqueueIfNew(walkTask{
|
|
c: b.Messages,
|
|
taskType: messageTask,
|
|
topLevelTaskType: messageTask,
|
|
blockCid: t.c,
|
|
epoch: b.Height,
|
|
})
|
|
}
|
|
if s.cfg.includeReceipts {
|
|
// enqueue block receipts
|
|
s.enqueueIfNew(walkTask{
|
|
c: b.ParentMessageReceipts,
|
|
taskType: receiptTask,
|
|
topLevelTaskType: receiptTask,
|
|
blockCid: t.c,
|
|
epoch: b.Height,
|
|
})
|
|
}
|
|
if s.cfg.includeState {
|
|
s.enqueueIfNew(walkTask{
|
|
c: b.ParentStateRoot,
|
|
taskType: stateTask,
|
|
topLevelTaskType: stateTask,
|
|
blockCid: t.c,
|
|
epoch: b.Height,
|
|
})
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Not a chain-block: we scan for CIDs in the raw block-data
|
|
err = cbg.ScanForLinks(bytes.NewReader(rawData), func(c cid.Cid) {
|
|
s.enqueueIfNew(walkTask{
|
|
c: c,
|
|
taskType: dagTask,
|
|
topLevelTaskType: t.topLevelTaskType,
|
|
blockCid: t.blockCid,
|
|
epoch: t.epoch,
|
|
})
|
|
})
|
|
|
|
if err != nil {
|
|
return xerrors.Errorf(
|
|
"ScanForLinks(%s). Task: %s. Block: %s (%s). Epoch: %d. Err: %w",
|
|
t.c, t.taskType, t.topLevelTaskType, t.blockCid, t.epoch, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (cs *ChainStore) ExportRange(
|
|
ctx context.Context,
|
|
w io.Writer,
|
|
head, tail *types.TipSet,
|
|
messages, receipts, stateroots bool,
|
|
workers int) error {
|
|
|
|
h := &car.CarHeader{
|
|
Roots: head.Cids(),
|
|
Version: 1,
|
|
}
|
|
|
|
if err := car.WriteHeader(h, w); err != nil {
|
|
return xerrors.Errorf("failed to write car header: %s", err)
|
|
}
|
|
|
|
start := time.Now()
|
|
log.Infow("walking snapshot range",
|
|
"head", head.Key(),
|
|
"tail", tail.Key(),
|
|
"messages", messages,
|
|
"receipts", receipts,
|
|
"stateroots",
|
|
stateroots,
|
|
"workers", workers)
|
|
|
|
cfg := walkSchedulerConfig{
|
|
numWorkers: workers,
|
|
head: head,
|
|
tail: tail,
|
|
includeMessages: messages,
|
|
includeState: stateroots,
|
|
includeReceipts: receipts,
|
|
}
|
|
|
|
pw, err := newWalkScheduler(ctx, cs.UnionStore(), cfg, w)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// wait until all workers are done.
|
|
err = pw.Wait()
|
|
if err != nil {
|
|
log.Errorw("walker scheduler", "error", err)
|
|
return err
|
|
}
|
|
|
|
log.Infow("walking snapshot range complete", "duration", time.Since(start), "success", err == nil)
|
|
return nil
|
|
}
|
|
|
|
func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs, skipMsgReceipts bool, cb func(cid.Cid) error) error {
|
|
if ts == nil {
|
|
ts = cs.GetHeaviestTipSet()
|
|
}
|
|
|
|
seen := cid.NewSet()
|
|
walked := cid.NewSet()
|
|
|
|
blocksToWalk := ts.Cids()
|
|
currentMinHeight := ts.Height()
|
|
|
|
walkChain := func(blk cid.Cid) error {
|
|
if !seen.Visit(blk) {
|
|
return nil
|
|
}
|
|
|
|
if err := cb(blk); err != nil {
|
|
return err
|
|
}
|
|
|
|
data, err := cs.chainBlockstore.Get(ctx, blk)
|
|
if err != nil {
|
|
return xerrors.Errorf("getting block: %w", err)
|
|
}
|
|
|
|
var b types.BlockHeader
|
|
if err := b.UnmarshalCBOR(bytes.NewBuffer(data.RawData())); err != nil {
|
|
return xerrors.Errorf("unmarshaling block header (cid=%s): %w", blk, err)
|
|
}
|
|
|
|
if currentMinHeight > b.Height {
|
|
currentMinHeight = b.Height
|
|
if currentMinHeight%builtin.EpochsInDay == 0 {
|
|
log.Infow("export", "height", currentMinHeight)
|
|
}
|
|
}
|
|
|
|
var cids []cid.Cid
|
|
if !skipOldMsgs || b.Height > ts.Height()-inclRecentRoots {
|
|
if walked.Visit(b.Messages) {
|
|
mcids, err := recurseLinks(ctx, cs.chainBlockstore, walked, b.Messages, []cid.Cid{b.Messages})
|
|
if err != nil {
|
|
return xerrors.Errorf("recursing messages failed: %w", err)
|
|
}
|
|
cids = mcids
|
|
}
|
|
}
|
|
|
|
if b.Height > 0 {
|
|
for _, p := range b.Parents {
|
|
blocksToWalk = append(blocksToWalk, p)
|
|
}
|
|
} else {
|
|
// include the genesis block
|
|
cids = append(cids, b.Parents...)
|
|
}
|
|
|
|
out := cids
|
|
|
|
if b.Height == 0 || b.Height > ts.Height()-inclRecentRoots {
|
|
if walked.Visit(b.ParentStateRoot) {
|
|
cids, err := recurseLinks(ctx, cs.stateBlockstore, walked, b.ParentStateRoot, []cid.Cid{b.ParentStateRoot})
|
|
if err != nil {
|
|
return xerrors.Errorf("recursing genesis state failed: %w", err)
|
|
}
|
|
|
|
out = append(out, cids...)
|
|
}
|
|
|
|
if !skipMsgReceipts && walked.Visit(b.ParentMessageReceipts) {
|
|
out = append(out, b.ParentMessageReceipts)
|
|
}
|
|
}
|
|
|
|
for _, c := range out {
|
|
if seen.Visit(c) {
|
|
prefix := c.Prefix()
|
|
|
|
// Don't include identity CIDs.
|
|
if multicodec.Code(prefix.MhType) == multicodec.Identity {
|
|
continue
|
|
}
|
|
|
|
// We only include raw, cbor, and dagcbor, for now.
|
|
switch multicodec.Code(prefix.Codec) {
|
|
case multicodec.Cbor, multicodec.DagCbor, multicodec.Raw:
|
|
default:
|
|
continue
|
|
}
|
|
|
|
if err := cb(c); err != nil {
|
|
return err
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
log.Infow("export started")
|
|
exportStart := build.Clock.Now()
|
|
|
|
for len(blocksToWalk) > 0 {
|
|
next := blocksToWalk[0]
|
|
blocksToWalk = blocksToWalk[1:]
|
|
if err := walkChain(next); err != nil {
|
|
return xerrors.Errorf("walk chain failed: %w", err)
|
|
}
|
|
}
|
|
|
|
log.Infow("export finished", "duration", build.Clock.Now().Sub(exportStart).Seconds())
|
|
|
|
return nil
|
|
}
|
|
|
|
func recurseLinks(ctx context.Context, bs bstore.Blockstore, walked *cid.Set, root cid.Cid, in []cid.Cid) ([]cid.Cid, error) {
|
|
if multicodec.Code(root.Prefix().Codec) != multicodec.DagCbor {
|
|
return in, nil
|
|
}
|
|
|
|
data, err := bs.Get(ctx, root)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("recurse links get (%s) failed: %w", root, err)
|
|
}
|
|
|
|
var rerr error
|
|
err = cbg.ScanForLinks(bytes.NewReader(data.RawData()), func(c cid.Cid) {
|
|
if rerr != nil {
|
|
// No error return on ScanForLinks :(
|
|
return
|
|
}
|
|
|
|
// traversed this already...
|
|
if !walked.Visit(c) {
|
|
return
|
|
}
|
|
|
|
in = append(in, c)
|
|
var err error
|
|
in, err = recurseLinks(ctx, bs, walked, c, in)
|
|
if err != nil {
|
|
rerr = err
|
|
}
|
|
})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("scanning for links failed: %w", err)
|
|
}
|
|
|
|
return in, rerr
|
|
}
|