split wdpost event into finer-grained ones.

This commit is contained in:
Raúl Kripalani 2020-09-02 19:45:25 +01:00
parent ac152abc75
commit f046af337a
3 changed files with 135 additions and 90 deletions

74
storage/wdpost_journal.go Normal file
View File

@ -0,0 +1,74 @@
package storage
import (
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/ipfs/go-cid"
)
// SchedulerState defines the possible states in which the scheduler could be,
// for the purposes of journalling.
type SchedulerState string
const (
// SchedulerStateStarted gets recorded when a WdPoSt cycle for an
// epoch begins.
SchedulerStateStarted = SchedulerState("started")
// SchedulerStateAborted gets recorded when a WdPoSt cycle for an
// epoch is aborted, normally because of a chain reorg or advancement.
SchedulerStateAborted = SchedulerState("aborted")
// SchedulerStateFaulted gets recorded when a WdPoSt cycle for an
// epoch terminates abnormally, in which case the error is also recorded.
SchedulerStateFaulted = SchedulerState("faulted")
// SchedulerStateSucceeded gets recorded when a WdPoSt cycle for an
// epoch ends successfully.
SchedulerStateSucceeded = SchedulerState("succeeded")
)
// Journal event types.
const (
evtTypeWdPoStScheduler = iota
evtTypeWdPoStProofs
evtTypeWdPoStRecoveries
evtTypeWdPoStFaults
)
// evtCommon is a common set of attributes for Windowed PoSt journal events.
type evtCommon struct {
Deadline *miner.DeadlineInfo
Height abi.ChainEpoch
TipSet []cid.Cid
Error error `json:",omitempty"`
}
// WdPoStSchedulerEvt is the journal event that gets recorded on scheduler
// actions.
type WdPoStSchedulerEvt struct {
evtCommon
State SchedulerState
}
// WdPoStProofsProcessedEvt is the journal event that gets recorded when
// Windowed PoSt proofs have been processed.
type WdPoStProofsProcessedEvt struct {
evtCommon
Partitions []miner.PoStPartition
MessageCID cid.Cid `json:",omitempty"`
}
// WdPoStRecoveriesProcessedEvt is the journal event that gets recorded when
// Windowed PoSt recoveries have been processed.
type WdPoStRecoveriesProcessedEvt struct {
evtCommon
Declarations []miner.RecoveryDeclaration
MessageCID cid.Cid `json:",omitempty"`
}
// WdPoStFaultsProcessedEvt is the journal event that gets recorded when
// Windowed PoSt faults have been processed.
type WdPoStFaultsProcessedEvt struct {
evtCommon
Declarations []miner.FaultDeclaration
MessageCID cid.Cid `json:",omitempty"`
}

View File

