Merge pull request #9598 from filecoin-project/fix/ancient-snap-deals-terrible
fix:sealing-fsm:wait mutable fsm state for immutable sector upgrade error
This commit is contained in:
commit
c79085e5b7
@ -535,6 +535,7 @@ var stateList = []stateMeta{
|
|||||||
{col: color.FgYellow, state: sealing.ProveReplicaUpdate},
|
{col: color.FgYellow, state: sealing.ProveReplicaUpdate},
|
||||||
{col: color.FgYellow, state: sealing.SubmitReplicaUpdate},
|
{col: color.FgYellow, state: sealing.SubmitReplicaUpdate},
|
||||||
{col: color.FgYellow, state: sealing.ReplicaUpdateWait},
|
{col: color.FgYellow, state: sealing.ReplicaUpdateWait},
|
||||||
|
{col: color.FgYellow, state: sealing.WaitMutable},
|
||||||
{col: color.FgYellow, state: sealing.FinalizeReplicaUpdate},
|
{col: color.FgYellow, state: sealing.FinalizeReplicaUpdate},
|
||||||
{col: color.FgYellow, state: sealing.ReleaseSectorKey},
|
{col: color.FgYellow, state: sealing.ReleaseSectorKey},
|
||||||
|
|
||||||
|
@ -188,6 +188,11 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
|
|||||||
SubmitReplicaUpdate: planOne(
|
SubmitReplicaUpdate: planOne(
|
||||||
on(SectorReplicaUpdateSubmitted{}, ReplicaUpdateWait),
|
on(SectorReplicaUpdateSubmitted{}, ReplicaUpdateWait),
|
||||||
on(SectorSubmitReplicaUpdateFailed{}, ReplicaUpdateFailed),
|
on(SectorSubmitReplicaUpdateFailed{}, ReplicaUpdateFailed),
|
||||||
|
on(SectorDeadlineImmutable{}, WaitMutable),
|
||||||
|
),
|
||||||
|
WaitMutable: planOne(
|
||||||
|
on(SectorDeadlineMutable{}, SubmitReplicaUpdate),
|
||||||
|
on(SectorAbortUpgrade{}, AbortUpgrade),
|
||||||
),
|
),
|
||||||
ReplicaUpdateWait: planOne(
|
ReplicaUpdateWait: planOne(
|
||||||
on(SectorReplicaUpdateLanded{}, UpdateActivating),
|
on(SectorReplicaUpdateLanded{}, UpdateActivating),
|
||||||
@ -525,6 +530,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
|||||||
return m.handleProveReplicaUpdate, processed, nil
|
return m.handleProveReplicaUpdate, processed, nil
|
||||||
case SubmitReplicaUpdate:
|
case SubmitReplicaUpdate:
|
||||||
return m.handleSubmitReplicaUpdate, processed, nil
|
return m.handleSubmitReplicaUpdate, processed, nil
|
||||||
|
case WaitMutable:
|
||||||
|
return m.handleWaitMutable, processed, nil
|
||||||
case ReplicaUpdateWait:
|
case ReplicaUpdateWait:
|
||||||
return m.handleReplicaUpdateWait, processed, nil
|
return m.handleReplicaUpdateWait, processed, nil
|
||||||
case FinalizeReplicaUpdate:
|
case FinalizeReplicaUpdate:
|
||||||
|
@ -472,6 +472,14 @@ type SectorSubmitReplicaUpdateFailed struct{}
|
|||||||
|
|
||||||
func (evt SectorSubmitReplicaUpdateFailed) apply(state *SectorInfo) {}
|
func (evt SectorSubmitReplicaUpdateFailed) apply(state *SectorInfo) {}
|
||||||
|
|
||||||
|
// SectorDeadlineImmutable is the FSM event sent by handleSubmitReplicaUpdate
// when the sector's deadline is the current deadline or the next one. In the
// SubmitReplicaUpdate planner it moves the sector to the WaitMutable state.
type SectorDeadlineImmutable struct{}
|
||||||
|
|
||||||
|
// apply is a no-op: the event carries no data and leaves SectorInfo untouched.
func (evt SectorDeadlineImmutable) apply(state *SectorInfo) {}
|
||||||
|
|
||||||
|
// SectorDeadlineMutable is the FSM event sent by handleWaitMutable once the
// sector's deadline is neither the current deadline nor the next one. In the
// WaitMutable planner it moves the sector back to SubmitReplicaUpdate.
type SectorDeadlineMutable struct{}
|
||||||
|
|
||||||
|
// apply is a no-op: the event carries no data and leaves SectorInfo untouched.
func (evt SectorDeadlineMutable) apply(state *SectorInfo) {}
|
||||||
|
|
||||||
type SectorReleaseKeyFailed struct{ error }
|
type SectorReleaseKeyFailed struct{ error }
|
||||||
|
|
||||||
func (evt SectorReleaseKeyFailed) FormatError(xerrors.Printer) (next error) {
|
func (evt SectorReleaseKeyFailed) FormatError(xerrors.Printer) (next error) {
|
||||||
|
@ -53,6 +53,7 @@ var ExistSectorStateList = map[SectorState]struct{}{
|
|||||||
UpdateReplica: {},
|
UpdateReplica: {},
|
||||||
ProveReplicaUpdate: {},
|
ProveReplicaUpdate: {},
|
||||||
SubmitReplicaUpdate: {},
|
SubmitReplicaUpdate: {},
|
||||||
|
WaitMutable: {},
|
||||||
ReplicaUpdateWait: {},
|
ReplicaUpdateWait: {},
|
||||||
UpdateActivating: {},
|
UpdateActivating: {},
|
||||||
ReleaseSectorKey: {},
|
ReleaseSectorKey: {},
|
||||||
@ -110,6 +111,7 @@ const (
|
|||||||
UpdateReplica SectorState = "UpdateReplica"
|
UpdateReplica SectorState = "UpdateReplica"
|
||||||
ProveReplicaUpdate SectorState = "ProveReplicaUpdate"
|
ProveReplicaUpdate SectorState = "ProveReplicaUpdate"
|
||||||
SubmitReplicaUpdate SectorState = "SubmitReplicaUpdate"
|
SubmitReplicaUpdate SectorState = "SubmitReplicaUpdate"
|
||||||
|
WaitMutable SectorState = "WaitMutable"
|
||||||
ReplicaUpdateWait SectorState = "ReplicaUpdateWait"
|
ReplicaUpdateWait SectorState = "ReplicaUpdateWait"
|
||||||
FinalizeReplicaUpdate SectorState = "FinalizeReplicaUpdate"
|
FinalizeReplicaUpdate SectorState = "FinalizeReplicaUpdate"
|
||||||
UpdateActivating SectorState = "UpdateActivating"
|
UpdateActivating SectorState = "UpdateActivating"
|
||||||
@ -161,7 +163,7 @@ func toStatState(st SectorState, finEarly bool) statSectorState {
|
|||||||
return sstStaging
|
return sstStaging
|
||||||
case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, CommitFinalize, FinalizeSector, SnapDealsPacking, UpdateReplica, ProveReplicaUpdate, FinalizeReplicaUpdate, ReceiveSector:
|
case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, CommitFinalize, FinalizeSector, SnapDealsPacking, UpdateReplica, ProveReplicaUpdate, FinalizeReplicaUpdate, ReceiveSector:
|
||||||
return sstSealing
|
return sstSealing
|
||||||
case SubmitCommit, CommitWait, SubmitCommitAggregate, CommitAggregateWait, SubmitReplicaUpdate, ReplicaUpdateWait:
|
case SubmitCommit, CommitWait, SubmitCommitAggregate, CommitAggregateWait, WaitMutable, SubmitReplicaUpdate, ReplicaUpdateWait:
|
||||||
if finEarly {
|
if finEarly {
|
||||||
// we use statSectorState for throttling storage use. With FinalizeEarly
|
// we use statSectorState for throttling storage use. With FinalizeEarly
|
||||||
// we can consider sectors in states after CommitFinalize as finalized, so
|
// we can consider sectors in states after CommitFinalize as finalized, so
|
||||||
@ -184,6 +186,7 @@ func IsUpgradeState(st SectorState) bool {
|
|||||||
UpdateReplica,
|
UpdateReplica,
|
||||||
ProveReplicaUpdate,
|
ProveReplicaUpdate,
|
||||||
SubmitReplicaUpdate,
|
SubmitReplicaUpdate,
|
||||||
|
WaitMutable,
|
||||||
|
|
||||||
SnapDealsAddPieceFailed,
|
SnapDealsAddPieceFailed,
|
||||||
SnapDealsDealsExpired,
|
SnapDealsDealsExpired,
|
||||||
|
@ -257,8 +257,9 @@ func (m *Sealing) handleSubmitReplicaUpdateFailed(ctx statemachine.Context, sect
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if !active {
|
if !active {
|
||||||
log.Errorf("sector marked for upgrade %d no longer active, aborting upgrade", sector.SectorNumber)
|
err := xerrors.Errorf("sector marked for upgrade %d no longer active, aborting upgrade", sector.SectorNumber)
|
||||||
return ctx.Send(SectorAbortUpgrade{})
|
log.Errorf(err.Error())
|
||||||
|
return ctx.Send(SectorAbortUpgrade{err})
|
||||||
}
|
}
|
||||||
|
|
||||||
return ctx.Send(SectorRetrySubmitReplicaUpdate{})
|
return ctx.Send(SectorRetrySubmitReplicaUpdate{})
|
||||||
|
@ -57,8 +57,9 @@ func (m *Sealing) handleProveReplicaUpdate(ctx statemachine.Context, sector Sect
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if !active {
|
if !active {
|
||||||
log.Errorf("sector marked for upgrade %d no longer active, aborting upgrade", sector.SectorNumber)
|
err := xerrors.Errorf("sector marked for upgrade %d no longer active, aborting upgrade", sector.SectorNumber)
|
||||||
return ctx.Send(SectorAbortUpgrade{})
|
log.Errorf(err.Error())
|
||||||
|
return ctx.Send(SectorAbortUpgrade{err})
|
||||||
}
|
}
|
||||||
|
|
||||||
vanillaProofs, err := m.sealer.ProveReplicaUpdate1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), *sector.CommR, *sector.UpdateSealed, *sector.UpdateUnsealed)
|
vanillaProofs, err := m.sealer.ProveReplicaUpdate1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), *sector.CommR, *sector.UpdateSealed, *sector.UpdateUnsealed)
|
||||||
@ -97,6 +98,17 @@ func (m *Sealing) handleSubmitReplicaUpdate(ctx statemachine.Context, sector Sec
|
|||||||
log.Errorf("handleSubmitReplicaUpdate: api error, not proceeding: %+v", err)
|
log.Errorf("handleSubmitReplicaUpdate: api error, not proceeding: %+v", err)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dlinfo, err := m.Api.StateMinerProvingDeadline(ctx.Context(), m.maddr, ts.Key())
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("handleSubmitReplicaUpdate: api error, not proceeding: %w", err)
|
||||||
|
}
|
||||||
|
// if sector's deadline is immutable wait in a non error state
|
||||||
|
// sector's deadline is immutable if it is the current deadline or the next deadline
|
||||||
|
if sl.Deadline == dlinfo.Index || (dlinfo.Index+1)%dlinfo.WPoStPeriodDeadlines == sl.Deadline {
|
||||||
|
return ctx.Send(SectorDeadlineImmutable{})
|
||||||
|
}
|
||||||
|
|
||||||
updateProof, err := sector.SectorType.RegisteredUpdateProof()
|
updateProof, err := sector.SectorType.RegisteredUpdateProof()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed to get update proof type from seal proof: %+v", err)
|
log.Errorf("failed to get update proof type from seal proof: %+v", err)
|
||||||
@ -187,6 +199,67 @@ func (m *Sealing) handleSubmitReplicaUpdate(ctx statemachine.Context, sector Sec
|
|||||||
return ctx.Send(SectorReplicaUpdateSubmitted{Message: mcid})
|
return ctx.Send(SectorReplicaUpdateSubmitted{Message: mcid})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) handleWaitMutable(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
immutable := true
|
||||||
|
for immutable {
|
||||||
|
ts, err := m.Api.ChainHead(ctx.Context())
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("handleWaitMutable: api error, not proceeding: %+v", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
sl, err := m.Api.StateSectorPartition(ctx.Context(), m.maddr, sector.SectorNumber, ts.Key())
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("handleWaitMutable: api error, not proceeding: %+v", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
dlinfo, err := m.Api.StateMinerProvingDeadline(ctx.Context(), m.maddr, ts.Key())
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("handleWaitMutable: api error, not proceeding: %w", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
sectorDeadlineOpen := sl.Deadline == dlinfo.Index
|
||||||
|
sectorDeadlineNext := (dlinfo.Index+1)%dlinfo.WPoStPeriodDeadlines == sl.Deadline
|
||||||
|
immutable = sectorDeadlineOpen || sectorDeadlineNext
|
||||||
|
|
||||||
|
// Sleep for immutable epochs
|
||||||
|
if immutable {
|
||||||
|
dlineEpochsRemaining := dlinfo.NextOpen() - ts.Height()
|
||||||
|
var targetEpoch abi.ChainEpoch
|
||||||
|
if sectorDeadlineOpen {
|
||||||
|
// sleep for remainder of deadline
|
||||||
|
targetEpoch = ts.Height() + dlineEpochsRemaining
|
||||||
|
} else {
|
||||||
|
// sleep for remainder of deadline and next one
|
||||||
|
targetEpoch = ts.Height() + dlineEpochsRemaining + dlinfo.WPoStChallengeWindow
|
||||||
|
}
|
||||||
|
|
||||||
|
atHeight := make(chan struct{})
|
||||||
|
err := m.events.ChainAt(ctx.Context(), func(context.Context, *types.TipSet, abi.ChainEpoch) error {
|
||||||
|
close(atHeight)
|
||||||
|
return nil
|
||||||
|
}, func(ctx context.Context, ts *types.TipSet) error {
|
||||||
|
log.Warn("revert in handleWaitMutable")
|
||||||
|
return nil
|
||||||
|
}, 5, targetEpoch)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("handleWaitMutalbe: events error: api error, not proceeding: %w", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-atHeight:
|
||||||
|
case <-ctx.Context().Done():
|
||||||
|
return ctx.Context().Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
return ctx.Send(SectorDeadlineMutable{})
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Sealing) handleReplicaUpdateWait(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handleReplicaUpdateWait(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
if sector.ReplicaUpdateMessage == nil {
|
if sector.ReplicaUpdateMessage == nil {
|
||||||
log.Errorf("handleReplicaUpdateWait: no replica update message cid recorded")
|
log.Errorf("handleReplicaUpdateWait: no replica update message cid recorded")
|
||||||
|
Loading…
Reference in New Issue
Block a user