Merge pull request #10535 from hsanjuan/filcryo-v1.20.0-branch

Fix: export-range: Ignore ipld Blocks not found in Receipts.
This commit is contained in:
Aayush Rajasekaran 2023-03-28 16:01:11 -04:00 committed by GitHub
commit 4b2e74a6df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -3,12 +3,14 @@ package store
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"sync"
"time"
"github.com/ipfs/go-cid"
format "github.com/ipfs/go-ipld-format"
blocks "github.com/ipfs/go-libipfs/blocks"
"github.com/ipld/go-car"
carutil "github.com/ipld/go-car/util"
@ -169,6 +171,9 @@ func (t walkSchedTaskType) String() string {
type walkTask struct {
c cid.Cid
taskType walkSchedTaskType
topLevelTaskType walkSchedTaskType
blockCid cid.Cid
epoch abi.ChainEpoch
}
// an ever growing FIFO
@ -319,6 +324,9 @@ func newWalkScheduler(ctx context.Context, store bstore.Blockstore, cfg walkSche
case s.workerTasks.in <- walkTask{
c: b.Cid(),
taskType: blockTask,
topLevelTaskType: blockTask,
blockCid: b.Cid(),
epoch: cfg.head.Height(),
}:
}
}
@ -416,8 +424,17 @@ func (s *walkScheduler) processTask(t walkTask, workerN int) error {
}
blk, err := s.store.Get(s.ctx, t.c)
if errors.Is(err, format.ErrNotFound{}) && t.topLevelTaskType == receiptTask {
log.Debugw("ignoring not-found block in Receipts",
"block", t.blockCid,
"epoch", t.epoch,
"cid", t.c)
return nil
}
if err != nil {
return xerrors.Errorf("writing object to car, bs.Get: %w", err)
return xerrors.Errorf(
"blockstore.Get(%s). Task: %s. Block: %s (%s). Epoch: %d. Err: %w",
t.c, t.taskType, t.topLevelTaskType, t.blockCid, t.epoch, err)
}
s.results <- taskResult{
@ -427,13 +444,8 @@ func (s *walkScheduler) processTask(t walkTask, workerN int) error {
// extract relevant dags to walk from the block
if t.taskType == blockTask {
blk := t.c
data, err := s.store.Get(s.ctx, blk)
if err != nil {
return err
}
var b types.BlockHeader
if err := b.UnmarshalCBOR(bytes.NewBuffer(data.RawData())); err != nil {
if err := b.UnmarshalCBOR(bytes.NewBuffer(blk.RawData())); err != nil {
return xerrors.Errorf("unmarshalling block header (cid=%s): %w", blk, err)
}
if b.Height%1_000 == 0 {
@ -445,11 +457,17 @@ func (s *walkScheduler) processTask(t walkTask, workerN int) error {
s.enqueueIfNew(walkTask{
c: b.Parents[i],
taskType: dagTask,
topLevelTaskType: blockTask,
blockCid: b.Parents[i],
epoch: 0,
})
}
s.enqueueIfNew(walkTask{
c: b.ParentStateRoot,
taskType: stateTask,
topLevelTaskType: stateTask,
blockCid: t.c,
epoch: 0,
})
return s.sendFinish(workerN)
@ -459,10 +477,13 @@ func (s *walkScheduler) processTask(t walkTask, workerN int) error {
s.enqueueIfNew(walkTask{
c: b.Parents[i],
taskType: blockTask,
topLevelTaskType: blockTask,
blockCid: b.Parents[i],
epoch: b.Height,
})
}
if s.cfg.tail.Height() >= b.Height {
log.Debugw("tail reached: only blocks will be exported from now until genesis", "cid", blk.String())
log.Debugw("tail reached: only blocks will be exported from now until genesis", "cid", t.c.String())
return nil
}
@ -471,6 +492,9 @@ func (s *walkScheduler) processTask(t walkTask, workerN int) error {
s.enqueueIfNew(walkTask{
c: b.Messages,
taskType: messageTask,
topLevelTaskType: messageTask,
blockCid: t.c,
epoch: b.Height,
})
}
if s.cfg.includeReceipts {
@ -478,12 +502,18 @@ func (s *walkScheduler) processTask(t walkTask, workerN int) error {
s.enqueueIfNew(walkTask{
c: b.ParentMessageReceipts,
taskType: receiptTask,
topLevelTaskType: receiptTask,
blockCid: t.c,
epoch: b.Height,
})
}
if s.cfg.includeState {
s.enqueueIfNew(walkTask{
c: b.ParentStateRoot,
taskType: stateTask,
topLevelTaskType: stateTask,
blockCid: t.c,
epoch: b.Height,
})
}
@ -499,6 +529,9 @@ func (s *walkScheduler) processTask(t walkTask, workerN int) error {
s.enqueueIfNew(walkTask{
c: c,
taskType: dagTask,
topLevelTaskType: t.topLevelTaskType,
blockCid: t.blockCid,
epoch: t.epoch,
})
})
}