Chain ranged export: rework and address current shortcomings

This commit moderately refactors the ranged export code. It addresses several
problems:
  * Code does not finish cleanly and things hang on ctrl-c
  * Same block is read multiple times in a row (artificially increasing cached
    blockstore metrics to 50%)
  * It is unclear whether there are additional races (a single worker quits
    when reaching height 0)
  * CARs produced have duplicated blocks (~400k for an 80M-blocks CAR or
    so). Some blocks appear up to 5 times.
  * Using pointers for tasks where it is not necessary.

The changes:

  * Use a FIFO instead of stack: simpler implementation as its own type. This
has not proven to be much more memory-friendly, but it has not made things
worse either.
  * We avoid a probably not small amount of allocations by not using
    unnecessary pointers.
  * Fix duplicated blocks by atomically checking+adding to CID set.
  * Context-termination now works correctly. Worker lifetime is correctly tracked and all channels
  are closed, avoiding any memory leaks and deadlocks.
  * We ensure all work is finished before finishing, something that might have
  been broken in some edge cases previously. In practice, we would not have
  seen this except perhaps in very early snapshots close to genesis.

Initial testing shows the code is currently about 5% faster. Resulting
snapshots do not have duplicates so they are a bit smaller. We have manually
verified that no CID is lost versus previous results, with both old and recent
snapshots.
This commit is contained in:
Hector Sanjuan 2023-02-02 17:51:52 +01:00
parent 21efd481d8
commit fa93c23813
8 changed files with 366 additions and 262 deletions

View File

@ -173,9 +173,9 @@ type FullNode interface {
// If oldmsgskip is set, messages from before the requested roots are also not included.
ChainExport(ctx context.Context, nroots abi.ChainEpoch, oldmsgskip bool, tsk types.TipSetKey) (<-chan []byte, error) //perm:read
ChainExportRange(ctx context.Context, head, tail types.TipSetKey, cfg *ChainExportConfig) (<-chan []byte, error) //perm:read
ChainExportRange(ctx context.Context, head, tail types.TipSetKey, cfg ChainExportConfig) (<-chan []byte, error) //perm:read
ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg *ChainExportConfig) error //perm:read
ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg ChainExportConfig) error //perm:read
// ChainPrune prunes the stored chain state and garbage collects; only supported if you
// are using the splitstore

View File

@ -138,9 +138,9 @@ type FullNodeMethods struct {
ChainExport func(p0 context.Context, p1 abi.ChainEpoch, p2 bool, p3 types.TipSetKey) (<-chan []byte, error) `perm:"read"`
ChainExportRange func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *ChainExportConfig) (<-chan []byte, error) `perm:"read"`
ChainExportRange func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 ChainExportConfig) (<-chan []byte, error) `perm:"read"`
ChainExportRangeInternal func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *ChainExportConfig) error `perm:"read"`
ChainExportRangeInternal func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 ChainExportConfig) error `perm:"read"`
ChainGetBlock func(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) `perm:"read"`
@ -1437,25 +1437,25 @@ func (s *FullNodeStub) ChainExport(p0 context.Context, p1 abi.ChainEpoch, p2 boo
return nil, ErrNotSupported
}
func (s *FullNodeStruct) ChainExportRange(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *ChainExportConfig) (<-chan []byte, error) {
func (s *FullNodeStruct) ChainExportRange(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 ChainExportConfig) (<-chan []byte, error) {
if s.Internal.ChainExportRange == nil {
return nil, ErrNotSupported
}
return s.Internal.ChainExportRange(p0, p1, p2, p3)
}
func (s *FullNodeStub) ChainExportRange(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *ChainExportConfig) (<-chan []byte, error) {
func (s *FullNodeStub) ChainExportRange(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 ChainExportConfig) (<-chan []byte, error) {
return nil, ErrNotSupported
}
func (s *FullNodeStruct) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *ChainExportConfig) error {
func (s *FullNodeStruct) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 ChainExportConfig) error {
if s.Internal.ChainExportRangeInternal == nil {
return ErrNotSupported
}
return s.Internal.ChainExportRangeInternal(p0, p1, p2, p3)
}
func (s *FullNodeStub) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *ChainExportConfig) error {
func (s *FullNodeStub) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 ChainExportConfig) error {
return ErrNotSupported
}

View File