@ -28,12 +28,11 @@ import (
var errNoPartitions = errors.New("no partitions") var errNoPartitions = errors.New("no partitions")
func (s *WindowPoStScheduler) failPost(err error, deadline *miner.DeadlineInfo) { func (s *WindowPoStScheduler) failPost(err error, deadline *miner.DeadlineInfo) {
journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} { journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
return s.enrichWithTipset(WindowPoStEvt{ return WdPoStSchedulerEvt{
State: "failed", evtCommon: s.getEvtCommon(err),
Deadline: s.activeDeadline, State: SchedulerStateFaulted,
Error: err, }
})
}) })
log.Errorf("TODO") log.Errorf("TODO")
@ -50,11 +49,11 @@ func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *miner.Deadli
s.abort = abort s.abort = abort
s.activeDeadline = deadline s.activeDeadline = deadline
journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} { journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
return s.enrichWithTipset(WindowPoStEvt{ return WdPoStSchedulerEvt{
State: "started", evtCommon: s.getEvtCommon(nil),
Deadline: s.activeDeadline, State: SchedulerStateStarted,
}) }
}) })
go func() { go func() {
@ -63,25 +62,22 @@ func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *miner.Deadli
ctx, span := trace.StartSpan(ctx, "WindowPoStScheduler.doPost") ctx, span := trace.StartSpan(ctx, "WindowPoStScheduler.doPost")
defer span.End() defer span.End()
// recordEvent records a successful proofs_processed event in the // recordProofsEvent records a successful proofs_processed event in the
// journal, even if it was a noop (no partitions). // journal, even if it was a noop (no partitions).
recordEvent := func(partitions []miner.PoStPartition, mcid cid.Cid) { recordProofsEvent := func(partitions []miner.PoStPartition, mcid cid.Cid) {
journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} { journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStProofs], func() interface{} {
return s.enrichWithTipset(WindowPoStEvt{ return &WdPoStProofsProcessedEvt{
State: "proofs_processed", evtCommon: s.getEvtCommon(nil),
Deadline: s.activeDeadline,
Proofs: &WindowPoStEvt_Proofs{
Partitions: partitions, Partitions: partitions,
MessageCID: mcid, MessageCID: mcid,
}, }
})
}) })
} }
proof, err := s.runPost(ctx, *deadline, ts) proof, err := s.runPost(ctx, *deadline, ts)
switch err { switch err {
case errNoPartitions: case errNoPartitions:
recordEvent(nil, cid.Undef) recordProofsEvent(nil, cid.Undef)
return return
case nil: case nil:
sm, err := s.submitPost(ctx, proof) sm, err := s.submitPost(ctx, proof)
@ -90,18 +86,18 @@ func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *miner.Deadli
s.failPost(err, deadline) s.failPost(err, deadline)
return return
} }
recordEvent(proof.Partitions, sm.Cid()) recordProofsEvent(proof.Partitions, sm.Cid())
default: default:
log.Errorf("runPost failed: %+v", err) log.Errorf("runPost failed: %+v", err)
s.failPost(err, deadline) s.failPost(err, deadline)
return return
} }
journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} { journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
return s.enrichWithTipset(WindowPoStEvt{ return WdPoStSchedulerEvt{
State: "succeeded", evtCommon: s.getEvtCommon(nil),
Deadline: s.activeDeadline, State: SchedulerStateSucceeded,
}) }
}) })
}() }()
} }
@ -364,15 +360,14 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di miner.DeadlineInfo
log.Errorf("checking sector recoveries: %v", err) log.Errorf("checking sector recoveries: %v", err)
} }
journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} { journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} {
return s.enrichWithTipset(WindowPoStEvt{ j := WdPoStRecoveriesProcessedEvt{
State: "recoveries_processed", evtCommon: s.getEvtCommon(err),
Deadline: s.activeDeadline,
Recoveries: &WindowPoStEvt_Recoveries{
Declarations: recoveries, Declarations: recoveries,
MessageCID: optionalCid(sigmsg), MessageCID: optionalCid(sigmsg),
}, }
}) j.Error = err
return j
}) })
if faults, sigmsg, err = s.checkNextFaults(context.TODO(), declDeadline, partitions); err != nil { if faults, sigmsg, err = s.checkNextFaults(context.TODO(), declDeadline, partitions); err != nil {
@ -380,15 +375,12 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di miner.DeadlineInfo
log.Errorf("checking sector faults: %v", err) log.Errorf("checking sector faults: %v", err)
} }
journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} { journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStFaults], func() interface{} {
return s.enrichWithTipset(WindowPoStEvt{ return WdPoStFaultsProcessedEvt{
State: "faults_processed", evtCommon: s.getEvtCommon(err),
Deadline: s.activeDeadline,
Faults: &WindowPoStEvt_Faults{
Declarations: faults, Declarations: faults,
MessageCID: optionalCid(sigmsg), MessageCID: optionalCid(sigmsg),
}, }
})
}) })
}() }()

View File

