Chain export: record epoch for every task.

This commit is contained in:
Hector Sanjuan 2023-03-16 15:31:58 +01:00
parent a3900caeb0
commit 4b8442d8b9

View File

@ -171,6 +171,7 @@ type walkTask struct {
taskType walkSchedTaskType
topLevelTaskType walkSchedTaskType
topLevelTaskCid cid.Cid
epoch abi.ChainEpoch
}
// an ever growing FIFO
@ -323,6 +324,7 @@ func newWalkScheduler(ctx context.Context, store bstore.Blockstore, cfg walkSche
taskType: blockTask,
topLevelTaskType: blockTask,
topLevelTaskCid: b.Cid(),
epoch: cfg.head.Height(),
}:
}
}
@ -421,7 +423,7 @@ func (s *walkScheduler) processTask(t walkTask, workerN int) error {
blk, err := s.store.Get(s.ctx, t.c)
if err != nil {
return xerrors.Errorf("writing object to car. Task: %s. Top-Level: %s (%s). bs.Get: %w", t.taskType, t.topLevelTaskType, t.topLevelTaskCid, err)
return xerrors.Errorf("writing object to car. Task: %s. Top-Level: %s (%s). Epoch: %d. bs.Get: %w", t.taskType, t.topLevelTaskType, t.topLevelTaskCid, t.epoch, err)
}
s.results <- taskResult{
@ -446,6 +448,7 @@ func (s *walkScheduler) processTask(t walkTask, workerN int) error {
taskType: dagTask,
topLevelTaskType: blockTask,
topLevelTaskCid: t.c,
epoch: 0,
})
}
s.enqueueIfNew(walkTask{
@ -453,6 +456,7 @@ func (s *walkScheduler) processTask(t walkTask, workerN int) error {
taskType: stateTask,
topLevelTaskType: stateTask,
topLevelTaskCid: t.c,
epoch: 0,
})
return s.sendFinish(workerN)
@ -464,6 +468,7 @@ func (s *walkScheduler) processTask(t walkTask, workerN int) error {
taskType: blockTask,
topLevelTaskType: blockTask,
topLevelTaskCid: t.c,
epoch: b.Height,
})
}
if s.cfg.tail.Height() >= b.Height {
@ -478,6 +483,7 @@ func (s *walkScheduler) processTask(t walkTask, workerN int) error {
taskType: messageTask,
topLevelTaskType: messageTask,
topLevelTaskCid: t.c,
epoch: b.Height,
})
}
if s.cfg.includeReceipts {
@ -487,6 +493,7 @@ func (s *walkScheduler) processTask(t walkTask, workerN int) error {
taskType: receiptTask,
topLevelTaskType: receiptTask,
topLevelTaskCid: t.c,
epoch: b.Height,
})
}
if s.cfg.includeState {
@ -495,6 +502,7 @@ func (s *walkScheduler) processTask(t walkTask, workerN int) error {
taskType: stateTask,
topLevelTaskType: stateTask,
topLevelTaskCid: t.c,
epoch: b.Height,
})
}
@ -512,6 +520,7 @@ func (s *walkScheduler) processTask(t walkTask, workerN int) error {
taskType: dagTask,
topLevelTaskType: t.topLevelTaskType,
topLevelTaskCid: t.topLevelTaskCid,
epoch: t.epoch,
})
})
}