shuffle code to journal wdpost events.

This commit is contained in:
Raúl Kripalani 2020-09-02 19:15:25 +01:00
parent 0b6a182a44
commit ac152abc75

View File

@ -151,41 +151,24 @@ func (s *WindowPoStScheduler) checkSectors(ctx context.Context, check abi.BitFie
return sbf, nil
}
func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uint64, partitions []*miner.Partition) error {
func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uint64, partitions []*miner.Partition) ([]miner.RecoveryDeclaration, *types.SignedMessage, error) {
ctx, span := trace.StartSpan(ctx, "storage.checkNextRecoveries")
defer span.End()
var sm *types.SignedMessage
faulty := uint64(0)
params := &miner.DeclareFaultsRecoveredParams{
Recoveries: []miner.RecoveryDeclaration{},
}
defer journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} {
var mcid cid.Cid
if sm != nil {
mcid = sm.Cid()
}
return s.enrichWithTipset(WindowPoStEvt{
State: "recoveries_processed",
Deadline: s.activeDeadline,
Recoveries: &WindowPoStEvt_Recoveries{
Declarations: params.Recoveries,
MessageCID: mcid,
},
})
})
for partIdx, partition := range partitions {
unrecovered, err := bitfield.SubtractBitField(partition.Faults, partition.Recoveries)
if err != nil {
return xerrors.Errorf("subtracting recovered set from fault set: %w", err)
return nil, nil, xerrors.Errorf("subtracting recovered set from fault set: %w", err)
}
uc, err := unrecovered.Count()
if err != nil {
return xerrors.Errorf("counting unrecovered sectors: %w", err)
return nil, nil, xerrors.Errorf("counting unrecovered sectors: %w", err)
}
if uc == 0 {
@ -196,13 +179,13 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uin
recovered, err := s.checkSectors(ctx, unrecovered)
if err != nil {
return xerrors.Errorf("checking unrecovered sectors: %w", err)
return nil, nil, xerrors.Errorf("checking unrecovered sectors: %w", err)
}
// if all sectors failed to recover, don't declare recoveries
recoveredCount, err := recovered.Count()
if err != nil {
return xerrors.Errorf("counting recovered sectors: %w", err)
return nil, nil, xerrors.Errorf("counting recovered sectors: %w", err)
}
if recoveredCount == 0 {
@ -216,17 +199,18 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uin
})
}
if len(params.Recoveries) == 0 {
recoveries := params.Recoveries
if len(recoveries) == 0 {
if faulty != 0 {
log.Warnw("No recoveries to declare", "deadline", dlIdx, "faulty", faulty)
}
return nil
return recoveries, nil, nil
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr)
return recoveries, nil, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr)
}
msg := &types.Message{
@ -241,67 +225,51 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uin
sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)})
if err != nil {
return xerrors.Errorf("pushing message to mpool: %w", err)
return recoveries, sm, xerrors.Errorf("pushing message to mpool: %w", err)
}
log.Warnw("declare faults recovered Message CID", "cid", sm.Cid())
rec, err := s.api.StateWaitMsg(context.TODO(), sm.Cid(), build.MessageConfidence)
if err != nil {
return xerrors.Errorf("declare faults recovered wait error: %w", err)
return recoveries, sm, xerrors.Errorf("declare faults recovered wait error: %w", err)
}
if rec.Receipt.ExitCode != 0 {
return xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode)
return recoveries, sm, xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode)
}
return nil
return recoveries, sm, nil
}
func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64, partitions []*miner.Partition) error {
func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64, partitions []*miner.Partition) ([]miner.FaultDeclaration, *types.SignedMessage, error) {
ctx, span := trace.StartSpan(ctx, "storage.checkNextFaults")
defer span.End()
var sm *types.SignedMessage
bad := uint64(0)
params := &miner.DeclareFaultsParams{
Faults: []miner.FaultDeclaration{},
}
defer journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} {
var mcid cid.Cid
if sm != nil {
mcid = sm.Cid()
}
return s.enrichWithTipset(WindowPoStEvt{
State: "faults_processed",
Deadline: s.activeDeadline,
Faults: &WindowPoStEvt_Faults{
Declarations: params.Faults,
MessageCID: mcid,
},
})
})
for partIdx, partition := range partitions {
toCheck, err := partition.ActiveSectors()
if err != nil {
return xerrors.Errorf("getting active sectors: %w", err)
return nil, nil, xerrors.Errorf("getting active sectors: %w", err)
}
good, err := s.checkSectors(ctx, toCheck)
if err != nil {
return xerrors.Errorf("checking sectors: %w", err)
return nil, nil, xerrors.Errorf("checking sectors: %w", err)
}
faulty, err := bitfield.SubtractBitField(toCheck, good)
if err != nil {
return xerrors.Errorf("calculating faulty sector set: %w", err)
return nil, nil, xerrors.Errorf("calculating faulty sector set: %w", err)
}
c, err := faulty.Count()
if err != nil {
return xerrors.Errorf("counting faulty sectors: %w", err)
return nil, nil, xerrors.Errorf("counting faulty sectors: %w", err)
}
if c == 0 {
@ -317,15 +285,16 @@ func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64,
})
}
if len(params.Faults) == 0 {
return nil
faults := params.Faults
if len(faults) == 0 {
return faults, nil, nil
}
log.Errorw("DETECTED FAULTY SECTORS, declaring faults", "count", bad)
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return xerrors.Errorf("could not serialize declare faults parameters: %w", aerr)
return faults, nil, xerrors.Errorf("could not serialize declare faults parameters: %w", aerr)
}
msg := &types.Message{
@ -340,21 +309,21 @@ func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64,
sm, err := s.api.MpoolPushMessage(ctx, msg, spec)
if err != nil {
return xerrors.Errorf("pushing message to mpool: %w", err)
return faults, sm, xerrors.Errorf("pushing message to mpool: %w", err)
}
log.Warnw("declare faults Message CID", "cid", sm.Cid())
rec, err := s.api.StateWaitMsg(context.TODO(), sm.Cid(), build.MessageConfidence)
if err != nil {
return xerrors.Errorf("declare faults wait error: %w", err)
return faults, sm, xerrors.Errorf("declare faults wait error: %w", err)
}
if rec.Receipt.ExitCode != 0 {
return xerrors.Errorf("declare faults wait non-0 exit code: %d", rec.Receipt.ExitCode)
return faults, sm, xerrors.Errorf("declare faults wait non-0 exit code: %d", rec.Receipt.ExitCode)
}
return nil
return faults, sm, nil
}
func (s *WindowPoStScheduler) runPost(ctx context.Context, di miner.DeadlineInfo, ts *types.TipSet) (*miner.SubmitWindowedPoStParams, error) {
@ -374,15 +343,53 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di miner.DeadlineInfo
return
}
if err := s.checkNextRecoveries(context.TODO(), declDeadline, partitions); err != nil {
var (
sigmsg *types.SignedMessage
recoveries []miner.RecoveryDeclaration
faults []miner.FaultDeclaration
// optionalCid returns the CID of the message, or cid.Undef is the
// message is nil. We don't need the argument (could capture the
// pointer), but it's clearer and purer like that.
optionalCid = func(sigmsg *types.SignedMessage) cid.Cid {
if sigmsg == nil {
return cid.Undef
}
return sigmsg.Cid()
}
)
if recoveries, sigmsg, err = s.checkNextRecoveries(context.TODO(), declDeadline, partitions); err != nil {
// TODO: This is potentially quite bad, but not even trying to post when this fails is objectively worse
log.Errorf("checking sector recoveries: %v", err)
}
if err := s.checkNextFaults(context.TODO(), declDeadline, partitions); err != nil {
journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} {
return s.enrichWithTipset(WindowPoStEvt{
State: "recoveries_processed",
Deadline: s.activeDeadline,
Recoveries: &WindowPoStEvt_Recoveries{
Declarations: recoveries,
MessageCID: optionalCid(sigmsg),
},
})
})
if faults, sigmsg, err = s.checkNextFaults(context.TODO(), declDeadline, partitions); err != nil {
// TODO: This is also potentially really bad, but we try to post anyways
log.Errorf("checking sector faults: %v", err)
}
journal.J.RecordEvent(s.wdPoStEvtType, func() interface{} {
return s.enrichWithTipset(WindowPoStEvt{
State: "faults_processed",
Deadline: s.activeDeadline,
Faults: &WindowPoStEvt_Faults{
Declarations: faults,
MessageCID: optionalCid(sigmsg),
},
})
})
}()
buf := new(bytes.Buffer)