@ -19,39 +19,11 @@ import (
"github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
"github.com/ipfs/go-cid"
"go.opencensus.io/trace" "go.opencensus.io/trace"
) )
const StartConfidence = 4 // TODO: config const StartConfidence = 4 // TODO: config
type WindowPoStEvt struct {
State string
Deadline *miner.DeadlineInfo
Height abi.ChainEpoch
TipSet []cid.Cid
Error error `json:",omitempty"`
Proofs *WindowPoStEvt_Proofs `json:",omitempty"`
Recoveries *WindowPoStEvt_Recoveries `json:",omitempty"`
Faults *WindowPoStEvt_Faults `json:",omitempty"`
}
type WindowPoStEvt_Proofs struct {
Partitions []miner.PoStPartition
MessageCID cid.Cid `json:",omitempty"`
}
type WindowPoStEvt_Recoveries struct {
Declarations []miner.RecoveryDeclaration
MessageCID cid.Cid `json:",omitempty"`
}
type WindowPoStEvt_Faults struct {
Declarations []miner.FaultDeclaration
MessageCID cid.Cid `json:",omitempty"`
}
type WindowPoStScheduler struct { type WindowPoStScheduler struct {
api storageMinerApi api storageMinerApi
feeCfg config.MinerFeeConfig feeCfg config.MinerFeeConfig
@ -69,7 +41,7 @@ type WindowPoStScheduler struct {
activeDeadline *miner.DeadlineInfo activeDeadline *miner.DeadlineInfo
abort context.CancelFunc abort context.CancelFunc
wdPoStEvtType journal.EventType evtTypes [4]journal.EventType
// failed abi.ChainEpoch // eps // failed abi.ChainEpoch // eps
// failLk sync.Mutex // failLk sync.Mutex
@ -96,7 +68,12 @@ func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, sb
actor: actor, actor: actor,
worker: worker, worker: worker,
wdPoStEvtType: journal.J.RegisterEventType("storage", "wdpost"), evtTypes: [...]journal.EventType{
evtTypeWdPoStScheduler: journal.J.RegisterEventType("wdpost", "scheduler"),
evtTypeWdPoStProofs: journal.J.RegisterEventType("wdpost", "proofs_processed"),
evtTypeWdPoStRecoveries: journal.J.RegisterEventType("wdpost", "recoveries_processed"),
evtTypeWdPoStFaults: journal.J.RegisterEventType("wdpost", "faults_processed"),
},
}, nil }, nil
} }
@ -253,11 +230,11 @@ func (s *WindowPoStScheduler) abortActivePoSt() {
if s.abort != nil { if s.abort != nil {
s.abort() s.abort()
journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} { journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
return s.enrichWithTipset(WindowPoStEvt{ return WdPoStSchedulerEvt{
State: "abort", evtCommon: s.getEvtCommon(nil),
Deadline: s.activeDeadline, State: SchedulerStateAborted,
}) }
}) })
log.Warnf("Aborting Window PoSt (Deadline: %+v)", s.activeDeadline) log.Warnf("Aborting Window PoSt (Deadline: %+v)", s.activeDeadline)
@ -267,12 +244,14 @@ func (s *WindowPoStScheduler) abortActivePoSt() {
s.abort = nil s.abort = nil
} }
// enrichWithTipset enriches a WindowPoStEvt with tipset information, // getEvtCommon populates and returns common attributes from state, for a
// if available. // WdPoSt journal event.
func (s *WindowPoStScheduler) enrichWithTipset(evt WindowPoStEvt) WindowPoStEvt { func (s *WindowPoStScheduler) getEvtCommon(err error) evtCommon {
c := evtCommon{Error: err}
if s.cur != nil { if s.cur != nil {
evt.Height = s.cur.Height() c.Deadline = s.activeDeadline
evt.TipSet = s.cur.Cids() c.Height = s.cur.Height()
c.TipSet = s.cur.Cids()
} }
return evt return c
} }