@ -399,9 +399,10 @@ func (m *MsgUuidMapType) UnmarshalJSON(b []byte) error {
return nil
}
// ChainExportConfig holds configuration for chain ranged exports.
type ChainExportConfig struct {
WriteBufferSize int
Workers int64
NumWorkers int
CacheSize int
IncludeMessages bool
IncludeReceipts bool

View File

@ -161,9 +161,9 @@ type FullNode interface {
// If oldmsgskip is set, messages from before the requested roots are also not included.
ChainExport(ctx context.Context, nroots abi.ChainEpoch, oldmsgskip bool, tsk types.TipSetKey) (<-chan []byte, error) //perm:read
ChainExportRange(ctx context.Context, head, tail types.TipSetKey, cfg *api.ChainExportConfig) (<-chan []byte, error) //perm:read
ChainExportRange(ctx context.Context, head, tail types.TipSetKey, cfg api.ChainExportConfig) (<-chan []byte, error) //perm:read
ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg *api.ChainExportConfig) error //perm:read
ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg api.ChainExportConfig) error //perm:read
// MethodGroup: Beacon
// The Beacon method group contains methods for interacting with the random beacon (DRAND)

View File

@ -47,9 +47,9 @@ type FullNodeMethods struct {
ChainExport func(p0 context.Context, p1 abi.ChainEpoch, p2 bool, p3 types.TipSetKey) (<-chan []byte, error) `perm:"read"`
ChainExportRange func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *api.ChainExportConfig) (<-chan []byte, error) `perm:"read"`
ChainExportRange func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 api.ChainExportConfig) (<-chan []byte, error) `perm:"read"`
ChainExportRangeInternal func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *api.ChainExportConfig) error `perm:"read"`
ChainExportRangeInternal func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 api.ChainExportConfig) error `perm:"read"`
ChainGetBlock func(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) `perm:"read"`
@ -534,25 +534,25 @@ func (s *FullNodeStub) ChainExport(p0 context.Context, p1 abi.ChainEpoch, p2 boo
return nil, ErrNotSupported
}
func (s *FullNodeStruct) ChainExportRange(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *api.ChainExportConfig) (<-chan []byte, error) {
func (s *FullNodeStruct) ChainExportRange(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 api.ChainExportConfig) (<-chan []byte, error) {
if s.Internal.ChainExportRange == nil {
return nil, ErrNotSupported
}
return s.Internal.ChainExportRange(p0, p1, p2, p3)
}
func (s *FullNodeStub) ChainExportRange(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *api.ChainExportConfig) (<-chan []byte, error) {
func (s *FullNodeStub) ChainExportRange(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 api.ChainExportConfig) (<-chan []byte, error) {
return nil, ErrNotSupported
}
func (s *FullNodeStruct) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *api.ChainExportConfig) error {
func (s *FullNodeStruct) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 api.ChainExportConfig) error {
if s.Internal.ChainExportRangeInternal == nil {
return ErrNotSupported
}
return s.Internal.ChainExportRangeInternal(p0, p1, p2, p3)
}
func (s *FullNodeStub) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *api.ChainExportConfig) error {
func (s *FullNodeStub) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 api.ChainExportConfig) error {
return ErrNotSupported
}

View File

