Merge pull request #10145 from filecoin-project/hsanjuan/chain-export-range-rebased
perf: chain: export-range
This commit is contained in:
commit
abaa53c6a3
@ -173,6 +173,16 @@ type FullNode interface {
|
||||
// If oldmsgskip is set, messages from before the requested roots are also not included.
|
||||
ChainExport(ctx context.Context, nroots abi.ChainEpoch, oldmsgskip bool, tsk types.TipSetKey) (<-chan []byte, error) //perm:read
|
||||
|
||||
// ChainExportRangeInternal triggers the export of a chain
|
||||
// CAR-snapshot directly to disk. It is similar to ChainExport,
|
||||
// except, depending on options, the snapshot can include receipts,
|
||||
// messages and stateroots for the length between the specified head
|
||||
// and tail, thus producing "archival-grade" snapshots that include
|
||||
// all the on-chain data. The header chain is included back to
|
||||
// genesis and these snapshots can be used to initialize Filecoin
|
||||
// nodes.
|
||||
ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg ChainExportConfig) error //perm:admin
|
||||
|
||||
// ChainPrune prunes the stored chain state and garbage collects; only supported if you
|
||||
// are using the splitstore
|
||||
ChainPrune(ctx context.Context, opts PruneOpts) error //perm:admin
|
||||
|
@ -155,6 +155,20 @@ func (mr *MockFullNodeMockRecorder) ChainExport(arg0, arg1, arg2, arg3 interface
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainExport", reflect.TypeOf((*MockFullNode)(nil).ChainExport), arg0, arg1, arg2, arg3)
|
||||
}
|
||||
|
||||
// ChainExportRangeInternal mocks base method.
|
||||
func (m *MockFullNode) ChainExportRangeInternal(arg0 context.Context, arg1, arg2 types.TipSetKey, arg3 api.ChainExportConfig) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "ChainExportRangeInternal", arg0, arg1, arg2, arg3)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// ChainExportRangeInternal indicates an expected call of ChainExportRangeInternal.
|
||||
func (mr *MockFullNodeMockRecorder) ChainExportRangeInternal(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainExportRangeInternal", reflect.TypeOf((*MockFullNode)(nil).ChainExportRangeInternal), arg0, arg1, arg2, arg3)
|
||||
}
|
||||
|
||||
// ChainGetBlock mocks base method.
|
||||
func (m *MockFullNode) ChainGetBlock(arg0 context.Context, arg1 cid.Cid) (*types.BlockHeader, error) {
|
||||
m.ctrl.T.Helper()
|
||||
|
@ -140,6 +140,8 @@ type FullNodeMethods struct {
|
||||
|
||||
ChainExport func(p0 context.Context, p1 abi.ChainEpoch, p2 bool, p3 types.TipSetKey) (<-chan []byte, error) `perm:"read"`
|
||||
|
||||
ChainExportRangeInternal func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 ChainExportConfig) error `perm:"admin"`
|
||||
|
||||
ChainGetBlock func(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) `perm:"read"`
|
||||
|
||||
ChainGetBlockMessages func(p0 context.Context, p1 cid.Cid) (*BlockMessages, error) `perm:"read"`
|
||||
@ -1437,6 +1439,17 @@ func (s *FullNodeStub) ChainExport(p0 context.Context, p1 abi.ChainEpoch, p2 boo
|
||||
return nil, ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *FullNodeStruct) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 ChainExportConfig) error {
|
||||
if s.Internal.ChainExportRangeInternal == nil {
|
||||
return ErrNotSupported
|
||||
}
|
||||
return s.Internal.ChainExportRangeInternal(p0, p1, p2, p3)
|
||||
}
|
||||
|
||||
func (s *FullNodeStub) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 ChainExportConfig) error {
|
||||
return ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *FullNodeStruct) ChainGetBlock(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) {
|
||||
if s.Internal.ChainGetBlock == nil {
|
||||
return nil, ErrNotSupported
|
||||
|
@ -398,3 +398,12 @@ func (m *MsgUuidMapType) UnmarshalJSON(b []byte) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ChainExportConfig holds configuration for chain ranged exports.
|
||||
type ChainExportConfig struct {
|
||||
WriteBufferSize int
|
||||
NumWorkers int
|
||||
IncludeMessages bool
|
||||
IncludeReceipts bool
|
||||
IncludeStateRoots bool
|
||||
}
|
||||
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -3,7 +3,10 @@ package store
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
blocks "github.com/ipfs/go-libipfs/blocks"
|
||||
@ -12,6 +15,8 @@ import (
|
||||
carv2 "github.com/ipld/go-car/v2"
|
||||
mh "github.com/multiformats/go-multihash"
|
||||
cbg "github.com/whyrusleeping/cbor-gen"
|
||||
"go.uber.org/atomic"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
@ -132,6 +137,423 @@ func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, e
|
||||
return root, nil
|
||||
}
|
||||
|
||||
type walkSchedTaskType int
|
||||
|
||||
const (
|
||||
finishTask walkSchedTaskType = -1
|
||||
blockTask walkSchedTaskType = iota
|
||||
messageTask
|
||||
receiptTask
|
||||
stateTask
|
||||
dagTask
|
||||
)
|
||||
|
||||
func (t walkSchedTaskType) String() string {
|
||||
switch t {
|
||||
case finishTask:
|
||||
return "finish"
|
||||
case blockTask:
|
||||
return "block"
|
||||
case messageTask:
|
||||
return "message"
|
||||
case receiptTask:
|
||||
return "receipt"
|
||||
case stateTask:
|
||||
return "state"
|
||||
case dagTask:
|
||||
return "dag"
|
||||
}
|
||||
panic(fmt.Sprintf("unknow task %d", t))
|
||||
}
|
||||
|
||||
type walkTask struct {
|
||||
c cid.Cid
|
||||
taskType walkSchedTaskType
|
||||
}
|
||||
|
||||
// an ever growing FIFO
|
||||
type taskFifo struct {
|
||||
in chan walkTask
|
||||
out chan walkTask
|
||||
fifo []walkTask
|
||||
}
|
||||
|
||||
type taskResult struct {
|
||||
c cid.Cid
|
||||
b blocks.Block
|
||||
}
|
||||
|
||||
func newTaskFifo(bufferLen int) *taskFifo {
|
||||
f := taskFifo{
|
||||
in: make(chan walkTask, bufferLen),
|
||||
out: make(chan walkTask, bufferLen),
|
||||
fifo: make([]walkTask, 0),
|
||||
}
|
||||
|
||||
go f.run()
|
||||
|
||||
return &f
|
||||
}
|
||||
|
||||
func (f *taskFifo) Close() error {
|
||||
close(f.in)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *taskFifo) run() {
|
||||
for {
|
||||
if len(f.fifo) > 0 {
|
||||
// we have items in slice
|
||||
// try to put next out or read something in.
|
||||
// blocks if nothing works.
|
||||
next := f.fifo[0]
|
||||
select {
|
||||
case f.out <- next:
|
||||
f.fifo = f.fifo[1:]
|
||||
case elem, ok := <-f.in:
|
||||
if !ok {
|
||||
// drain and close out.
|
||||
for _, elem := range f.fifo {
|
||||
f.out <- elem
|
||||
}
|
||||
close(f.out)
|
||||
return
|
||||
}
|
||||
f.fifo = append(f.fifo, elem)
|
||||
}
|
||||
} else {
|
||||
// no elements in fifo to put out.
|
||||
// Try to read in and block.
|
||||
// When done, try to put out or add to fifo.
|
||||
select {
|
||||
case elem, ok := <-f.in:
|
||||
if !ok {
|
||||
close(f.out)
|
||||
return
|
||||
}
|
||||
select {
|
||||
case f.out <- elem:
|
||||
default:
|
||||
f.fifo = append(f.fifo, elem)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type walkSchedulerConfig struct {
|
||||
numWorkers int
|
||||
|
||||
head *types.TipSet // Tipset to start walking from.
|
||||
tail *types.TipSet // Tipset to end at.
|
||||
includeMessages bool
|
||||
includeReceipts bool
|
||||
includeState bool
|
||||
}
|
||||
|
||||
type walkScheduler struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
store bstore.Blockstore
|
||||
cfg walkSchedulerConfig
|
||||
writer io.Writer
|
||||
|
||||
workerTasks *taskFifo
|
||||
totalTasks atomic.Int64
|
||||
results chan taskResult
|
||||
writeErrorChan chan error
|
||||
|
||||
// tracks number of inflight tasks
|
||||
//taskWg sync.WaitGroup
|
||||
// launches workers and collects errors if any occur
|
||||
workers *errgroup.Group
|
||||
// set of CIDs already exported
|
||||
seen sync.Map
|
||||
}
|
||||
|
||||
func newWalkScheduler(ctx context.Context, store bstore.Blockstore, cfg walkSchedulerConfig, w io.Writer) (*walkScheduler, error) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
workers, ctx := errgroup.WithContext(ctx)
|
||||
s := &walkScheduler{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
store: store,
|
||||
cfg: cfg,
|
||||
writer: w,
|
||||
results: make(chan taskResult, cfg.numWorkers*64),
|
||||
workerTasks: newTaskFifo(cfg.numWorkers * 64),
|
||||
writeErrorChan: make(chan error, 1),
|
||||
workers: workers,
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer close(s.writeErrorChan)
|
||||
for r := range s.results {
|
||||
// Write
|
||||
if err := carutil.LdWrite(s.writer, r.c.Bytes(), r.b.RawData()); err != nil {
|
||||
// abort operations
|
||||
cancel()
|
||||
s.writeErrorChan <- err
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// workers
|
||||
for i := 0; i < cfg.numWorkers; i++ {
|
||||
f := func(n int) func() error {
|
||||
return func() error {
|
||||
return s.workerFunc(n)
|
||||
}
|
||||
}(i)
|
||||
s.workers.Go(f)
|
||||
}
|
||||
|
||||
s.totalTasks.Add(int64(len(cfg.head.Blocks())))
|
||||
for _, b := range cfg.head.Blocks() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Errorw("context done while sending root tasks", ctx.Err())
|
||||
cancel() // kill workers
|
||||
return nil, ctx.Err()
|
||||
case s.workerTasks.in <- walkTask{
|
||||
c: b.Cid(),
|
||||
taskType: blockTask,
|
||||
}:
|
||||
}
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *walkScheduler) Wait() error {
|
||||
err := s.workers.Wait()
|
||||
// all workers done. One would have reached genesis and notified the
|
||||
// rest to exit. Yet, there might be some pending tasks in the queue,
|
||||
// so we need to run a "single worker".
|
||||
if err != nil {
|
||||
log.Errorw("export workers finished with error", "error", err)
|
||||
}
|
||||
|
||||
for {
|
||||
if n := s.totalTasks.Load(); n == 0 {
|
||||
break // finally fully done
|
||||
}
|
||||
select {
|
||||
case task := <-s.workerTasks.out:
|
||||
s.totalTasks.Add(-1)
|
||||
if err != nil {
|
||||
continue // just drain if errors happened.
|
||||
}
|
||||
err = s.processTask(task, 0)
|
||||
}
|
||||
}
|
||||
close(s.results)
|
||||
errWrite := <-s.writeErrorChan
|
||||
if errWrite != nil {
|
||||
log.Errorw("error writing to CAR file", "error", err)
|
||||
return errWrite
|
||||
}
|
||||
s.workerTasks.Close() //nolint:errcheck
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *walkScheduler) enqueueIfNew(task walkTask) {
|
||||
if task.c.Prefix().MhType == mh.IDENTITY {
|
||||
//log.Infow("ignored", "cid", todo.c.String())
|
||||
return
|
||||
}
|
||||
if task.c.Prefix().Codec != cid.Raw && task.c.Prefix().Codec != cid.DagCBOR {
|
||||
//log.Infow("ignored", "cid", todo.c.String())
|
||||
return
|
||||
}
|
||||
if _, loaded := s.seen.LoadOrStore(task.c, struct{}{}); loaded {
|
||||
// we already had it on the map
|
||||
return
|
||||
}
|
||||
|
||||
log.Debugw("enqueue", "type", task.taskType.String(), "cid", task.c.String())
|
||||
s.totalTasks.Add(1)
|
||||
s.workerTasks.in <- task
|
||||
}
|
||||
|
||||
func (s *walkScheduler) sendFinish(workerN int) error {
|
||||
log.Infow("worker finished work", "worker", workerN)
|
||||
s.totalTasks.Add(1)
|
||||
s.workerTasks.in <- walkTask{
|
||||
taskType: finishTask,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *walkScheduler) workerFunc(workerN int) error {
|
||||
log.Infow("starting worker", "worker", workerN)
|
||||
for t := range s.workerTasks.out {
|
||||
s.totalTasks.Add(-1)
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return s.ctx.Err()
|
||||
default:
|
||||
// A worker reached genesis, so we wind down and let others do
|
||||
// the same. Exit.
|
||||
if t.taskType == finishTask {
|
||||
return s.sendFinish(workerN)
|
||||
}
|
||||
}
|
||||
|
||||
err := s.processTask(t, workerN)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// continue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *walkScheduler) processTask(t walkTask, workerN int) error {
|
||||
if t.taskType == finishTask {
|
||||
return nil
|
||||
}
|
||||
|
||||
blk, err := s.store.Get(s.ctx, t.c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("writing object to car, bs.Get: %w", err)
|
||||
}
|
||||
|
||||
s.results <- taskResult{
|
||||
c: t.c,
|
||||
b: blk,
|
||||
}
|
||||
|
||||
// extract relevant dags to walk from the block
|
||||
if t.taskType == blockTask {
|
||||
blk := t.c
|
||||
data, err := s.store.Get(s.ctx, blk)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var b types.BlockHeader
|
||||
if err := b.UnmarshalCBOR(bytes.NewBuffer(data.RawData())); err != nil {
|
||||
return xerrors.Errorf("unmarshalling block header (cid=%s): %w", blk, err)
|
||||
}
|
||||
if b.Height%1_000 == 0 {
|
||||
log.Infow("block export", "height", b.Height)
|
||||
}
|
||||
if b.Height == 0 {
|
||||
log.Info("exporting genesis block")
|
||||
for i := range b.Parents {
|
||||
s.enqueueIfNew(walkTask{
|
||||
c: b.Parents[i],
|
||||
taskType: dagTask,
|
||||
})
|
||||
}
|
||||
s.enqueueIfNew(walkTask{
|
||||
c: b.ParentStateRoot,
|
||||
taskType: stateTask,
|
||||
})
|
||||
|
||||
return s.sendFinish(workerN)
|
||||
}
|
||||
// enqueue block parents
|
||||
for i := range b.Parents {
|
||||
s.enqueueIfNew(walkTask{
|
||||
c: b.Parents[i],
|
||||
taskType: blockTask,
|
||||
})
|
||||
}
|
||||
if s.cfg.tail.Height() >= b.Height {
|
||||
log.Debugw("tail reached: only blocks will be exported from now until genesis", "cid", blk.String())
|
||||
return nil
|
||||
}
|
||||
|
||||
if s.cfg.includeMessages {
|
||||
// enqueue block messages
|
||||
s.enqueueIfNew(walkTask{
|
||||
c: b.Messages,
|
||||
taskType: messageTask,
|
||||
})
|
||||
}
|
||||
if s.cfg.includeReceipts {
|
||||
// enqueue block receipts
|
||||
s.enqueueIfNew(walkTask{
|
||||
c: b.ParentMessageReceipts,
|
||||
taskType: receiptTask,
|
||||
})
|
||||
}
|
||||
if s.cfg.includeState {
|
||||
s.enqueueIfNew(walkTask{
|
||||
c: b.ParentStateRoot,
|
||||
taskType: stateTask,
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Not a chain-block: we scan for CIDs in the raw block-data
|
||||
return cbg.ScanForLinks(bytes.NewReader(blk.RawData()), func(c cid.Cid) {
|
||||
if t.c.Prefix().Codec != cid.DagCBOR || t.c.Prefix().MhType == mh.IDENTITY {
|
||||
return
|
||||
}
|
||||
|
||||
s.enqueueIfNew(walkTask{
|
||||
c: c,
|
||||
taskType: dagTask,
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func (cs *ChainStore) ExportRange(
|
||||
ctx context.Context,
|
||||
w io.Writer,
|
||||
head, tail *types.TipSet,
|
||||
messages, receipts, stateroots bool,
|
||||
workers int) error {
|
||||
|
||||
h := &car.CarHeader{
|
||||
Roots: head.Cids(),
|
||||
Version: 1,
|
||||
}
|
||||
|
||||
if err := car.WriteHeader(h, w); err != nil {
|
||||
return xerrors.Errorf("failed to write car header: %s", err)
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
log.Infow("walking snapshot range",
|
||||
"head", head.Key(),
|
||||
"tail", tail.Key(),
|
||||
"messages", messages,
|
||||
"receipts", receipts,
|
||||
"stateroots",
|
||||
stateroots,
|
||||
"workers", workers)
|
||||
|
||||
cfg := walkSchedulerConfig{
|
||||
numWorkers: workers,
|
||||
head: head,
|
||||
tail: tail,
|
||||
includeMessages: messages,
|
||||
includeState: stateroots,
|
||||
includeReceipts: receipts,
|
||||
}
|
||||
|
||||
pw, err := newWalkScheduler(ctx, cs.UnionStore(), cfg, w)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// wait until all workers are done.
|
||||
err = pw.Wait()
|
||||
if err != nil {
|
||||
log.Errorw("walker scheduler", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infow("walking snapshot range complete", "duration", time.Since(start), "success", err == nil)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs, skipMsgReceipts bool, cb func(cid.Cid) error) error {
|
||||
if ts == nil {
|
||||
ts = cs.GetHeaviestTipSet()
|
||||
|
105
cli/chain.go
105
cli/chain.go
@ -6,6 +6,7 @@ import (
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
@ -56,6 +57,7 @@ var ChainCmd = &cli.Command{
|
||||
ChainGetCmd,
|
||||
ChainBisectCmd,
|
||||
ChainExportCmd,
|
||||
ChainExportRangeCmd,
|
||||
SlashConsensusFault,
|
||||
ChainGasPriceCmd,
|
||||
ChainInspectUsage,
|
||||
@ -1145,6 +1147,109 @@ var ChainExportCmd = &cli.Command{
|
||||
},
|
||||
}
|
||||
|
||||
var ChainExportRangeCmd = &cli.Command{
|
||||
Name: "export-range",
|
||||
Usage: "export chain to a car file",
|
||||
ArgsUsage: "",
|
||||
Flags: []cli.Flag{
|
||||
&cli.StringFlag{
|
||||
Name: "head",
|
||||
Usage: "specify tipset to start the export from (higher epoch)",
|
||||
Value: "@head",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "tail",
|
||||
Usage: "specify tipset to end the export at (lower epoch)",
|
||||
Value: "@tail",
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "messages",
|
||||
Usage: "specify if messages should be include",
|
||||
Value: false,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "receipts",
|
||||
Usage: "specify if receipts should be include",
|
||||
Value: false,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "stateroots",
|
||||
Usage: "specify if stateroots should be include",
|
||||
Value: false,
|
||||
},
|
||||
&cli.IntFlag{
|
||||
Name: "workers",
|
||||
Usage: "specify the number of workers",
|
||||
Value: 1,
|
||||
},
|
||||
&cli.IntFlag{
|
||||
Name: "write-buffer",
|
||||
Usage: "specify write buffer size",
|
||||
Value: 1 << 20,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "internal",
|
||||
Usage: "write the file locally to disk",
|
||||
Value: true,
|
||||
Hidden: true, // currently, non-internal export is not implemented.
|
||||
},
|
||||
},
|
||||
Action: func(cctx *cli.Context) error {
|
||||
api, closer, err := GetFullNodeAPIV1(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer closer()
|
||||
ctx := ReqContext(cctx)
|
||||
|
||||
var head, tail *types.TipSet
|
||||
headstr := cctx.String("head")
|
||||
if headstr == "@head" {
|
||||
head, err = api.ChainHead(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
head, err = ParseTipSetRef(ctx, api, headstr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parsing head: %w", err)
|
||||
}
|
||||
}
|
||||
tailstr := cctx.String("tail")
|
||||
if tailstr == "@tail" {
|
||||
tail, err = api.ChainGetGenesis(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
tail, err = ParseTipSetRef(ctx, api, tailstr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parsing tail: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if head.Height() < tail.Height() {
|
||||
return errors.New("Height of --head tipset must be greater or equal to the height of the --tail tipset")
|
||||
}
|
||||
|
||||
if !cctx.Bool("internal") {
|
||||
return errors.New("Non-internal exports are not implemented")
|
||||
}
|
||||
|
||||
err = api.ChainExportRangeInternal(ctx, head.Key(), tail.Key(), lapi.ChainExportConfig{
|
||||
WriteBufferSize: cctx.Int("write-buffer"),
|
||||
NumWorkers: cctx.Int("workers"),
|
||||
IncludeMessages: cctx.Bool("messages"),
|
||||
IncludeReceipts: cctx.Bool("receipts"),
|
||||
IncludeStateRoots: cctx.Bool("stateroots"),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
var SlashConsensusFault = &cli.Command{
|
||||
Name: "slash-consensus",
|
||||
Usage: "Report consensus fault",
|
||||
|
@ -13,6 +13,7 @@
|
||||
* [ChainCheckBlockstore](#ChainCheckBlockstore)
|
||||
* [ChainDeleteObj](#ChainDeleteObj)
|
||||
* [ChainExport](#ChainExport)
|
||||
* [ChainExportRangeInternal](#ChainExportRangeInternal)
|
||||
* [ChainGetBlock](#ChainGetBlock)
|
||||
* [ChainGetBlockMessages](#ChainGetBlockMessages)
|
||||
* [ChainGetEvents](#ChainGetEvents)
|
||||
@ -473,6 +474,50 @@ Inputs:
|
||||
|
||||
Response: `"Ynl0ZSBhcnJheQ=="`
|
||||
|
||||
### ChainExportRangeInternal
|
||||
ChainExportRangeInternal triggers the export of a chain
|
||||
CAR-snapshot directly to disk. It is similar to ChainExport,
|
||||
except, depending on options, the snapshot can include receipts,
|
||||
messages and stateroots for the length between the specified head
|
||||
and tail, thus producing "archival-grade" snapshots that include
|
||||
all the on-chain data. The header chain is included back to
|
||||
genesis and these snapshots can be used to initialize Filecoin
|
||||
nodes.
|
||||
|
||||
|
||||
Perms: admin
|
||||
|
||||
Inputs:
|
||||
```json
|
||||
[
|
||||
[
|
||||
{
|
||||
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||
},
|
||||
{
|
||||
"/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve"
|
||||
}
|
||||
],
|
||||
[
|
||||
{
|
||||
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||
},
|
||||
{
|
||||
"/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve"
|
||||
}
|
||||
],
|
||||
{
|
||||
"WriteBufferSize": 123,
|
||||
"NumWorkers": 123,
|
||||
"IncludeMessages": true,
|
||||
"IncludeReceipts": true,
|
||||
"IncludeStateRoots": true
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
Response: `{}`
|
||||
|
||||
### ChainGetBlock
|
||||
ChainGetBlock returns the block specified by the given CID.
|
||||
|
||||
|
@ -2110,6 +2110,7 @@ COMMANDS:
|
||||
get Get chain DAG node by path
|
||||
bisect bisect chain for an event
|
||||
export export chain to a car file
|
||||
export-range export chain to a car file
|
||||
slash-consensus Report consensus fault
|
||||
gas-price Estimate gas prices
|
||||
inspect-usage Inspect block space usage of a given tipset
|
||||
@ -2291,6 +2292,25 @@ OPTIONS:
|
||||
|
||||
```
|
||||
|
||||
### lotus chain export-range
|
||||
```
|
||||
NAME:
|
||||
lotus chain export-range - export chain to a car file
|
||||
|
||||
USAGE:
|
||||
lotus chain export-range [command options] [arguments...]
|
||||
|
||||
OPTIONS:
|
||||
--head value specify tipset to start the export from (higher epoch) (default: "@head")
|
||||
--messages specify if messages should be include (default: false)
|
||||
--receipts specify if receipts should be include (default: false)
|
||||
--stateroots specify if stateroots should be include (default: false)
|
||||
--tail value specify tipset to end the export at (lower epoch) (default: "@tail")
|
||||
--workers value specify the number of workers (default: 1)
|
||||
--write-buffer value specify write buffer size (default: 1048576)
|
||||
|
||||
```
|
||||
|
||||
### lotus chain slash-consensus
|
||||
```
|
||||
NAME:
|
||||
|
2
go.mod
2
go.mod
@ -153,6 +153,7 @@ require (
|
||||
go.opentelemetry.io/otel/bridge/opencensus v0.33.0
|
||||
go.opentelemetry.io/otel/exporters/jaeger v1.2.0
|
||||
go.opentelemetry.io/otel/sdk v1.11.1
|
||||
go.uber.org/atomic v1.10.0
|
||||
go.uber.org/fx v1.15.0
|
||||
go.uber.org/multierr v1.8.0
|
||||
go.uber.org/zap v1.23.0
|
||||
@ -334,7 +335,6 @@ require (
|
||||
go.opentelemetry.io/otel/metric v0.33.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk/metric v0.33.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.11.1 // indirect
|
||||
go.uber.org/atomic v1.10.0 // indirect
|
||||
go.uber.org/dig v1.12.0 // indirect
|
||||
go4.org v0.0.0-20200411211856-f5505b9728dd // indirect
|
||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
|
||||
|
@ -5,11 +5,15 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-blockservice"
|
||||
"github.com/ipfs/go-cid"
|
||||
@ -38,6 +42,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/lib/oldpath"
|
||||
"github.com/filecoin-project/lotus/lib/oldpath/oldresolver"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
)
|
||||
|
||||
var log = logging.Logger("fullnode")
|
||||
@ -89,6 +94,8 @@ type ChainAPI struct {
|
||||
|
||||
// BaseBlockstore is the underlying blockstore
|
||||
BaseBlockstore dtypes.BaseBlockstore
|
||||
|
||||
Repo repo.LockedRepo
|
||||
}
|
||||
|
||||
func (m *ChainModule) ChainNotify(ctx context.Context) (<-chan []*api.HeadChange, error) {
|
||||
@ -589,6 +596,54 @@ func (m *ChainModule) ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.M
|
||||
return cm.VMMessage(), nil
|
||||
}
|
||||
|
||||
func (a ChainAPI) ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg api.ChainExportConfig) error {
|
||||
headTs, err := a.Chain.GetTipSetFromKey(ctx, head)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("loading tipset %s: %w", head, err)
|
||||
}
|
||||
tailTs, err := a.Chain.GetTipSetFromKey(ctx, tail)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("loading tipset %s: %w", tail, err)
|
||||
}
|
||||
if headTs.Height() < tailTs.Height() {
|
||||
return xerrors.Errorf("Height of head-tipset (%d) must be greater or equal to the height of the tail-tipset (%d)", headTs.Height(), tailTs.Height())
|
||||
}
|
||||
|
||||
fileName := filepath.Join(a.Repo.Path(), fmt.Sprintf("snapshot_%d_%d_%d.car", tailTs.Height(), headTs.Height(), time.Now().Unix()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
f, err := os.Create(fileName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infow("Exporting chain range", "path", fileName)
|
||||
// buffer writes to the chain export file.
|
||||
bw := bufio.NewWriterSize(f, cfg.WriteBufferSize)
|
||||
|
||||
defer func() {
|
||||
if err := bw.Flush(); err != nil {
|
||||
log.Errorw("failed to flush buffer", "error", err)
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
log.Errorw("failed to close file", "error", err)
|
||||
}
|
||||
}()
|
||||
|
||||
if err := a.Chain.ExportRange(ctx,
|
||||
bw,
|
||||
headTs, tailTs,
|
||||
cfg.IncludeMessages, cfg.IncludeReceipts, cfg.IncludeStateRoots,
|
||||
cfg.NumWorkers,
|
||||
); err != nil {
|
||||
return fmt.Errorf("exporting chain range: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *ChainAPI) ChainExport(ctx context.Context, nroots abi.ChainEpoch, skipoldmsgs bool, tsk types.TipSetKey) (<-chan []byte, error) {
|
||||
ts, err := a.Chain.GetTipSetFromKey(ctx, tsk)
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user