lotus/chain/store/snapshot.go
Hector Sanjuan 1bb698619c Ranged-export: Remove CachingBlockstore
The improvements in the range-export code avoid reading most blocks twice,
while allowing some blocks to be written to disk multiple times.

The cache hit-rate went down from being close to 50% to a maximum of 12% at
the very end of the export. The reason is that most CIDs are never read twice
since they are correctly tracked in the CID set.

These numbers do not justify maintaining the CachingBlockstore
code. Additional testing shows that removing it results in similar memory-usage
behaviour and about 5 minutes faster execution (around 10%).

Less code to maintain and fewer options to get wrong.
2023-02-14 21:08:10 +01:00


package store
import (
"bytes"
"context"
"fmt"
"io"
"sync"
"time"
"github.com/ipfs/go-cid"
blocks "github.com/ipfs/go-libipfs/blocks"
"github.com/ipld/go-car"
carutil "github.com/ipld/go-car/util"
carv2 "github.com/ipld/go-car/v2"
mh "github.com/multiformats/go-multihash"
cbg "github.com/whyrusleeping/cbor-gen"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
bstore "github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/filecoin-project/lotus/chain/types"
)
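// TipsetkeyBackfillRange is how many epochs of tipsets, starting from the root
// of an imported snapshot, are persisted to the chain store during Import.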
const TipsetkeyBackfillRange = 2 * build.Finality
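// UnionStore returns a blockstore backed by both the state and the chain blockstores.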
func (cs *ChainStore) UnionStore() bstore.Blockstore {
return bstore.Union(cs.stateBlockstore, cs.chainBlockstore)
}
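// Export writes a CARv1 snapshot rooted at ts to w. Block headers are always
// included back to genesis; state trees only for the last inclRecentRoots
// epochs (and for genesis); messages for the whole chain unless skipOldMsgs
// limits them to the last inclRecentRoots epochs; message receipts are never
// included.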
func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, w io.Writer) error {
h := &car.CarHeader{
Roots: ts.Cids(),
Version: 1,
}
if err := car.WriteHeader(h, w); err != nil {
return xerrors.Errorf("failed to write car header: %s", err)
}
unionBs := cs.UnionStore()
return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, true, func(c cid.Cid) error {
blk, err := unionBs.Get(ctx, c)
if err != nil {
return xerrors.Errorf("writing object to car, bs.Get: %w", err)
}
if err := carutil.LdWrite(w, c.Bytes(), blk.RawData()); err != nil {
return xerrors.Errorf("failed to write block to car output: %w", err)
}
return nil
})
}
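// Import reads a CAR snapshot from r into the state blockstore and returns the
// tipset referenced by the CAR roots. The most recent TipsetkeyBackfillRange
// epochs of tipsets are also persisted to the chain store.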
func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, error) {
// TODO: writing only to the state blockstore is incorrect.
// At this time, both the state and chain blockstores are backed by the
// universal store. When we physically segregate the stores, we will need
// to route state objects to the state blockstore, and chain objects to
// the chain blockstore.
br, err := carv2.NewBlockReader(r)
if err != nil {
return nil, xerrors.Errorf("loadcar failed: %w", err)
}
s := cs.StateBlockstore()
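// Incoming blocks are buffered in batches of ~1000 and flushed with PutMany.
// putThrottle acts both as a semaphore limiting concurrent flushes to
// parallelPuts and as the channel through which their errors are returned.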
parallelPuts := 5
putThrottle := make(chan error, parallelPuts)
for i := 0; i < parallelPuts; i++ {
putThrottle <- nil
}
var buf []blocks.Block
for {
blk, err := br.Next()
if err != nil {
if err == io.EOF {
if len(buf) > 0 {
if err := s.PutMany(ctx, buf); err != nil {
return nil, err
}
}
break
}
return nil, err
}
buf = append(buf, blk)
if len(buf) > 1000 {
if lastErr := <-putThrottle; lastErr != nil { // consume one error to have the right to add one
return nil, lastErr
}
go func(buf []blocks.Block) {
putThrottle <- s.PutMany(ctx, buf)
}(buf)
buf = nil
}
}
// check errors
for i := 0; i < parallelPuts; i++ {
if lastErr := <-putThrottle; lastErr != nil {
return nil, lastErr
}
}
root, err := cs.LoadTipSet(ctx, types.NewTipSetKey(br.Roots...))
if err != nil {
return nil, xerrors.Errorf("failed to load root tipset from chainfile: %w", err)
}
ts := root
for i := 0; i < int(TipsetkeyBackfillRange); i++ {
err = cs.PersistTipset(ctx, ts)
if err != nil {
return nil, err
}
parentTsKey := ts.Parents()
ts, err = cs.LoadTipSet(ctx, parentTsKey)
if ts == nil || err != nil {
log.Warnf("Only able to load the last %d tipsets", i)
break
}
}
return root, nil
}
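// walkSchedTaskType describes the kind of chain object a walkTask refers to,
// which determines how its links are followed.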
type walkSchedTaskType int
const (
finishTask walkSchedTaskType = -1
blockTask walkSchedTaskType = iota
messageTask
receiptTask
stateTask
dagTask
)
func (t walkSchedTaskType) String() string {
switch t {
case finishTask:
return "finish"
case blockTask:
return "block"
case messageTask:
return "message"
case receiptTask:
return "receipt"
case stateTask:
return "state"
case dagTask:
return "dag"
}
panic(fmt.Sprintf("unknown task %d", t))
}
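// walkTask is a unit of work for the export walk: a CID to fetch and the kind
// of object it is expected to be.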
type walkTask struct {
c cid.Cid
taskType walkSchedTaskType
}
// taskFifo is an ever-growing FIFO of walkTasks: the run loop moves tasks from
// in to out, buffering overflow in the fifo slice, so the queue has no fixed
// capacity.
type taskFifo struct {
in chan walkTask
out chan walkTask
fifo []walkTask
}
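// taskResult pairs a fetched block with its CID, ready to be written to the CAR.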
type taskResult struct {
c cid.Cid
b blocks.Block
}
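// newTaskFifo creates a taskFifo with channel buffers of bufferLen and starts
// its run loop.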
func newTaskFifo(bufferLen int) *taskFifo {
f := taskFifo{
in: make(chan walkTask, bufferLen),
out: make(chan walkTask, bufferLen),
fifo: make([]walkTask, 0),
}
go f.run()
return &f
}
func (f *taskFifo) Close() error {
close(f.in)
return nil
}
func (f *taskFifo) run() {
for {
if len(f.fifo) > 0 {
// we have items in slice
// try to put next out or read something in.
// blocks if nothing works.
next := f.fifo[0]
select {
case f.out <- next:
f.fifo = f.fifo[1:]
case elem, ok := <-f.in:
if !ok {
// drain and close out.
for _, elem := range f.fifo {
f.out <- elem
}
close(f.out)
return
}
f.fifo = append(f.fifo, elem)
}
} else {
// no elements in fifo to put out.
// Try to read in and block.
// When done, try to put out or add to fifo.
select {
case elem, ok := <-f.in:
if !ok {
close(f.out)
return
}
select {
case f.out <- elem:
default:
f.fifo = append(f.fifo, elem)
}
}
}
}
}
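// walkSchedulerConfig parametrizes a ranged-export walk.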
type walkSchedulerConfig struct {
numWorkers int
head *types.TipSet // Tipset to start walking from.
tail *types.TipSet // Tipset to end at.
includeMessages bool
includeReceipts bool
includeState bool
}
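// walkScheduler coordinates a parallel DAG walk. Workers pull tasks from
// workerTasks, fetch the corresponding blocks from store and push them to
// results, where a dedicated goroutine writes them to the CAR output; newly
// discovered links are enqueued as further tasks. totalTasks counts queued but
// unprocessed tasks, and seen de-duplicates CIDs across the walk.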
type walkScheduler struct {
ctx context.Context
cancel context.CancelFunc
store bstore.Blockstore
cfg walkSchedulerConfig
writer io.Writer
workerTasks *taskFifo
totalTasks atomic.Int64
results chan taskResult
writeErrorChan chan error
// tracks number of inflight tasks
//taskWg sync.WaitGroup
// launches workers and collects errors if any occur
workers *errgroup.Group
// set of CIDs already exported
seen sync.Map
}
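// newWalkScheduler starts the CAR-writing goroutine and cfg.numWorkers workers,
// then seeds the queue with a block task for each block header in cfg.head.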
func newWalkScheduler(ctx context.Context, store bstore.Blockstore, cfg walkSchedulerConfig, w io.Writer) (*walkScheduler, error) {
ctx, cancel := context.WithCancel(ctx)
workers, ctx := errgroup.WithContext(ctx)
s := &walkScheduler{
ctx: ctx,
cancel: cancel,
store: store,
cfg: cfg,
writer: w,
results: make(chan taskResult, cfg.numWorkers*64),
workerTasks: newTaskFifo(cfg.numWorkers * 64),
writeErrorChan: make(chan error, 1),
workers: workers,
}
go func() {
defer close(s.writeErrorChan)
for r := range s.results {
// Write
if err := carutil.LdWrite(s.writer, r.c.Bytes(), r.b.RawData()); err != nil {
// abort operations
cancel()
s.writeErrorChan <- err
}
}
}()
// workers
for i := 0; i < cfg.numWorkers; i++ {
f := func(n int) func() error {
return func() error {
return s.workerFunc(n)
}
}(i)
s.workers.Go(f)
}
s.totalTasks.Add(int64(len(cfg.head.Blocks())))
for _, b := range cfg.head.Blocks() {
select {
case <-ctx.Done():
log.Errorw("context done while sending root tasks", ctx.Err())
cancel() // kill workers
return nil, ctx.Err()
case s.workerTasks.in <- walkTask{
c: b.Cid(),
taskType: blockTask,
}:
}
}
return s, nil
}
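// Wait blocks until all workers have returned, then drains any tasks still in
// the queue on the calling goroutine, closes the results channel and reports
// the first error encountered, giving precedence to CAR write errors.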
func (s *walkScheduler) Wait() error {
err := s.workers.Wait()
// all workers done. One would have reached genesis and notified the
// rest to exit. Yet, there might be some pending tasks in the queue,
// so we need to run a "single worker".
if err != nil {
log.Errorw("export workers finished with error", "error", err)
}
for {
if n := s.totalTasks.Load(); n == 0 {
break // finally fully done
}
select {
case task := <-s.workerTasks.out:
s.totalTasks.Add(-1)
if err != nil {
continue // just drain if errors happened.
}
err = s.processTask(task, 0)
}
}
close(s.results)
errWrite := <-s.writeErrorChan
if errWrite != nil {
log.Errorw("error writing to CAR file", "error", err)
return errWrite
}
s.workerTasks.Close() //nolint:errcheck
return err
}
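// enqueueIfNew queues a task for c unless it uses an identity hash, its codec
// is neither Raw nor DagCBOR, or it has already been enqueued.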
func (s *walkScheduler) enqueueIfNew(task walkTask) {
if task.c.Prefix().MhType == mh.IDENTITY {
//log.Infow("ignored", "cid", todo.c.String())
return
}
if task.c.Prefix().Codec != cid.Raw && task.c.Prefix().Codec != cid.DagCBOR {
//log.Infow("ignored", "cid", todo.c.String())
return
}
if _, loaded := s.seen.LoadOrStore(task.c, struct{}{}); loaded {
// we already had it on the map
return
}
log.Debugw("enqueue", "type", task.taskType.String(), "cid", task.c.String())
s.totalTasks.Add(1)
s.workerTasks.in <- task
}
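// sendFinish queues a finishTask. The first one is sent by the worker that
// reaches genesis; every worker that dequeues it re-sends it before exiting, so
// the token eventually reaches all workers.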
func (s *walkScheduler) sendFinish(workerN int) error {
log.Infow("worker finished work", "worker", workerN)
s.totalTasks.Add(1)
s.workerTasks.in <- walkTask{
taskType: finishTask,
}
return nil
}
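// workerFunc is a worker's main loop: it processes tasks until the context is
// cancelled or a finishTask arrives, in which case it passes the finish token
// on and exits.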
func (s *walkScheduler) workerFunc(workerN int) error {
log.Infow("starting worker", "worker", workerN)
for t := range s.workerTasks.out {
s.totalTasks.Add(-1)
select {
case <-s.ctx.Done():
return s.ctx.Err()
default:
// A worker reached genesis, so we wind down and let others do
// the same. Exit.
if t.taskType == finishTask {
return s.sendFinish(workerN)
}
}
err := s.processTask(t, workerN)
if err != nil {
return err
}
// continue
}
return nil
}
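// processTask fetches the block for t and queues it for writing to the CAR.
// For block headers it enqueues the parents and, depending on the configuration
// and on whether the tail has been reached, the messages, receipts and parent
// state root; for any other object it scans the raw CBOR for links and enqueues
// them as dag tasks.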
func (s *walkScheduler) processTask(t walkTask, workerN int) error {
if t.taskType == finishTask {
return nil
}
blk, err := s.store.Get(s.ctx, t.c)
if err != nil {
return xerrors.Errorf("writing object to car, bs.Get: %w", err)
}
s.results <- taskResult{
c: t.c,
b: blk,
}
// extract relevant dags to walk from the block
if t.taskType == blockTask {
blk := t.c
data, err := s.store.Get(s.ctx, blk)
if err != nil {
return err
}
var b types.BlockHeader
if err := b.UnmarshalCBOR(bytes.NewBuffer(data.RawData())); err != nil {
return xerrors.Errorf("unmarshalling block header (cid=%s): %w", blk, err)
}
if b.Height%1_000 == 0 {
log.Infow("block export", "height", b.Height)
}
if b.Height == 0 {
log.Info("exporting genesis block")
for i := range b.Parents {
s.enqueueIfNew(walkTask{
c: b.Parents[i],
taskType: dagTask,
})
}
s.enqueueIfNew(walkTask{
c: b.ParentStateRoot,
taskType: stateTask,
})
return s.sendFinish(workerN)
}
// enqueue block parents
for i := range b.Parents {
s.enqueueIfNew(walkTask{
c: b.Parents[i],
taskType: blockTask,
})
}
if s.cfg.tail.Height() >= b.Height {
log.Debugw("tail reached: only blocks will be exported from now until genesis", "cid", blk.String())
return nil
}
if s.cfg.includeMessages {
// enqueue block messages
s.enqueueIfNew(walkTask{
c: b.Messages,
taskType: messageTask,
})
}
if s.cfg.includeReceipts {
// enqueue block receipts
s.enqueueIfNew(walkTask{
c: b.ParentMessageReceipts,
taskType: receiptTask,
})
}
if s.cfg.includeState {
s.enqueueIfNew(walkTask{
c: b.ParentStateRoot,
taskType: stateTask,
})
}
return nil
}
// Not a chain-block: we scan for CIDs in the raw block-data
return cbg.ScanForLinks(bytes.NewReader(blk.RawData()), func(c cid.Cid) {
if t.c.Prefix().Codec != cid.DagCBOR || t.c.Prefix().MhType == mh.IDENTITY {
return
}
s.enqueueIfNew(walkTask{
c: c,
taskType: dagTask,
})
})
}
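// ExportRange writes a CARv1 export rooted at head to w using the given number
// of walk workers. Block headers are exported all the way to genesis; messages,
// receipts and state roots are only exported for epochs above tail, and only if
// the corresponding flags are set.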
func (cs *ChainStore) ExportRange(
ctx context.Context,
w io.Writer,
head, tail *types.TipSet,
messages, receipts, stateroots bool,
workers int) error {
h := &car.CarHeader{
Roots: head.Cids(),
Version: 1,
}
if err := car.WriteHeader(h, w); err != nil {
return xerrors.Errorf("failed to write car header: %s", err)
}
start := time.Now()
log.Infow("walking snapshot range",
"head", head.Key(),
"tail", tail.Key(),
"messages", messages,
"receipts", receipts,
"stateroots",
stateroots,
"workers", workers)
cfg := walkSchedulerConfig{
numWorkers: workers,
head: head,
tail: tail,
includeMessages: messages,
includeState: stateroots,
includeReceipts: receipts,
}
pw, err := newWalkScheduler(ctx, cs.UnionStore(), cfg, w)
if err != nil {
return err
}
// wait until all workers are done.
err = pw.Wait()
if err != nil {
log.Errorw("walker scheduler", "error", err)
return err
}
log.Infow("walking snapshot range complete", "duration", time.Since(start), "success", err == nil)
return nil
}
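// WalkSnapshot walks the chain from ts back to genesis and invokes cb for every
// CID to be exported: all block headers, messages for the whole chain (or only
// the last inclRecentRoots epochs when skipOldMsgs is set), state trees for the
// last inclRecentRoots epochs and for genesis, and message receipts for those
// same epochs unless skipMsgReceipts is set.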
func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs, skipMsgReceipts bool, cb func(cid.Cid) error) error {
if ts == nil {
ts = cs.GetHeaviestTipSet()
}
seen := cid.NewSet()
walked := cid.NewSet()
blocksToWalk := ts.Cids()
currentMinHeight := ts.Height()
walkChain := func(blk cid.Cid) error {
if !seen.Visit(blk) {
return nil
}
if err := cb(blk); err != nil {
return err
}
data, err := cs.chainBlockstore.Get(ctx, blk)
if err != nil {
return xerrors.Errorf("getting block: %w", err)
}
var b types.BlockHeader
if err := b.UnmarshalCBOR(bytes.NewBuffer(data.RawData())); err != nil {
return xerrors.Errorf("unmarshaling block header (cid=%s): %w", blk, err)
}
if currentMinHeight > b.Height {
currentMinHeight = b.Height
if currentMinHeight%builtin.EpochsInDay == 0 {
log.Infow("export", "height", currentMinHeight)
}
}
var cids []cid.Cid
if !skipOldMsgs || b.Height > ts.Height()-inclRecentRoots {
if walked.Visit(b.Messages) {
mcids, err := recurseLinks(ctx, cs.chainBlockstore, walked, b.Messages, []cid.Cid{b.Messages})
if err != nil {
return xerrors.Errorf("recursing messages failed: %w", err)
}
cids = mcids
}
}
if b.Height > 0 {
for _, p := range b.Parents {
blocksToWalk = append(blocksToWalk, p)
}
} else {
// include the genesis block
cids = append(cids, b.Parents...)
}
out := cids
if b.Height == 0 || b.Height > ts.Height()-inclRecentRoots {
if walked.Visit(b.ParentStateRoot) {
cids, err := recurseLinks(ctx, cs.stateBlockstore, walked, b.ParentStateRoot, []cid.Cid{b.ParentStateRoot})
if err != nil {
return xerrors.Errorf("recursing genesis state failed: %w", err)
}
out = append(out, cids...)
}
if !skipMsgReceipts && walked.Visit(b.ParentMessageReceipts) {
out = append(out, b.ParentMessageReceipts)
}
}
for _, c := range out {
if seen.Visit(c) {
prefix := c.Prefix()
// Don't include identity CIDs.
if prefix.MhType == mh.IDENTITY {
continue
}
// We only include raw and dagcbor, for now.
// Raw for "code" CIDs.
switch prefix.Codec {
case cid.Raw, cid.DagCBOR:
default:
continue
}
if err := cb(c); err != nil {
return err
}
}
}
return nil
}
log.Infow("export started")
exportStart := build.Clock.Now()
for len(blocksToWalk) > 0 {
next := blocksToWalk[0]
blocksToWalk = blocksToWalk[1:]
if err := walkChain(next); err != nil {
return xerrors.Errorf("walk chain failed: %w", err)
}
}
log.Infow("export finished", "duration", build.Clock.Now().Sub(exportStart).Seconds())
return nil
}
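// recurseLinks appends to in the CIDs of every not-yet-visited object reachable
// from root, descending only into DagCBOR objects, and returns the extended
// slice.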
func recurseLinks(ctx context.Context, bs bstore.Blockstore, walked *cid.Set, root cid.Cid, in []cid.Cid) ([]cid.Cid, error) {
if root.Prefix().Codec != cid.DagCBOR {
return in, nil
}
data, err := bs.Get(ctx, root)
if err != nil {
return nil, xerrors.Errorf("recurse links get (%s) failed: %w", root, err)
}
var rerr error
err = cbg.ScanForLinks(bytes.NewReader(data.RawData()), func(c cid.Cid) {
if rerr != nil {
// No error return on ScanForLinks :(
return
}
// traversed this already...
if !walked.Visit(c) {
return
}
in = append(in, c)
var err error
in, err = recurseLinks(ctx, bs, walked, c, in)
if err != nil {
rerr = err
}
})
if err != nil {
return nil, xerrors.Errorf("scanning for links failed: %w", err)
}
return in, rerr
}