Improve logging by carrying some more information in tasks
This commit is contained in:
parent
570773259a
commit
a3900caeb0
@ -169,6 +169,8 @@ func (t walkSchedTaskType) String() string {
|
|||||||
type walkTask struct {
|
type walkTask struct {
|
||||||
c cid.Cid
|
c cid.Cid
|
||||||
taskType walkSchedTaskType
|
taskType walkSchedTaskType
|
||||||
|
topLevelTaskType walkSchedTaskType
|
||||||
|
topLevelTaskCid cid.Cid
|
||||||
}
|
}
|
||||||
|
|
||||||
// an ever growing FIFO
|
// an ever growing FIFO
|
||||||
@ -319,6 +321,8 @@ func newWalkScheduler(ctx context.Context, store bstore.Blockstore, cfg walkSche
|
|||||||
case s.workerTasks.in <- walkTask{
|
case s.workerTasks.in <- walkTask{
|
||||||
c: b.Cid(),
|
c: b.Cid(),
|
||||||
taskType: blockTask,
|
taskType: blockTask,
|
||||||
|
topLevelTaskType: blockTask,
|
||||||
|
topLevelTaskCid: b.Cid(),
|
||||||
}:
|
}:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -417,7 +421,7 @@ func (s *walkScheduler) processTask(t walkTask, workerN int) error {
|
|||||||
|
|
||||||
blk, err := s.store.Get(s.ctx, t.c)
|
blk, err := s.store.Get(s.ctx, t.c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("writing object to car, bs.Get: %w", err)
|
return xerrors.Errorf("writing object to car. Task: %s. Top-Level: %s (%s). bs.Get: %w", t.taskType, t.topLevelTaskType, t.topLevelTaskCid, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.results <- taskResult{
|
s.results <- taskResult{
|
||||||
@ -427,13 +431,8 @@ func (s *walkScheduler) processTask(t walkTask, workerN int) error {
|
|||||||
|
|
||||||
// extract relevant dags to walk from the block
|
// extract relevant dags to walk from the block
|
||||||
if t.taskType == blockTask {
|
if t.taskType == blockTask {
|
||||||
blk := t.c
|
|
||||||
data, err := s.store.Get(s.ctx, blk)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
var b types.BlockHeader
|
var b types.BlockHeader
|
||||||
if err := b.UnmarshalCBOR(bytes.NewBuffer(data.RawData())); err != nil {
|
if err := b.UnmarshalCBOR(bytes.NewBuffer(blk.RawData())); err != nil {
|
||||||
return xerrors.Errorf("unmarshalling block header (cid=%s): %w", blk, err)
|
return xerrors.Errorf("unmarshalling block header (cid=%s): %w", blk, err)
|
||||||
}
|
}
|
||||||
if b.Height%1_000 == 0 {
|
if b.Height%1_000 == 0 {
|
||||||
@ -445,11 +444,15 @@ func (s *walkScheduler) processTask(t walkTask, workerN int) error {
|
|||||||
s.enqueueIfNew(walkTask{
|
s.enqueueIfNew(walkTask{
|
||||||
c: b.Parents[i],
|
c: b.Parents[i],
|
||||||
taskType: dagTask,
|
taskType: dagTask,
|
||||||
|
topLevelTaskType: blockTask,
|
||||||
|
topLevelTaskCid: t.c,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
s.enqueueIfNew(walkTask{
|
s.enqueueIfNew(walkTask{
|
||||||
c: b.ParentStateRoot,
|
c: b.ParentStateRoot,
|
||||||
taskType: stateTask,
|
taskType: stateTask,
|
||||||
|
topLevelTaskType: stateTask,
|
||||||
|
topLevelTaskCid: t.c,
|
||||||
})
|
})
|
||||||
|
|
||||||
return s.sendFinish(workerN)
|
return s.sendFinish(workerN)
|
||||||
@ -459,10 +462,12 @@ func (s *walkScheduler) processTask(t walkTask, workerN int) error {
|
|||||||
s.enqueueIfNew(walkTask{
|
s.enqueueIfNew(walkTask{
|
||||||
c: b.Parents[i],
|
c: b.Parents[i],
|
||||||
taskType: blockTask,
|
taskType: blockTask,
|
||||||
|
topLevelTaskType: blockTask,
|
||||||
|
topLevelTaskCid: t.c,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
if s.cfg.tail.Height() >= b.Height {
|
if s.cfg.tail.Height() >= b.Height {
|
||||||
log.Debugw("tail reached: only blocks will be exported from now until genesis", "cid", blk.String())
|
log.Debugw("tail reached: only blocks will be exported from now until genesis", "cid", t.c.String())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -471,6 +476,8 @@ func (s *walkScheduler) processTask(t walkTask, workerN int) error {
|
|||||||
s.enqueueIfNew(walkTask{
|
s.enqueueIfNew(walkTask{
|
||||||
c: b.Messages,
|
c: b.Messages,
|
||||||
taskType: messageTask,
|
taskType: messageTask,
|
||||||
|
topLevelTaskType: messageTask,
|
||||||
|
topLevelTaskCid: t.c,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
if s.cfg.includeReceipts {
|
if s.cfg.includeReceipts {
|
||||||
@ -478,12 +485,16 @@ func (s *walkScheduler) processTask(t walkTask, workerN int) error {
|
|||||||
s.enqueueIfNew(walkTask{
|
s.enqueueIfNew(walkTask{
|
||||||
c: b.ParentMessageReceipts,
|
c: b.ParentMessageReceipts,
|
||||||
taskType: receiptTask,
|
taskType: receiptTask,
|
||||||
|
topLevelTaskType: receiptTask,
|
||||||
|
topLevelTaskCid: t.c,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
if s.cfg.includeState {
|
if s.cfg.includeState {
|
||||||
s.enqueueIfNew(walkTask{
|
s.enqueueIfNew(walkTask{
|
||||||
c: b.ParentStateRoot,
|
c: b.ParentStateRoot,
|
||||||
taskType: stateTask,
|
taskType: stateTask,
|
||||||
|
topLevelTaskType: stateTask,
|
||||||
|
topLevelTaskCid: t.c,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -499,6 +510,8 @@ func (s *walkScheduler) processTask(t walkTask, workerN int) error {
|
|||||||
s.enqueueIfNew(walkTask{
|
s.enqueueIfNew(walkTask{
|
||||||
c: c,
|
c: c,
|
||||||
taskType: dagTask,
|
taskType: dagTask,
|
||||||
|
topLevelTaskType: t.topLevelTaskType,
|
||||||
|
topLevelTaskCid: t.topLevelTaskCid,
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user