diff --git a/api/api_full.go b/api/api_full.go index 15939d79d..26e04b786 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -173,6 +173,10 @@ type FullNode interface { // If oldmsgskip is set, messages from before the requested roots are also not included. ChainExport(ctx context.Context, nroots abi.ChainEpoch, oldmsgskip bool, tsk types.TipSetKey) (<-chan []byte, error) //perm:read + ChainExportRange(ctx context.Context, head, tail types.TipSetKey, cfg *ChainExportConfig) (<-chan []byte, error) //perm:read + + ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg *ChainExportConfig) error //perm:read + // ChainPrune prunes the stored chain state and garbage collects; only supported if you // are using the splitstore ChainPrune(ctx context.Context, opts PruneOpts) error //perm:admin diff --git a/api/mocks/mock_full.go b/api/mocks/mock_full.go index 41e7529ec..1d7f47bfd 100644 --- a/api/mocks/mock_full.go +++ b/api/mocks/mock_full.go @@ -155,6 +155,35 @@ func (mr *MockFullNodeMockRecorder) ChainExport(arg0, arg1, arg2, arg3 interface return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainExport", reflect.TypeOf((*MockFullNode)(nil).ChainExport), arg0, arg1, arg2, arg3) } +// ChainExportRange mocks base method. +func (m *MockFullNode) ChainExportRange(arg0 context.Context, arg1, arg2 types.TipSetKey, arg3 *api.ChainExportConfig) (<-chan []byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ChainExportRange", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(<-chan []byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ChainExportRange indicates an expected call of ChainExportRange. +func (mr *MockFullNodeMockRecorder) ChainExportRange(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainExportRange", reflect.TypeOf((*MockFullNode)(nil).ChainExportRange), arg0, arg1, arg2, arg3) +} + +// ChainExportRangeInternal mocks base method. +func (m *MockFullNode) ChainExportRangeInternal(arg0 context.Context, arg1, arg2 types.TipSetKey, arg3 *api.ChainExportConfig) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ChainExportRangeInternal", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// ChainExportRangeInternal indicates an expected call of ChainExportRangeInternal. +func (mr *MockFullNodeMockRecorder) ChainExportRangeInternal(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainExportRangeInternal", reflect.TypeOf((*MockFullNode)(nil).ChainExportRangeInternal), arg0, arg1, arg2, arg3) +} + // ChainGetBlock mocks base method. 
func (m *MockFullNode) ChainGetBlock(arg0 context.Context, arg1 cid.Cid) (*types.BlockHeader, error) { m.ctrl.T.Helper() diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 29a4e7131..e55a3e3e5 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -7,15 +7,6 @@ import ( "encoding/json" "time" - "github.com/google/uuid" - "github.com/ipfs/go-cid" - blocks "github.com/ipfs/go-libipfs/blocks" - "github.com/libp2p/go-libp2p/core/metrics" - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/protocol" - "golang.org/x/xerrors" - "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-bitfield" datatransfer "github.com/filecoin-project/go-data-transfer" @@ -32,7 +23,6 @@ import ( "github.com/filecoin-project/go-state-types/dline" abinetwork "github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/go-state-types/proof" - apitypes "github.com/filecoin-project/lotus/api/types" builtinactors "github.com/filecoin-project/lotus/chain/actors/builtin" lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner" @@ -45,6 +35,14 @@ import ( "github.com/filecoin-project/lotus/storage/sealer/fsutil" "github.com/filecoin-project/lotus/storage/sealer/sealtasks" "github.com/filecoin-project/lotus/storage/sealer/storiface" + "github.com/google/uuid" + "github.com/ipfs/go-cid" + blocks "github.com/ipfs/go-libipfs/blocks" + "github.com/libp2p/go-libp2p/core/metrics" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" + "golang.org/x/xerrors" ) var ErrNotSupported = xerrors.New("method not supported") @@ -140,6 +138,10 @@ type FullNodeMethods struct { ChainExport func(p0 context.Context, p1 abi.ChainEpoch, p2 bool, p3 types.TipSetKey) (<-chan []byte, error) `perm:"read"` + ChainExportRange func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *ChainExportConfig) (<-chan []byte, error) `perm:"read"` + + ChainExportRangeInternal func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *ChainExportConfig) error `perm:"read"` + ChainGetBlock func(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) `perm:"read"` ChainGetBlockMessages func(p0 context.Context, p1 cid.Cid) (*BlockMessages, error) `perm:"read"` @@ -1435,6 +1437,28 @@ func (s *FullNodeStub) ChainExport(p0 context.Context, p1 abi.ChainEpoch, p2 boo return nil, ErrNotSupported } +func (s *FullNodeStruct) ChainExportRange(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *ChainExportConfig) (<-chan []byte, error) { + if s.Internal.ChainExportRange == nil { + return nil, ErrNotSupported + } + return s.Internal.ChainExportRange(p0, p1, p2, p3) +} + +func (s *FullNodeStub) ChainExportRange(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *ChainExportConfig) (<-chan []byte, error) { + return nil, ErrNotSupported +} + +func (s *FullNodeStruct) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *ChainExportConfig) error { + if s.Internal.ChainExportRangeInternal == nil { + return ErrNotSupported + } + return s.Internal.ChainExportRangeInternal(p0, p1, p2, p3) +} + +func (s *FullNodeStub) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *ChainExportConfig) error { + return ErrNotSupported +} + func (s *FullNodeStruct) ChainGetBlock(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) { if s.Internal.ChainGetBlock == nil { 
return nil, ErrNotSupported diff --git a/api/types.go b/api/types.go index e67903436..d8495d29e 100644 --- a/api/types.go +++ b/api/types.go @@ -398,3 +398,12 @@ func (m *MsgUuidMapType) UnmarshalJSON(b []byte) error { } return nil } + +type ChainExportConfig struct { + WriteBufferSize int + Workers int64 + CacheSize int + IncludeMessages bool + IncludeReceipts bool + IncludeStateRoots bool +} diff --git a/api/v0api/full.go b/api/v0api/full.go index 490cd73c8..648b8b12e 100644 --- a/api/v0api/full.go +++ b/api/v0api/full.go @@ -161,6 +161,10 @@ type FullNode interface { // If oldmsgskip is set, messages from before the requested roots are also not included. ChainExport(ctx context.Context, nroots abi.ChainEpoch, oldmsgskip bool, tsk types.TipSetKey) (<-chan []byte, error) //perm:read + ChainExportRange(ctx context.Context, head, tail types.TipSetKey, cfg *api.ChainExportConfig) (<-chan []byte, error) //perm:read + + ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg *api.ChainExportConfig) error //perm:read + // MethodGroup: Beacon // The Beacon method group contains methods for interacting with the random beacon (DRAND) diff --git a/api/v0api/proxy_gen.go b/api/v0api/proxy_gen.go index 5fa0d949c..ef0154b56 100644 --- a/api/v0api/proxy_gen.go +++ b/api/v0api/proxy_gen.go @@ -5,11 +5,6 @@ package v0api import ( "context" - "github.com/ipfs/go-cid" - blocks "github.com/ipfs/go-libipfs/blocks" - "github.com/libp2p/go-libp2p/core/peer" - "golang.org/x/xerrors" - "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-bitfield" datatransfer "github.com/filecoin-project/go-data-transfer" @@ -22,7 +17,6 @@ import ( "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/dline" abinetwork "github.com/filecoin-project/go-state-types/network" - "github.com/filecoin-project/lotus/api" apitypes "github.com/filecoin-project/lotus/api/types" lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner" @@ -30,6 +24,10 @@ import ( marketevents "github.com/filecoin-project/lotus/markets/loggers" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo/imports" + "github.com/ipfs/go-cid" + blocks "github.com/ipfs/go-libipfs/blocks" + "github.com/libp2p/go-libp2p/core/peer" + "golang.org/x/xerrors" ) var ErrNotSupported = xerrors.New("method not supported") @@ -49,6 +47,10 @@ type FullNodeMethods struct { ChainExport func(p0 context.Context, p1 abi.ChainEpoch, p2 bool, p3 types.TipSetKey) (<-chan []byte, error) `perm:"read"` + ChainExportRange func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *api.ChainExportConfig) (<-chan []byte, error) `perm:"read"` + + ChainExportRangeInternal func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *api.ChainExportConfig) error `perm:"read"` + ChainGetBlock func(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) `perm:"read"` ChainGetBlockMessages func(p0 context.Context, p1 cid.Cid) (*api.BlockMessages, error) `perm:"read"` @@ -532,6 +534,28 @@ func (s *FullNodeStub) ChainExport(p0 context.Context, p1 abi.ChainEpoch, p2 boo return nil, ErrNotSupported } +func (s *FullNodeStruct) ChainExportRange(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *api.ChainExportConfig) (<-chan []byte, error) { + if s.Internal.ChainExportRange == nil { + return nil, ErrNotSupported + } + return s.Internal.ChainExportRange(p0, p1, p2, p3) +} + +func (s *FullNodeStub) ChainExportRange(p0 context.Context, 
p1 types.TipSetKey, p2 types.TipSetKey, p3 *api.ChainExportConfig) (<-chan []byte, error) { + return nil, ErrNotSupported +} + +func (s *FullNodeStruct) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *api.ChainExportConfig) error { + if s.Internal.ChainExportRangeInternal == nil { + return ErrNotSupported + } + return s.Internal.ChainExportRangeInternal(p0, p1, p2, p3) +} + +func (s *FullNodeStub) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *api.ChainExportConfig) error { + return ErrNotSupported +} + func (s *FullNodeStruct) ChainGetBlock(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) { if s.Internal.ChainGetBlock == nil { return nil, ErrNotSupported diff --git a/api/v0api/v0mocks/mock_full.go b/api/v0api/v0mocks/mock_full.go index 619f19d35..c99dbae0e 100644 --- a/api/v0api/v0mocks/mock_full.go +++ b/api/v0api/v0mocks/mock_full.go @@ -141,6 +141,35 @@ func (mr *MockFullNodeMockRecorder) ChainExport(arg0, arg1, arg2, arg3 interface return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainExport", reflect.TypeOf((*MockFullNode)(nil).ChainExport), arg0, arg1, arg2, arg3) } +// ChainExportRange mocks base method. +func (m *MockFullNode) ChainExportRange(arg0 context.Context, arg1, arg2 types.TipSetKey, arg3 *api.ChainExportConfig) (<-chan []byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ChainExportRange", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(<-chan []byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ChainExportRange indicates an expected call of ChainExportRange. +func (mr *MockFullNodeMockRecorder) ChainExportRange(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainExportRange", reflect.TypeOf((*MockFullNode)(nil).ChainExportRange), arg0, arg1, arg2, arg3) +} + +// ChainExportRangeInternal mocks base method. +func (m *MockFullNode) ChainExportRangeInternal(arg0 context.Context, arg1, arg2 types.TipSetKey, arg3 *api.ChainExportConfig) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ChainExportRangeInternal", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// ChainExportRangeInternal indicates an expected call of ChainExportRangeInternal. +func (mr *MockFullNodeMockRecorder) ChainExportRangeInternal(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainExportRangeInternal", reflect.TypeOf((*MockFullNode)(nil).ChainExportRangeInternal), arg0, arg1, arg2, arg3) +} + // ChainGetBlock mocks base method. 
func (m *MockFullNode) ChainGetBlock(arg0 context.Context, arg1 cid.Cid) (*types.BlockHeader, error) { m.ctrl.T.Helper() diff --git a/chain/store/cache.go b/chain/store/cache.go new file mode 100644 index 000000000..4106d9556 --- /dev/null +++ b/chain/store/cache.go @@ -0,0 +1,118 @@ +package store + +import ( + "context" + "fmt" + "sync/atomic" + + lru "github.com/hashicorp/golang-lru" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/lotus/blockstore" +) + +type CachingBlockstore struct { + cache *lru.ARCCache + blocks blockstore.Blockstore + reads int64 // updated atomically + hits int64 // updated atomically + bytes int64 // updated atomically +} + +func NewCachingBlockstore(blocks blockstore.Blockstore, cacheSize int) (*CachingBlockstore, error) { + cache, err := lru.NewARC(cacheSize) + if err != nil { + return nil, fmt.Errorf("new arc: %w", err) + } + + return &CachingBlockstore{ + cache: cache, + blocks: blocks, + }, nil +} + +func (cs *CachingBlockstore) DeleteBlock(ctx context.Context, c cid.Cid) error { + return cs.blocks.DeleteBlock(ctx, c) +} + +func (cs *CachingBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) { + return cs.blocks.GetSize(ctx, c) +} + +func (cs *CachingBlockstore) Put(ctx context.Context, blk blocks.Block) error { + return cs.blocks.Put(ctx, blk) +} + +func (cs *CachingBlockstore) PutMany(ctx context.Context, blks []blocks.Block) error { + return cs.blocks.PutMany(ctx, blks) +} + +func (cs *CachingBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + return cs.blocks.AllKeysChan(ctx) +} + +func (cs *CachingBlockstore) HashOnRead(enabled bool) { + cs.blocks.HashOnRead(enabled) +} + +func (cs *CachingBlockstore) DeleteMany(ctx context.Context, cids []cid.Cid) error { + return cs.blocks.DeleteMany(ctx, cids) +} + +func (cs *CachingBlockstore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { + reads := atomic.AddInt64(&cs.reads, 1) + if reads%100000 == 0 { + hits := atomic.LoadInt64(&cs.hits) + by := atomic.LoadInt64(&cs.bytes) + log.Infow("CachingBlockstore stats", "reads", reads, "cache_len", cs.cache.Len(), "hit_rate", float64(hits)/float64(reads), "bytes_read", by) + } + + v, hit := cs.cache.Get(c) + if hit { + atomic.AddInt64(&cs.hits, 1) + return v.(blocks.Block), nil + } + + blk, err := cs.blocks.Get(ctx, c) + if err != nil { + return nil, err + } + + atomic.AddInt64(&cs.bytes, int64(len(blk.RawData()))) + cs.cache.Add(c, blk) + return blk, err +} + +func (cs *CachingBlockstore) View(ctx context.Context, c cid.Cid, callback func([]byte) error) error { + reads := atomic.AddInt64(&cs.reads, 1) + if reads%1000000 == 0 { + hits := atomic.LoadInt64(&cs.hits) + by := atomic.LoadInt64(&cs.bytes) + log.Infow("CachingBlockstore stats", "reads", reads, "cache_len", cs.cache.Len(), "hit_rate", float64(hits)/float64(reads), "bytes_read", by) + } + v, hit := cs.cache.Get(c) + if hit { + atomic.AddInt64(&cs.hits, 1) + return callback(v.(blocks.Block).RawData()) + } + + blk, err := cs.blocks.Get(ctx, c) + if err != nil { + return err + } + + atomic.AddInt64(&cs.bytes, int64(len(blk.RawData()))) + cs.cache.Add(c, blk) + return callback(blk.RawData()) +} + +func (cs *CachingBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) { + atomic.AddInt64(&cs.reads, 1) + // Safe to query cache since blockstore never deletes + if cs.cache.Contains(c) { + return true, nil + } + + return cs.blocks.Has(ctx, c) +} diff --git a/chain/store/snapshot.go b/chain/store/snapshot.go index 
36435152e..eaaf92102 100644 --- a/chain/store/snapshot.go +++ b/chain/store/snapshot.go @@ -3,7 +3,10 @@ package store import ( "bytes" "context" + "fmt" "io" + "sync" + "time" "github.com/ipfs/go-cid" blocks "github.com/ipfs/go-libipfs/blocks" @@ -12,6 +15,7 @@ import ( carv2 "github.com/ipld/go-car/v2" mh "github.com/multiformats/go-multihash" cbg "github.com/whyrusleeping/cbor-gen" + "golang.org/x/sync/errgroup" "golang.org/x/xerrors" "github.com/filecoin-project/go-state-types/abi" @@ -28,6 +32,35 @@ func (cs *ChainStore) UnionStore() bstore.Blockstore { return bstore.Union(cs.stateBlockstore, cs.chainBlockstore) } +func (cs *ChainStore) ExportRange(ctx context.Context, head, tail *types.TipSet, messages, receipts, stateroots bool, workers int64, cacheSize int, w io.Writer) error { + h := &car.CarHeader{ + Roots: head.Cids(), + Version: 1, + } + + if err := car.WriteHeader(h, w); err != nil { + return xerrors.Errorf("failed to write car header: %s", err) + } + + cacheStore, err := NewCachingBlockstore(cs.UnionStore(), cacheSize) + if err != nil { + return err + } + return cs.WalkSnapshotRange(ctx, cacheStore, head, tail, messages, receipts, stateroots, workers, func(c cid.Cid) error { + blk, err := cacheStore.Get(ctx, c) + if err != nil { + return xerrors.Errorf("writing object to car, bs.Get: %w", err) + } + + if err := carutil.LdWrite(w, c.Bytes(), blk.RawData()); err != nil { + return xerrors.Errorf("failed to write block to car output: %w", err) + } + + return nil + }) + +} + func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, w io.Writer) error { h := &car.CarHeader{ Roots: ts.Cids(), @@ -132,6 +165,324 @@ func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, e return root, nil } +type walkTask struct { + c cid.Cid + taskType taskType +} + +type walkResult struct { + c cid.Cid +} + +type walkSchedulerConfig struct { + numWorkers int64 + tail *types.TipSet + includeMessages bool + includeReceipts bool + includeState bool +} + +func newWalkScheduler(ctx context.Context, store bstore.Blockstore, cfg *walkSchedulerConfig, rootTasks ...*walkTask) (*walkScheduler, context.Context) { + tailSet := cid.NewSet() + for i := range cfg.tail.Cids() { + tailSet.Add(cfg.tail.Cids()[i]) + } + grp, ctx := errgroup.WithContext(ctx) + s := &walkScheduler{ + store: store, + numWorkers: cfg.numWorkers, + stack: rootTasks, + in: make(chan *walkTask, cfg.numWorkers*64), + out: make(chan *walkTask, cfg.numWorkers*64), + grp: grp, + tail: tailSet, + cfg: cfg, + } + s.taskWg.Add(len(rootTasks)) + return s, ctx +} + +type walkScheduler struct { + store bstore.Blockstore + // number of worker routines to spawn + numWorkers int64 + // buffer holds tasks until they are processed + stack []*walkTask + // inbound and outbound tasks + in, out chan *walkTask + // tracks number of inflight tasks + taskWg sync.WaitGroup + // launches workers and collects errors if any occur + grp *errgroup.Group + // set of tasks seen + seen sync.Map + + tail *cid.Set + cfg *walkSchedulerConfig +} + +func (s *walkScheduler) Wait() error { + return s.grp.Wait() +} + +func (s *walkScheduler) enqueueIfNew(task *walkTask) { + if task.c.Prefix().MhType == mh.IDENTITY { + //log.Infow("ignored", "cid", todo.c.String()) + return + } + if task.c.Prefix().Codec != cid.Raw && task.c.Prefix().Codec != cid.DagCBOR { + //log.Infow("ignored", "cid", todo.c.String()) + return + } + if _, ok := s.seen.Load(task.c); ok { + return + } + log.Debugw("enqueue", 
"type", task.taskType.String(), "cid", task.c.String()) + s.taskWg.Add(1) + s.seen.Store(task.c, struct{}{}) + s.in <- task +} + +func (s *walkScheduler) startScheduler(ctx context.Context) { + s.grp.Go(func() error { + defer func() { + log.Infow("walkScheduler shutting down") + close(s.out) + + // Because the workers may have exited early (due to the context being canceled). + for range s.out { + s.taskWg.Done() + } + log.Info("closed scheduler out wait group") + + // Because the workers may have enqueued additional tasks. + for range s.in { + s.taskWg.Done() + } + log.Info("closed scheduler in wait group") + + // now, the waitgroup should be at 0, and the goroutine that was _waiting_ on it should have exited. + log.Infow("walkScheduler stopped") + }() + go func() { + s.taskWg.Wait() + close(s.in) + log.Info("closed scheduler in channel") + }() + for { + if n := len(s.stack) - 1; n >= 0 { + select { + case <-ctx.Done(): + return ctx.Err() + case newJob, ok := <-s.in: + if !ok { + return nil + } + s.stack = append(s.stack, newJob) + case s.out <- s.stack[n]: + s.stack[n] = nil + s.stack = s.stack[:n] + } + } else { + select { + case <-ctx.Done(): + return ctx.Err() + case newJob, ok := <-s.in: + if !ok { + return nil + } + s.stack = append(s.stack, newJob) + } + } + } + }) +} + +func (s *walkScheduler) startWorkers(ctx context.Context, out chan *walkResult) { + for i := int64(0); i < s.numWorkers; i++ { + s.grp.Go(func() error { + for task := range s.out { + if err := s.work(ctx, task, out); err != nil { + return err + } + } + return nil + }) + } +} + +type taskType int + +func (t taskType) String() string { + switch t { + case Block: + return "block" + case Message: + return "message" + case Receipt: + return "receipt" + case State: + return "state" + case Dag: + return "dag" + } + panic(fmt.Sprintf("unknown task %d", t)) +} + +const ( + Block taskType = iota + Message + Receipt + State + Dag +) + +func (s *walkScheduler) work(ctx context.Context, todo *walkTask, results chan *walkResult) error { + defer s.taskWg.Done() + // unseen cid, it's a result + results <- &walkResult{c: todo.c} + + // extract relevant dags to walk from the block + if todo.taskType == Block { + blk := todo.c + data, err := s.store.Get(ctx, blk) + if err != nil { + return err + } + var b types.BlockHeader + if err := b.UnmarshalCBOR(bytes.NewBuffer(data.RawData())); err != nil { + return xerrors.Errorf("unmarshalling block header (cid=%s): %w", blk, err) + } + if b.Height%1_000 == 0 { + log.Infow("block export", "height", b.Height) + } + if b.Height == 0 { + log.Info("exporting genesis block") + for i := range b.Parents { + s.enqueueIfNew(&walkTask{ + c: b.Parents[i], + taskType: Dag, + }) + } + s.enqueueIfNew(&walkTask{ + c: b.ParentStateRoot, + taskType: State, + }) + return nil + } + // enqueue block parents + for i := range b.Parents { + s.enqueueIfNew(&walkTask{ + c: b.Parents[i], + taskType: Block, + }) + } + if s.cfg.tail.Height() >= b.Height { + log.Debugw("tail reached", "cid", blk.String()) + return nil + } + + if s.cfg.includeMessages { + // enqueue block messages + s.enqueueIfNew(&walkTask{ + c: b.Messages, + taskType: Message, + }) + } + if s.cfg.includeReceipts { + // enqueue block receipts + s.enqueueIfNew(&walkTask{ + c: b.ParentMessageReceipts, + taskType: Receipt, + }) + } + if s.cfg.includeState { + s.enqueueIfNew(&walkTask{ + c: b.ParentStateRoot, + taskType: State, + }) + } + + return nil + } + data, err := s.store.Get(ctx, todo.c) + if err != nil { + return err + } + return 
cbg.ScanForLinks(bytes.NewReader(data.RawData()), func(c cid.Cid) { + if todo.c.Prefix().Codec != cid.DagCBOR || todo.c.Prefix().MhType == mh.IDENTITY { + return + } + + s.enqueueIfNew(&walkTask{ + c: c, + taskType: Dag, + }) + }) +} + +func (cs *ChainStore) WalkSnapshotRange(ctx context.Context, store bstore.Blockstore, head, tail *types.TipSet, messages, receipts, stateroots bool, workers int64, cb func(cid.Cid) error) error { + start := time.Now() + log.Infow("walking snapshot range", "head", head.Key(), "tail", tail.Key(), "messages", messages, "receipts", receipts, "stateroots", stateroots, "workers", workers, "start", start) + var tasks []*walkTask + for i := range head.Blocks() { + tasks = append(tasks, &walkTask{ + c: head.Blocks()[i].Cid(), + taskType: 0, + }) + } + + cfg := &walkSchedulerConfig{ + numWorkers: workers, + tail: tail, + includeMessages: messages, + includeState: stateroots, + includeReceipts: receipts, + } + + pw, ctx := newWalkScheduler(ctx, store, cfg, tasks...) + // create a buffered channel for exported CIDs, scaled by the number of workers. + results := make(chan *walkResult, workers*64) + + pw.startScheduler(ctx) + // workers consume tasks and write the CIDs they visit to the results channel. + pw.startWorkers(ctx, results) + + // used to wait until the results channel has been drained. + resultsDone := make(chan struct{}) + var cbErr error + go func() { + // signal we are done draining results when this goroutine exits. + defer close(resultsDone) + // drain the results channel until it closes. + for res := range results { + if err := cb(res.c); err != nil { + log.Errorw("export range callback error", "error", err) + cbErr = err + return + } + } + }() + // wait until all workers are done. + err := pw.Wait() + if err != nil { + log.Errorw("walker scheduler", "error", err) + } + // workers are done, close the results channel. + close(results) + // wait until all results have been consumed before exiting (it's buffered). + <-resultsDone + + // if there was a callback error, return it. + if cbErr != nil { + return cbErr + } + log.Infow("walking snapshot range complete", "duration", time.Since(start), "success", err == nil) + + // return any error encountered by the walker. 
+ return err +} + func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs, skipMsgReceipts bool, cb func(cid.Cid) error) error { if ts == nil { ts = cs.GetHeaviestTipSet() diff --git a/cli/chain.go b/cli/chain.go index 549b8646e..e0cc62ac9 100644 --- a/cli/chain.go +++ b/cli/chain.go @@ -56,6 +56,7 @@ var ChainCmd = &cli.Command{ ChainGetCmd, ChainBisectCmd, ChainExportCmd, + ChainExportRangeCmd, SlashConsensusFault, ChainGasPriceCmd, ChainInspectUsage, @@ -1145,6 +1146,149 @@ var ChainExportCmd = &cli.Command{ }, } +var ChainExportRangeCmd = &cli.Command{ + Name: "export-range", + Usage: "export a range of the chain to a car file", + ArgsUsage: "[outputPath]", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "head", + Usage: "specify tipset to start the export from", + Value: "@head", + }, + &cli.StringFlag{ + Name: "tail", + Usage: "specify tipset to end the export at", + Value: "@tail", + }, + &cli.BoolFlag{ + Name: "messages", + Usage: "specify if messages should be included", + Value: false, + }, + &cli.BoolFlag{ + Name: "receipts", + Usage: "specify if receipts should be included", + Value: false, + }, + &cli.BoolFlag{ + Name: "stateroots", + Usage: "specify if stateroots should be included", + Value: false, + }, + &cli.Int64Flag{ + Name: "workers", + Usage: "specify the number of workers", + Value: 1, + }, + &cli.IntFlag{ + Name: "cache-size", + Usage: "specify the size of the cache (in objects) to use while exporting", + Value: 100_000, + }, + &cli.IntFlag{ + Name: "write-buffer", + Usage: "specify the write buffer size (in bytes)", + Value: 1 << 20, + }, + &cli.BoolFlag{ + Name: "internal", + Usage: "have the daemon write the file locally instead of streaming it to the client", + }, + }, + Action: func(cctx *cli.Context) error { + api, closer, err := GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + + if !cctx.Args().Present() { + return fmt.Errorf("must specify filename to export chain to") + } + + head, tail := &types.TipSet{}, &types.TipSet{} + headstr := cctx.String("head") + if headstr == "@head" { + head, err = api.ChainHead(ctx) + if err != nil { + return err + } + } else { + head, err = ParseTipSetRef(ctx, api, headstr) + if err != nil { + return fmt.Errorf("parsing head: %w", err) + } + } + tailstr := cctx.String("tail") + if tailstr == "@tail" { + tail, err = api.ChainGetGenesis(ctx) + if err != nil { + return err + } + } else { + tail, err = ParseTipSetRef(ctx, api, tailstr) + if err != nil { + return fmt.Errorf("parsing tail: %w", err) + } + } + + if cctx.Bool("internal") { + if err := api.ChainExportRangeInternal(ctx, head.Key(), tail.Key(), &lapi.ChainExportConfig{ + WriteBufferSize: cctx.Int("write-buffer"), + Workers: cctx.Int64("workers"), + CacheSize: cctx.Int("cache-size"), + IncludeMessages: cctx.Bool("messages"), + IncludeReceipts: cctx.Bool("receipts"), + IncludeStateRoots: cctx.Bool("stateroots"), + }); err != nil { + return err + } + return nil + } + + stream, err := api.ChainExportRange(ctx, head.Key(), tail.Key(), &lapi.ChainExportConfig{ + WriteBufferSize: cctx.Int("write-buffer"), + Workers: cctx.Int64("workers"), + CacheSize: cctx.Int("cache-size"), + IncludeMessages: cctx.Bool("messages"), + IncludeReceipts: cctx.Bool("receipts"), + IncludeStateRoots: cctx.Bool("stateroots"), + }) + if err != nil { + return err + } + + fi, err := createExportFile(cctx.App, cctx.Args().First()) + if err != nil { + return err + } + defer func() { + err := fi.Close() + if err != nil { + fmt.Printf("error closing output 
file: %+v", err) + } + }() + + var last bool + for b := range stream { + last = len(b) == 0 + + _, err := fi.Write(b) + if err != nil { + return err + } + } + + if !last { + return xerrors.Errorf("incomplete export (remote connection lost?)") + } + + return nil + }, +} + var SlashConsensusFault = &cli.Command{ Name: "slash-consensus", Usage: "Report consensus fault", diff --git a/documentation/en/api-v0-methods.md b/documentation/en/api-v0-methods.md index fe639b2f3..896c3ca1e 100644 --- a/documentation/en/api-v0-methods.md +++ b/documentation/en/api-v0-methods.md @@ -13,6 +13,8 @@ * [Chain](#Chain) * [ChainDeleteObj](#ChainDeleteObj) * [ChainExport](#ChainExport) + * [ChainExportRange](#ChainExportRange) + * [ChainExportRangeInternal](#ChainExportRangeInternal) * [ChainGetBlock](#ChainGetBlock) * [ChainGetBlockMessages](#ChainGetBlockMessages) * [ChainGetGenesis](#ChainGetGenesis) @@ -421,6 +423,80 @@ Inputs: Response: `"Ynl0ZSBhcnJheQ=="` +### ChainExportRange + + +Perms: read + +Inputs: +```json +[ + [ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + { + "/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve" + } + ], + [ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + { + "/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve" + } + ], + { + "WriteBufferSize": 123, + "Workers": 9, + "CacheSize": 123, + "IncludeMessages": true, + "IncludeReceipts": true, + "IncludeStateRoots": true + } +] +``` + +Response: `"Ynl0ZSBhcnJheQ=="` + +### ChainExportRangeInternal + + +Perms: read + +Inputs: +```json +[ + [ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + { + "/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve" + } + ], + [ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + { + "/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve" + } + ], + { + "WriteBufferSize": 123, + "Workers": 9, + "CacheSize": 123, + "IncludeMessages": true, + "IncludeReceipts": true, + "IncludeStateRoots": true + } +] +``` + +Response: `{}` + ### ChainGetBlock ChainGetBlock returns the block specified by the given CID. 
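For API consumers, the stream returned by `ChainExportRange` is terminated by a single empty chunk, which is exactly what the CLI above checks with `last = len(b) == 0`. Below is a minimal sketch of driving the export from Go rather than through the CLI; it assumes an already-connected v1 `api.FullNode` handle, and the helper name `exportRangeToFile` and the choice of 4 workers are illustrative, not part of this change:

```go
package main

import (
	"context"
	"os"

	"golang.org/x/xerrors"

	lapi "github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/lotus/chain/types"
)

// exportRangeToFile is a hypothetical helper: it streams a ranged export from a
// connected full node into a local CAR file, using the same empty-chunk EOF
// convention as `lotus chain export-range`.
func exportRangeToFile(ctx context.Context, node lapi.FullNode, head, tail types.TipSetKey, path string) error {
	stream, err := node.ChainExportRange(ctx, head, tail, &lapi.ChainExportConfig{
		WriteBufferSize:   1 << 20, // matches the CLI --write-buffer default
		Workers:           4,       // parallel walk workers (CLI default is 1)
		CacheSize:         100_000, // read cache in objects (CLI default)
		IncludeStateRoots: true,    // headers only otherwise
	})
	if err != nil {
		return err
	}

	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close() //nolint:errcheck

	var sawEOF bool
	for chunk := range stream {
		if len(chunk) == 0 {
			// the node sends one empty chunk to signal a clean end of stream
			sawEOF = true
			continue
		}
		if _, err := f.Write(chunk); err != nil {
			return err
		}
	}
	if !sawEOF {
		return xerrors.Errorf("incomplete export (connection lost?)")
	}
	return nil
}
```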
diff --git a/documentation/en/api-v1-unstable-methods.md b/documentation/en/api-v1-unstable-methods.md index 11466ee1b..37ff77a00 100644 --- a/documentation/en/api-v1-unstable-methods.md +++ b/documentation/en/api-v1-unstable-methods.md @@ -13,6 +13,8 @@ * [ChainCheckBlockstore](#ChainCheckBlockstore) * [ChainDeleteObj](#ChainDeleteObj) * [ChainExport](#ChainExport) + * [ChainExportRange](#ChainExportRange) + * [ChainExportRangeInternal](#ChainExportRangeInternal) * [ChainGetBlock](#ChainGetBlock) * [ChainGetBlockMessages](#ChainGetBlockMessages) * [ChainGetEvents](#ChainGetEvents) @@ -472,6 +474,80 @@ Inputs: Response: `"Ynl0ZSBhcnJheQ=="` +### ChainExportRange + + +Perms: read + +Inputs: +```json +[ + [ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + { + "/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve" + } + ], + [ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + { + "/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve" + } + ], + { + "WriteBufferSize": 123, + "Workers": 9, + "CacheSize": 123, + "IncludeMessages": true, + "IncludeReceipts": true, + "IncludeStateRoots": true + } +] +``` + +Response: `"Ynl0ZSBhcnJheQ=="` + +### ChainExportRangeInternal + + +Perms: read + +Inputs: +```json +[ + [ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + { + "/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve" + } + ], + [ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + { + "/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve" + } + ], + { + "WriteBufferSize": 123, + "Workers": 9, + "CacheSize": 123, + "IncludeMessages": true, + "IncludeReceipts": true, + "IncludeStateRoots": true + } +] +``` + +Response: `{}` + ### ChainGetBlock ChainGetBlock returns the block specified by the given CID. diff --git a/node/impl/full/chain.go b/node/impl/full/chain.go index 17444ca58..6d47518ee 100644 --- a/node/impl/full/chain.go +++ b/node/impl/full/chain.go @@ -5,11 +5,14 @@ import ( "bytes" "context" "encoding/json" + "fmt" "io" "math" + "os" "strconv" "strings" "sync" + "time" "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" @@ -589,6 +592,91 @@ func (m *ChainModule) ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.M return cm.VMMessage(), nil } +func (a ChainAPI) ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg *api.ChainExportConfig) error { + headTs, err := a.Chain.GetTipSetFromKey(ctx, head) + if err != nil { + return xerrors.Errorf("loading tipset %s: %w", head, err) + } + tailTs, err := a.Chain.GetTipSetFromKey(ctx, tail) + if err != nil { + return xerrors.Errorf("loading tipset %s: %w", tail, err) + } + f, err := os.Create(fmt.Sprintf("./snapshot_%d_%d_%d.car", tailTs.Height(), headTs.Height(), time.Now().Unix())) + if err != nil { + return err + } + // buffer writes to the chain export file. 
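+	// NOTE: cfg.WriteBufferSize is caller-controlled; the CLI's --write-buffer flag defaults to 1<<20 bytes, trading memory for fewer small writes while the walk streams blocks.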
+ bw := bufio.NewWriterSize(f, cfg.WriteBufferSize) + + defer func() { + if err := bw.Flush(); err != nil { + log.Errorw("failed to flush buffer", "error", err) + } + if err := f.Close(); err != nil { + log.Errorw("failed to close file", "error", err) + } + }() + + if err := a.Chain.ExportRange(ctx, headTs, tailTs, cfg.IncludeMessages, cfg.IncludeReceipts, cfg.IncludeStateRoots, cfg.Workers, cfg.CacheSize, bw); err != nil { + return fmt.Errorf("exporting chain range: %w", err) + } + + return nil +} + +func (a ChainAPI) ChainExportRange(ctx context.Context, head, tail types.TipSetKey, cfg *api.ChainExportConfig) (<-chan []byte, error) { + headTs, err := a.Chain.GetTipSetFromKey(ctx, head) + if err != nil { + return nil, xerrors.Errorf("loading tipset %s: %w", head, err) + } + tailTs, err := a.Chain.GetTipSetFromKey(ctx, tail) + if err != nil { + return nil, xerrors.Errorf("loading tipset %s: %w", tail, err) + } + r, w := io.Pipe() + out := make(chan []byte) + go func() { + bw := bufio.NewWriterSize(w, cfg.WriteBufferSize) + + err := a.Chain.ExportRange(ctx, headTs, tailTs, cfg.IncludeMessages, cfg.IncludeReceipts, cfg.IncludeStateRoots, cfg.Workers, cfg.CacheSize, bw) + bw.Flush() //nolint:errcheck // it is a write to a pipe + w.CloseWithError(err) //nolint:errcheck // it is a pipe + }() + + go func() { + defer close(out) + for { + buf := make([]byte, cfg.WriteBufferSize) + n, err := r.Read(buf) + if err != nil && err != io.EOF { + log.Errorf("chain export pipe read failed: %s", err) + return + } + if n > 0 { + select { + case out <- buf[:n]: + case <-ctx.Done(): + log.Warnf("export writer failed: %s", ctx.Err()) + return + } + } + if err == io.EOF { + // send empty slice to indicate correct eof + select { + case out <- []byte{}: + case <-ctx.Done(): + log.Warnf("export writer failed: %s", ctx.Err()) + return + } + + return + } + } + }() + + return out, nil +} + func (a *ChainAPI) ChainExport(ctx context.Context, nroots abi.ChainEpoch, skipoldmsgs bool, tsk types.TipSetKey) (<-chan []byte, error) { ts, err := a.Chain.GetTipSetFromKey(ctx, tsk) if err != nil {
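When the operator is on the same host as the daemon, or simply wants to avoid pushing the CAR over the RPC connection, `ChainExportRangeInternal` has the daemon write `snapshot_<tailHeight>_<headHeight>_<unixtime>.car` into its own working directory, as the implementation above shows. A minimal sketch of calling that variant directly follows; it again assumes a connected v1 `api.FullNode`, the helper name is hypothetical, and the config values simply mirror the CLI defaults:

```go
package main

import (
	"context"

	lapi "github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/lotus/chain/types"
)

// exportRangeOnNode mirrors `lotus chain export-range --internal`: the daemon
// walks the range itself and writes the snapshot CAR to its working directory.
func exportRangeOnNode(ctx context.Context, node lapi.FullNode, head, tail types.TipSetKey) error {
	return node.ChainExportRangeInternal(ctx, head, tail, &lapi.ChainExportConfig{
		WriteBufferSize:   1 << 20, // --write-buffer default
		Workers:           1,       // --workers default; raise to parallelize the walk
		CacheSize:         100_000, // --cache-size default, in objects
		IncludeMessages:   false,
		IncludeReceipts:   false,
		IncludeStateRoots: false,
	})
}
```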