diff --git a/storage/wdpost_journal.go b/storage/wdpost_journal.go new file mode 100644 index 000000000..04bca895c --- /dev/null +++ b/storage/wdpost_journal.go @@ -0,0 +1,74 @@ +package storage + +import ( + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/builtin/miner" + + "github.com/ipfs/go-cid" +) + +// SchedulerState defines the possible states in which the scheduler could be, +// for the purposes of journalling. +type SchedulerState string + +const ( + // SchedulerStateStarted gets recorded when a WdPoSt cycle for an + // epoch begins. + SchedulerStateStarted = SchedulerState("started") + // SchedulerStateAborted gets recorded when a WdPoSt cycle for an + // epoch is aborted, normally because of a chain reorg or advancement. + SchedulerStateAborted = SchedulerState("aborted") + // SchedulerStateFaulted gets recorded when a WdPoSt cycle for an + // epoch terminates abnormally, in which case the error is also recorded. + SchedulerStateFaulted = SchedulerState("faulted") + // SchedulerStateSucceeded gets recorded when a WdPoSt cycle for an + // epoch ends successfully. + SchedulerStateSucceeded = SchedulerState("succeeded") +) + +// Journal event types. +const ( + evtTypeWdPoStScheduler = iota + evtTypeWdPoStProofs + evtTypeWdPoStRecoveries + evtTypeWdPoStFaults +) + +// evtCommon is a common set of attributes for Windowed PoSt journal events. +type evtCommon struct { + Deadline *miner.DeadlineInfo + Height abi.ChainEpoch + TipSet []cid.Cid + Error error `json:",omitempty"` +} + +// WdPoStSchedulerEvt is the journal event that gets recorded on scheduler +// actions. +type WdPoStSchedulerEvt struct { + evtCommon + State SchedulerState +} + +// WdPoStProofsProcessedEvt is the journal event that gets recorded when +// Windowed PoSt proofs have been processed. +type WdPoStProofsProcessedEvt struct { + evtCommon + Partitions []miner.PoStPartition + MessageCID cid.Cid `json:",omitempty"` +} + +// WdPoStRecoveriesProcessedEvt is the journal event that gets recorded when +// Windowed PoSt recoveries have been processed. +type WdPoStRecoveriesProcessedEvt struct { + evtCommon + Declarations []miner.RecoveryDeclaration + MessageCID cid.Cid `json:",omitempty"` +} + +// WdPoStFaultsProcessedEvt is the journal event that gets recorded when +// Windowed PoSt faults have been processed. +type WdPoStFaultsProcessedEvt struct { + evtCommon + Declarations []miner.FaultDeclaration + MessageCID cid.Cid `json:",omitempty"` +} diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go index 83861a6d8..1ae454295 100644 --- a/storage/wdpost_run.go +++ b/storage/wdpost_run.go @@ -28,12 +28,11 @@ import ( var errNoPartitions = errors.New("no partitions") func (s *WindowPoStScheduler) failPost(err error, deadline *miner.DeadlineInfo) { - journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} { - return s.enrichWithTipset(WindowPoStEvt{ - State: "failed", - Deadline: s.activeDeadline, - Error: err, - }) + journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} { + return WdPoStSchedulerEvt{ + evtCommon: s.getEvtCommon(err), + State: SchedulerStateFaulted, + } }) log.Errorf("TODO") @@ -50,11 +49,11 @@ func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *miner.Deadli s.abort = abort s.activeDeadline = deadline - journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} { - return s.enrichWithTipset(WindowPoStEvt{ - State: "started", - Deadline: s.activeDeadline, - }) + journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} { + return WdPoStSchedulerEvt{ + evtCommon: s.getEvtCommon(nil), + State: SchedulerStateStarted, + } }) go func() { @@ -63,25 +62,22 @@ func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *miner.Deadli ctx, span := trace.StartSpan(ctx, "WindowPoStScheduler.doPost") defer span.End() - // recordEvent records a successful proofs_processed event in the + // recordProofsEvent records a successful proofs_processed event in the // journal, even if it was a noop (no partitions). - recordEvent := func(partitions []miner.PoStPartition, mcid cid.Cid) { - journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} { - return s.enrichWithTipset(WindowPoStEvt{ - State: "proofs_processed", - Deadline: s.activeDeadline, - Proofs: &WindowPoStEvt_Proofs{ - Partitions: partitions, - MessageCID: mcid, - }, - }) + recordProofsEvent := func(partitions []miner.PoStPartition, mcid cid.Cid) { + journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStProofs], func() interface{} { + return &WdPoStProofsProcessedEvt{ + evtCommon: s.getEvtCommon(nil), + Partitions: partitions, + MessageCID: mcid, + } }) } proof, err := s.runPost(ctx, *deadline, ts) switch err { case errNoPartitions: - recordEvent(nil, cid.Undef) + recordProofsEvent(nil, cid.Undef) return case nil: sm, err := s.submitPost(ctx, proof) @@ -90,18 +86,18 @@ func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *miner.Deadli s.failPost(err, deadline) return } - recordEvent(proof.Partitions, sm.Cid()) + recordProofsEvent(proof.Partitions, sm.Cid()) default: log.Errorf("runPost failed: %+v", err) s.failPost(err, deadline) return } - journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} { - return s.enrichWithTipset(WindowPoStEvt{ - State: "succeeded", - Deadline: s.activeDeadline, - }) + journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} { + return WdPoStSchedulerEvt{ + evtCommon: s.getEvtCommon(nil), + State: SchedulerStateSucceeded, + } }) }() } @@ -364,15 +360,14 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di miner.DeadlineInfo log.Errorf("checking sector recoveries: %v", err) } - journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} { - return s.enrichWithTipset(WindowPoStEvt{ - State: "recoveries_processed", - Deadline: s.activeDeadline, - Recoveries: &WindowPoStEvt_Recoveries{ - Declarations: recoveries, - MessageCID: optionalCid(sigmsg), - }, - }) + journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} { + j := WdPoStRecoveriesProcessedEvt{ + evtCommon: s.getEvtCommon(err), + Declarations: recoveries, + MessageCID: optionalCid(sigmsg), + } + j.Error = err + return j }) if faults, sigmsg, err = s.checkNextFaults(context.TODO(), declDeadline, partitions); err != nil { @@ -380,15 +375,12 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di miner.DeadlineInfo log.Errorf("checking sector faults: %v", err) } - journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} { - return s.enrichWithTipset(WindowPoStEvt{ - State: "faults_processed", - Deadline: s.activeDeadline, - Faults: &WindowPoStEvt_Faults{ - Declarations: faults, - MessageCID: optionalCid(sigmsg), - }, - }) + journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStFaults], func() interface{} { + return WdPoStFaultsProcessedEvt{ + evtCommon: s.getEvtCommon(err), + Declarations: faults, + MessageCID: optionalCid(sigmsg), + } }) }() diff --git a/storage/wdpost_sched.go b/storage/wdpost_sched.go index 1b2303dc0..d2b8279a0 100644 --- a/storage/wdpost_sched.go +++ b/storage/wdpost_sched.go @@ -19,39 +19,11 @@ import ( "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/node/config" - "github.com/ipfs/go-cid" "go.opencensus.io/trace" ) const StartConfidence = 4 // TODO: config -type WindowPoStEvt struct { - State string - Deadline *miner.DeadlineInfo - Height abi.ChainEpoch - TipSet []cid.Cid - Error error `json:",omitempty"` - - Proofs *WindowPoStEvt_Proofs `json:",omitempty"` - Recoveries *WindowPoStEvt_Recoveries `json:",omitempty"` - Faults *WindowPoStEvt_Faults `json:",omitempty"` -} - -type WindowPoStEvt_Proofs struct { - Partitions []miner.PoStPartition - MessageCID cid.Cid `json:",omitempty"` -} - -type WindowPoStEvt_Recoveries struct { - Declarations []miner.RecoveryDeclaration - MessageCID cid.Cid `json:",omitempty"` -} - -type WindowPoStEvt_Faults struct { - Declarations []miner.FaultDeclaration - MessageCID cid.Cid `json:",omitempty"` -} - type WindowPoStScheduler struct { api storageMinerApi feeCfg config.MinerFeeConfig @@ -69,7 +41,7 @@ type WindowPoStScheduler struct { activeDeadline *miner.DeadlineInfo abort context.CancelFunc - wdPoStEvtType journal.EventType + evtTypes [4]journal.EventType // failed abi.ChainEpoch // eps // failLk sync.Mutex @@ -94,9 +66,14 @@ func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, sb proofType: rt, partitionSectors: mi.WindowPoStPartitionSectors, - actor: actor, - worker: worker, - wdPoStEvtType: journal.J.RegisterEventType("storage", "wdpost"), + actor: actor, + worker: worker, + evtTypes: [...]journal.EventType{ + evtTypeWdPoStScheduler: journal.J.RegisterEventType("wdpost", "scheduler"), + evtTypeWdPoStProofs: journal.J.RegisterEventType("wdpost", "proofs_processed"), + evtTypeWdPoStRecoveries: journal.J.RegisterEventType("wdpost", "recoveries_processed"), + evtTypeWdPoStFaults: journal.J.RegisterEventType("wdpost", "faults_processed"), + }, }, nil } @@ -253,11 +230,11 @@ func (s *WindowPoStScheduler) abortActivePoSt() { if s.abort != nil { s.abort() - journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} { - return s.enrichWithTipset(WindowPoStEvt{ - State: "abort", - Deadline: s.activeDeadline, - }) + journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} { + return WdPoStSchedulerEvt{ + evtCommon: s.getEvtCommon(nil), + State: SchedulerStateAborted, + } }) log.Warnf("Aborting Window PoSt (Deadline: %+v)", s.activeDeadline) @@ -267,12 +244,14 @@ func (s *WindowPoStScheduler) abortActivePoSt() { s.abort = nil } -// enrichWithTipset enriches a WindowPoStEvt with tipset information, -// if available. -func (s *WindowPoStScheduler) enrichWithTipset(evt WindowPoStEvt) WindowPoStEvt { +// getEvtCommon populates and returns common attributes from state, for a +// WdPoSt journal event. +func (s *WindowPoStScheduler) getEvtCommon(err error) evtCommon { + c := evtCommon{Error: err} if s.cur != nil { - evt.Height = s.cur.Height() - evt.TipSet = s.cur.Cids() + c.Deadline = s.activeDeadline + c.Height = s.cur.Height() + c.TipSet = s.cur.Cids() } - return evt + return c }