@ -15,6 +15,7 @@ import (
carv2 "github.com/ipld/go-car/v2"
mh "github.com/multiformats/go-multihash"
cbg "github.com/whyrusleeping/cbor-gen"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
@ -32,35 +33,6 @@ func (cs *ChainStore) UnionStore() bstore.Blockstore {
return bstore.Union(cs.stateBlockstore, cs.chainBlockstore)
}
func (cs *ChainStore) ExportRange(ctx context.Context, head, tail *types.TipSet, messages, receipts, stateroots bool, workers int64, cacheSize int, w io.Writer) error {
h := &car.CarHeader{
Roots: head.Cids(),
Version: 1,
}
if err := car.WriteHeader(h, w); err != nil {
return xerrors.Errorf("failed to write car header: %s", err)
}
cacheStore, err := NewCachingBlockstore(cs.UnionStore(), cacheSize)
if err != nil {
return err
}
return cs.WalkSnapshotRange(ctx, cacheStore, head, tail, messages, receipts, stateroots, workers, func(c cid.Cid) error {
blk, err := cacheStore.Get(ctx, c)
if err != nil {
return xerrors.Errorf("writing object to car, bs.Get: %w", err)
}
if err := carutil.LdWrite(w, c.Bytes(), blk.RawData()); err != nil {
return xerrors.Errorf("failed to write block to car output: %w", err)
}
return nil
})
}
func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, w io.Writer) error {
h := &car.CarHeader{
Roots: ts.Cids(),
@ -165,67 +137,228 @@ func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, e
return root, nil
}
type walkTask struct {
c cid.Cid
taskType taskType
type walkSchedTaskType int
const (
finishTask walkSchedTaskType = -1
blockTask walkSchedTaskType = iota
messageTask
receiptTask
stateTask
dagTask
)
func (t walkSchedTaskType) String() string {
switch t {
case finishTask:
return "finish"
case blockTask:
return "block"
case messageTask:
return "message"
case receiptTask:
return "receipt"
case stateTask:
return "state"
case dagTask:
return "dag"
}
panic(fmt.Sprintf("unknow task %d", t))
}
type walkResult struct {
type walkTask struct {
c cid.Cid
taskType walkSchedTaskType
}
// an ever growing FIFO
type taskFifo struct {
in chan walkTask
out chan walkTask
fifo []walkTask
}
type taskResult struct {
c cid.Cid
b blocks.Block
}
func newTaskFifo(bufferLen int) *taskFifo {
f := taskFifo{
in: make(chan walkTask, bufferLen),
out: make(chan walkTask, bufferLen),
fifo: make([]walkTask, 0),
}
go f.run()
return &f
}
func (f *taskFifo) Close() error {
close(f.in)
return nil
}
func (f *taskFifo) run() {
for {
if len(f.fifo) > 0 {
// we have items in slice
// try to put next out or read something in.
// blocks if nothing works.
next := f.fifo[0]
select {
case f.out <- next:
f.fifo = f.fifo[1:]
case elem, ok := <-f.in:
if !ok {
// drain and close out.
for _, elem := range f.fifo {
f.out <- elem
}
close(f.out)
return
}
f.fifo = append(f.fifo, elem)
}
} else {
// no elements in fifo to put out.
// Try to read in and block.
// When done, try to put out or add to fifo.
select {
case elem, ok := <-f.in:
if !ok {
close(f.out)
return
}
select {
case f.out <- elem:
default:
f.fifo = append(f.fifo, elem)
}
}
}
}
}
type walkSchedulerConfig struct {
numWorkers int64
tail *types.TipSet
numWorkers int
head *types.TipSet // Tipset to start walking from.
tail *types.TipSet // Tipset to end at.
includeMessages bool
includeReceipts bool
includeState bool
}
func newWalkScheduler(ctx context.Context, store bstore.Blockstore, cfg *walkSchedulerConfig, rootTasks ...*walkTask) (*walkScheduler, context.Context) {
tailSet := cid.NewSet()
for i := range cfg.tail.Cids() {
tailSet.Add(cfg.tail.Cids()[i])
}
grp, ctx := errgroup.WithContext(ctx)
s := &walkScheduler{
store: store,
numWorkers: cfg.numWorkers,
stack: rootTasks,
in: make(chan *walkTask, cfg.numWorkers*64),
out: make(chan *walkTask, cfg.numWorkers*64),
grp: grp,
tail: tailSet,
cfg: cfg,
}
s.taskWg.Add(len(rootTasks))
return s, ctx
}
type walkScheduler struct {
store bstore.Blockstore
// number of worker routine to spawn
numWorkers int64
// buffer holds tasks until they are processed
stack []*walkTask
// inbound and outbound tasks
in, out chan *walkTask
// tracks number of inflight tasks
taskWg sync.WaitGroup
// launches workers and collects errors if any occur
grp *errgroup.Group
// set of tasks seen
seen sync.Map
ctx context.Context
cancel context.CancelFunc
tail *cid.Set
cfg *walkSchedulerConfig
store bstore.Blockstore
cfg walkSchedulerConfig
writer io.Writer
workerTasks *taskFifo
totalTasks atomic.Int64
results chan taskResult
writeErrorChan chan error
// tracks number of inflight tasks
//taskWg sync.WaitGroup
// launches workers and collects errors if any occur
workers *errgroup.Group
// set of CIDs already exported
seen sync.Map
}
func newWalkScheduler(ctx context.Context, store bstore.Blockstore, cfg walkSchedulerConfig, w io.Writer) (*walkScheduler, error) {
ctx, cancel := context.WithCancel(ctx)
workers, ctx := errgroup.WithContext(ctx)
s := &walkScheduler{
ctx: ctx,
cancel: cancel,
store: store,
cfg: cfg,
writer: w,
results: make(chan taskResult, cfg.numWorkers*64),
workerTasks: newTaskFifo(cfg.numWorkers * 64),
writeErrorChan: make(chan error, 1),
workers: workers,
}
go func() {
defer close(s.writeErrorChan)
for r := range s.results {
// Write
if err := carutil.LdWrite(s.writer, r.c.Bytes(), r.b.RawData()); err != nil {
// abort operations
cancel()
s.writeErrorChan <- err
}
}
}()
// workers
for i := 0; i < cfg.numWorkers; i++ {
f := func(n int) func() error {
return func() error {
return s.workerFunc(n)
}
}(i)
s.workers.Go(f)
}
s.totalTasks.Add(int64(len(cfg.head.Blocks())))
for _, b := range cfg.head.Blocks() {
select {
case <-ctx.Done():
log.Errorw("context done while sending root tasks", ctx.Err())
cancel() // kill workers
return nil, ctx.Err()
case s.workerTasks.in <- walkTask{
c: b.Cid(),
taskType: blockTask,
}:
}
}
return s, nil
}
func (s *walkScheduler) Wait() error {
return s.grp.Wait()
err := s.workers.Wait()
// all workers done. One would have reached genesis and notified the
// rest to exit. Yet, there might be some pending tasks in the queue,
// so we need to run a "single worker".
if err != nil {
log.Errorw("export workers finished with error", "error", err)
}
func (s *walkScheduler) enqueueIfNew(task *walkTask) {
for {
if n := s.totalTasks.Load(); n == 0 {
break // finally fully done
}
select {
case task := <-s.workerTasks.out:
s.totalTasks.Add(-1)
if err != nil {
continue // just drain if errors happened.
}
err = s.processTask(task, 0)
}
}
close(s.results)
errWrite := <-s.writeErrorChan
if errWrite != nil {
log.Errorw("error writing to CAR file", "error", err)
return errWrite
}
s.workerTasks.Close() //nolint:errcheck
return err
}
func (s *walkScheduler) enqueueIfNew(task walkTask) {
if task.c.Prefix().MhType == mh.IDENTITY {
//log.Infow("ignored", "cid", todo.c.String())
return
@ -234,118 +367,68 @@ func (s *walkScheduler) enqueueIfNew(task *walkTask) {
//log.Infow("ignored", "cid", todo.c.String())
return
}
if _, ok := s.seen.Load(task.c); ok {
if _, loaded := s.seen.LoadOrStore(task.c, struct{}{}); loaded {
// we already had it on the map
return
}
log.Debugw("enqueue", "type", task.taskType.String(), "cid", task.c.String())
s.taskWg.Add(1)
s.seen.Store(task.c, struct{}{})
s.in <- task
s.totalTasks.Add(1)
s.workerTasks.in <- task
}
func (s *walkScheduler) startScheduler(ctx context.Context) {
s.grp.Go(func() error {
defer func() {
log.Infow("walkScheduler shutting down")
close(s.out)
// Because the workers may have exited early (due to the context being canceled).
for range s.out {
s.taskWg.Done()
func (s *walkScheduler) sendFinish(workerN int) error {
log.Infow("worker finished work", "worker", workerN)
s.totalTasks.Add(1)
s.workerTasks.in <- walkTask{
taskType: finishTask,
}
log.Info("closed scheduler out wait group")
// Because the workers may have enqueued additional tasks.
for range s.in {
s.taskWg.Done()
}
log.Info("closed scheduler in wait group")
// now, the waitgroup should be at 0, and the goroutine that was _waiting_ on it should have exited.
log.Infow("walkScheduler stopped")
}()
go func() {
s.taskWg.Wait()
close(s.in)
log.Info("closed scheduler in channel")
}()
for {
if n := len(s.stack) - 1; n >= 0 {
select {
case <-ctx.Done():
return ctx.Err()
case newJob, ok := <-s.in:
if !ok {
return nil
}
s.stack = append(s.stack, newJob)
case s.out <- s.stack[n]:
s.stack[n] = nil
s.stack = s.stack[:n]
}
} else {
func (s *walkScheduler) workerFunc(workerN int) error {
log.Infow("starting worker", "worker", workerN)
for t := range s.workerTasks.out {
s.totalTasks.Add(-1)
select {
case <-ctx.Done():
return ctx.Err()
case newJob, ok := <-s.in:
if !ok {
return nil
case <-s.ctx.Done():
return s.ctx.Err()
default:
// A worker reached genesis, so we wind down and let others do
// the same. Exit.
if t.taskType == finishTask {
return s.sendFinish(workerN)
}
s.stack = append(s.stack, newJob)
}
}
}
})
}
func (s *walkScheduler) startWorkers(ctx context.Context, out chan *walkResult) {
for i := int64(0); i < s.numWorkers; i++ {
s.grp.Go(func() error {
for task := range s.out {
if err := s.work(ctx, task, out); err != nil {
err := s.processTask(t, workerN)
if err != nil {
return err
}
// continue
}
return nil
})
}
}
type taskType int
func (t taskType) String() string {
switch t {
case Block:
return "block"
case Message:
return "message"
case Receipt:
return "receipt"
case State:
return "state"
case Dag:
return "dag"
}
panic(fmt.Sprintf("unknow task %d", t))
func (s *walkScheduler) processTask(t walkTask, workerN int) error {
if t.taskType == finishTask {
return nil
}
const (
Block taskType = iota
Message
Receipt
State
Dag
)
blk, err := s.store.Get(s.ctx, t.c)
if err != nil {
return xerrors.Errorf("writing object to car, bs.Get: %w", err)
}
func (s *walkScheduler) work(ctx context.Context, todo *walkTask, results chan *walkResult) error {
defer s.taskWg.Done()
// unseen cid, its a result
results <- &walkResult{c: todo.c}
s.results <- taskResult{
c: t.c,
b: blk,
}
// extract relevant dags to walk from the block
if todo.taskType == Block {
blk := todo.c
data, err := s.store.Get(ctx, blk)
if t.taskType == blockTask {
blk := t.c
data, err := s.store.Get(s.ctx, blk)
if err != nil {
return err
}
@ -359,130 +442,124 @@ func (s *walkScheduler) work(ctx context.Context, todo *walkTask, results chan *
if b.Height == 0 {
log.Info("exporting genesis block")
for i := range b.Parents {
s.enqueueIfNew(&walkTask{
s.enqueueIfNew(walkTask{
c: b.Parents[i],
taskType: Dag,
taskType: dagTask,
})
}
s.enqueueIfNew(&walkTask{
s.enqueueIfNew(walkTask{
c: b.ParentStateRoot,
taskType: State,
taskType: stateTask,
})
return nil
return s.sendFinish(workerN)
}
// enqueue block parents
for i := range b.Parents {
s.enqueueIfNew(&walkTask{
s.enqueueIfNew(walkTask{
c: b.Parents[i],
taskType: Block,
taskType: blockTask,
})
}
if s.cfg.tail.Height() >= b.Height {
log.Debugw("tail reached", "cid", blk.String())
log.Debugw("tail reached: only blocks will be exported from now until genesis", "cid", blk.String())
return nil
}
if s.cfg.includeMessages {
// enqueue block messages
s.enqueueIfNew(&walkTask{
s.enqueueIfNew(walkTask{
c: b.Messages,
taskType: Message,
taskType: messageTask,
})
}
if s.cfg.includeReceipts {
// enqueue block receipts
s.enqueueIfNew(&walkTask{
s.enqueueIfNew(walkTask{
c: b.ParentMessageReceipts,
taskType: Receipt,
taskType: receiptTask,
})
}
if s.cfg.includeState {
s.enqueueIfNew(&walkTask{
s.enqueueIfNew(walkTask{
c: b.ParentStateRoot,
taskType: State,
taskType: stateTask,
})
}
return nil
}
data, err := s.store.Get(ctx, todo.c)
if err != nil {
return err
}
return cbg.ScanForLinks(bytes.NewReader(data.RawData()), func(c cid.Cid) {
if todo.c.Prefix().Codec != cid.DagCBOR || todo.c.Prefix().MhType == mh.IDENTITY {
// Not a chain-block: we scan for CIDs in the raw block-data
return cbg.ScanForLinks(bytes.NewReader(blk.RawData()), func(c cid.Cid) {
if t.c.Prefix().Codec != cid.DagCBOR || t.c.Prefix().MhType == mh.IDENTITY {
return
}
s.enqueueIfNew(&walkTask{
s.enqueueIfNew(walkTask{
c: c,
taskType: Dag,
taskType: dagTask,
})
})
}
func (cs *ChainStore) WalkSnapshotRange(ctx context.Context, store bstore.Blockstore, head, tail *types.TipSet, messages, receipts, stateroots bool, workers int64, cb func(cid.Cid) error) error {
func (cs *ChainStore) ExportRange(
ctx context.Context,
w io.Writer,
head, tail *types.TipSet,
messages, receipts, stateroots bool,
workers int,
cacheSize int) error {
h := &car.CarHeader{
Roots: head.Cids(),
Version: 1,
}
if err := car.WriteHeader(h, w); err != nil {
return xerrors.Errorf("failed to write car header: %s", err)
}
cacheStore, err := NewCachingBlockstore(cs.UnionStore(), cacheSize)
if err != nil {
return err
}
start := time.Now()
log.Infow("walking snapshot range", "head", head.Key(), "tail", tail.Key(), "messages", messages, "receipts", receipts, "stateroots", stateroots, "workers", workers, "start", start)
var tasks []*walkTask
for i := range head.Blocks() {
tasks = append(tasks, &walkTask{
c: head.Blocks()[i].Cid(),
taskType: 0,
})
}
log.Infow("walking snapshot range",
"head", head.Key(),
"tail", tail.Key(),
"messages", messages,
"receipts", receipts,
"stateroots",
stateroots,
"workers", workers)
cfg := &walkSchedulerConfig{
cfg := walkSchedulerConfig{
numWorkers: workers,
head: head,
tail: tail,
includeMessages: messages,
includeState: stateroots,
includeReceipts: receipts,
}
pw, ctx := newWalkScheduler(ctx, store, cfg, tasks...)
// create a buffered channel for exported CID's scaled on the number of workers.
results := make(chan *walkResult, workers*64)
pw.startScheduler(ctx)
// workers accept channel and write results to it.
pw.startWorkers(ctx, results)
// used to wait until result channel has been drained.
resultsDone := make(chan struct{})
var cbErr error
go func() {
// signal we are done draining results when this method exits.
defer close(resultsDone)
// drain the results channel until is closes.
for res := range results {
if err := cb(res.c); err != nil {
log.Errorw("export range callback error", "error", err)
cbErr = err
return
pw, err := newWalkScheduler(ctx, cacheStore, cfg, w)
if err != nil {
return err
}
}
}()
// wait until all workers are done.
err := pw.Wait()
err = pw.Wait()
if err != nil {
log.Errorw("walker scheduler", "error", err)
}
// workers are done, close the results channel.
close(results)
// wait until all results have been consumed before exiting (its buffered).
<-resultsDone
// if there was a callback error return it.
if cbErr != nil {
return cbErr
}
log.Infow("walking snapshot range complete", "duration", time.Since(start), "success", err == nil)
// return any error encountered by the walker.
return err
}
log.Infow("walking snapshot range complete", "duration", time.Since(start), "success", err == nil)
return nil
}
func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs, skipMsgReceipts bool, cb func(cid.Cid) error) error {
if ts == nil {
ts = cs.GetHeaviestTipSet()

View File

@ -6,6 +6,7 @@ import (
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"os"
@ -1153,12 +1154,12 @@ var ChainExportRangeCmd = &cli.Command{
Flags: []cli.Flag{
&cli.StringFlag{
Name: "head",
Usage: "specify tipset to start the export from",
Usage: "specify tipset to start the export from (higher epoch)",
Value: "@head",
},
&cli.StringFlag{
Name: "tail",
Usage: "specify tipset to end the export at",
Usage: "specify tipset to end the export at (lower epoch)",
Value: "@tail",
},
&cli.BoolFlag{
@ -1176,7 +1177,7 @@ var ChainExportRangeCmd = &cli.Command{
Usage: "specify if stateroots should be include",
Value: false,
},
&cli.Int64Flag{
&cli.IntFlag{
Name: "workers",
Usage: "specify the number of workers",
Value: 1,
@ -1234,10 +1235,14 @@ var ChainExportRangeCmd = &cli.Command{
}
}
if head.Height() < tail.Height() {
return errors.New("Height of --head tipset must be greater or equal to the height of the --tail tipset")
}
if cctx.Bool("internal") {
if err := api.ChainExportRangeInternal(ctx, head.Key(), tail.Key(), &lapi.ChainExportConfig{
if err := api.ChainExportRangeInternal(ctx, head.Key(), tail.Key(), lapi.ChainExportConfig{
WriteBufferSize: cctx.Int("write-buffer"),
Workers: cctx.Int64("workers"),
NumWorkers: cctx.Int("workers"),
CacheSize: cctx.Int("cache-size"),
IncludeMessages: cctx.Bool("messages"),
IncludeReceipts: cctx.Bool("receipts"),
@ -1248,9 +1253,9 @@ var ChainExportRangeCmd = &cli.Command{
return nil
}
stream, err := api.ChainExportRange(ctx, head.Key(), tail.Key(), &lapi.ChainExportConfig{
stream, err := api.ChainExportRange(ctx, head.Key(), tail.Key(), lapi.ChainExportConfig{
WriteBufferSize: cctx.Int("write-buffer"),
Workers: cctx.Int64("workers"),
NumWorkers: cctx.Int("workers"),
CacheSize: cctx.Int("cache-size"),
IncludeMessages: cctx.Bool("messages"),
IncludeReceipts: cctx.Bool("receipts"),

View File

@ -9,6 +9,7 @@ import (
"io"
"math"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
@ -592,7 +593,7 @@ func (m *ChainModule) ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.M
return cm.VMMessage(), nil
}
func (a ChainAPI) ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg *api.ChainExportConfig) error {
func (a ChainAPI) ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg api.ChainExportConfig) error {
headTs, err := a.Chain.GetTipSetFromKey(ctx, head)
if err != nil {
return xerrors.Errorf("loading tipset %s: %w", head, err)
@ -601,10 +602,18 @@ func (a ChainAPI) ChainExportRangeInternal(ctx context.Context, head, tail types
if err != nil {
return xerrors.Errorf("loading tipset %s: %w", tail, err)
}
f, err := os.Create(fmt.Sprintf("./snapshot_%d_%d_%d.car", tailTs.Height(), headTs.Height(), time.Now().Unix()))
fileName := fmt.Sprintf("./snapshot_%d_%d_%d.car", tailTs.Height(), headTs.Height(), time.Now().Unix())
absFileName, err := filepath.Abs(fileName)
if err != nil {
return err
}
f, err := os.Create(fileName)
if err != nil {
return err
}
log.Infow("Exporting chain range", "path", absFileName)
// buffer writes to the chain export file.
bw := bufio.NewWriterSize(f, cfg.WriteBufferSize)
@ -617,14 +626,20 @@ func (a ChainAPI) ChainExportRangeInternal(ctx context.Context, head, tail types
}
}()
if err := a.Chain.ExportRange(ctx, headTs, tailTs, cfg.IncludeMessages, cfg.IncludeReceipts, cfg.IncludeStateRoots, cfg.Workers, cfg.CacheSize, bw); err != nil {
if err := a.Chain.ExportRange(ctx,
bw,
headTs, tailTs,
cfg.IncludeMessages, cfg.IncludeReceipts, cfg.IncludeStateRoots,
cfg.NumWorkers, cfg.CacheSize,
); err != nil {
return fmt.Errorf("exporting chain range: %w", err)
}
// FIXME: return progress.
return nil
}
func (a ChainAPI) ChainExportRange(ctx context.Context, head, tail types.TipSetKey, cfg *api.ChainExportConfig) (<-chan []byte, error) {
func (a ChainAPI) ChainExportRange(ctx context.Context, head, tail types.TipSetKey, cfg api.ChainExportConfig) (<-chan []byte, error) {
headTs, err := a.Chain.GetTipSetFromKey(ctx, head)
if err != nil {
return nil, xerrors.Errorf("loading tipset %s: %w", head, err)
@ -637,8 +652,14 @@ func (a ChainAPI) ChainExportRange(ctx context.Context, head, tail types.TipSetK
out := make(chan []byte)
go func() {
bw := bufio.NewWriterSize(w, cfg.WriteBufferSize)
err := a.Chain.ExportRange(ctx, headTs, tailTs, cfg.IncludeMessages, cfg.IncludeReceipts, cfg.IncludeStateRoots, cfg.Workers, cfg.CacheSize, bw)
err := a.Chain.ExportRange(
ctx,
bw,
headTs,
tailTs,
cfg.IncludeMessages, cfg.IncludeReceipts, cfg.IncludeStateRoots,
cfg.NumWorkers, cfg.CacheSize,
)
bw.Flush() //nolint:errcheck // it is a write to a pipe
w.CloseWithError(err) //nolint:errcheck // it is a pipe
}()