From 6dee0d8ca865e6604e98e19c51928fdc92a0f4be Mon Sep 17 00:00:00 2001 From: zenground0 Date: Mon, 7 Nov 2022 14:56:53 +0000 Subject: [PATCH] Rough draft no tests --- cmd/lotus-miner/info.go | 1 + storage/pipeline/fsm.go | 7 ++++ storage/pipeline/fsm_events.go | 8 ++++ storage/pipeline/sector_state.go | 5 ++- storage/pipeline/states_replica_update.go | 51 +++++++++++++++++++++++ 5 files changed, 71 insertions(+), 1 deletion(-) diff --git a/cmd/lotus-miner/info.go b/cmd/lotus-miner/info.go index 312d86600..b07705188 100644 --- a/cmd/lotus-miner/info.go +++ b/cmd/lotus-miner/info.go @@ -534,6 +534,7 @@ var stateList = []stateMeta{ {col: color.FgYellow, state: sealing.ProveReplicaUpdate}, {col: color.FgYellow, state: sealing.SubmitReplicaUpdate}, {col: color.FgYellow, state: sealing.ReplicaUpdateWait}, + {col: color.FgYellow, state: sealing.WaitMutable}, {col: color.FgYellow, state: sealing.FinalizeReplicaUpdate}, {col: color.FgYellow, state: sealing.ReleaseSectorKey}, diff --git a/storage/pipeline/fsm.go b/storage/pipeline/fsm.go index 0a75d88c8..25fd6fcef 100644 --- a/storage/pipeline/fsm.go +++ b/storage/pipeline/fsm.go @@ -188,6 +188,11 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto SubmitReplicaUpdate: planOne( on(SectorReplicaUpdateSubmitted{}, ReplicaUpdateWait), on(SectorSubmitReplicaUpdateFailed{}, ReplicaUpdateFailed), + on(SectorDeadlineImmutable{}, WaitMutable), + ), + WaitMutable: planOne( + on(SectorDeadlineMutable{}, SubmitReplicaUpdate), + on(SectorAbortUpgrade{}, AbortUpgrade), ), ReplicaUpdateWait: planOne( on(SectorReplicaUpdateLanded{}, UpdateActivating), @@ -525,6 +530,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta return m.handleProveReplicaUpdate, processed, nil case SubmitReplicaUpdate: return m.handleSubmitReplicaUpdate, processed, nil + case WaitMutable: + return m.handleWaitMutable, processed, nil case ReplicaUpdateWait: return m.handleReplicaUpdateWait, processed, nil case FinalizeReplicaUpdate: diff --git a/storage/pipeline/fsm_events.go b/storage/pipeline/fsm_events.go index f92f527ad..e75b70268 100644 --- a/storage/pipeline/fsm_events.go +++ b/storage/pipeline/fsm_events.go @@ -472,6 +472,14 @@ type SectorSubmitReplicaUpdateFailed struct{} func (evt SectorSubmitReplicaUpdateFailed) apply(state *SectorInfo) {} +type SectorDeadlineImmutable struct{} + +func (evt SectorDeadlineImmutable) apply(state *SectorInfo) {} + +type SectorDeadlineMutable struct{} + +func (evt SectorDeadlineMutable) apply(state *SectorInfo) {} + type SectorReleaseKeyFailed struct{ error } func (evt SectorReleaseKeyFailed) FormatError(xerrors.Printer) (next error) { diff --git a/storage/pipeline/sector_state.go b/storage/pipeline/sector_state.go index 7a56c136b..84c08f43b 100644 --- a/storage/pipeline/sector_state.go +++ b/storage/pipeline/sector_state.go @@ -53,6 +53,7 @@ var ExistSectorStateList = map[SectorState]struct{}{ UpdateReplica: {}, ProveReplicaUpdate: {}, SubmitReplicaUpdate: {}, + WaitMutable: {}, ReplicaUpdateWait: {}, UpdateActivating: {}, ReleaseSectorKey: {}, @@ -110,6 +111,7 @@ const ( UpdateReplica SectorState = "UpdateReplica" ProveReplicaUpdate SectorState = "ProveReplicaUpdate" SubmitReplicaUpdate SectorState = "SubmitReplicaUpdate" + WaitMutable SectorState = "WaitMutable" ReplicaUpdateWait SectorState = "ReplicaUpdateWait" FinalizeReplicaUpdate SectorState = "FinalizeReplicaUpdate" UpdateActivating SectorState = "UpdateActivating" @@ -161,7 +163,7 @@ func toStatState(st SectorState, finEarly bool) statSectorState { return sstStaging case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, CommitFinalize, FinalizeSector, SnapDealsPacking, UpdateReplica, ProveReplicaUpdate, FinalizeReplicaUpdate, ReceiveSector: return sstSealing - case SubmitCommit, CommitWait, SubmitCommitAggregate, CommitAggregateWait, SubmitReplicaUpdate, ReplicaUpdateWait: + case SubmitCommit, CommitWait, SubmitCommitAggregate, CommitAggregateWait, WaitMutable, SubmitReplicaUpdate, ReplicaUpdateWait: if finEarly { // we use statSectorState for throttling storage use. With FinalizeEarly // we can consider sectors in states after CommitFinalize as finalized, so @@ -184,6 +186,7 @@ func IsUpgradeState(st SectorState) bool { UpdateReplica, ProveReplicaUpdate, SubmitReplicaUpdate, + WaitMutable, SnapDealsAddPieceFailed, SnapDealsDealsExpired, diff --git a/storage/pipeline/states_replica_update.go b/storage/pipeline/states_replica_update.go index 9170f5fb5..0639a5a87 100644 --- a/storage/pipeline/states_replica_update.go +++ b/storage/pipeline/states_replica_update.go @@ -97,6 +97,17 @@ func (m *Sealing) handleSubmitReplicaUpdate(ctx statemachine.Context, sector Sec log.Errorf("handleSubmitReplicaUpdate: api error, not proceeding: %+v", err) return nil } + + dlinfo, err := m.Api.StateMinerProvingDeadline(ctx.Context(), m.maddr, ts.Key()) + if err != nil { + log.Errorf("handleSubmitReplicaUpdate: api error, not proceeding: %w", err) + } + // if sector's deadline is immutable wait in a non error state + // sector's deadline is immutable if it is the current deadline or the next deadline + if sl.Deadline == dlinfo.Index || (dlinfo.Index+1)%dlinfo.WPoStPeriodDeadlines == sl.Deadline { + return ctx.Send(SectorDeadlineImmutable{}) + } + updateProof, err := sector.SectorType.RegisteredUpdateProof() if err != nil { log.Errorf("failed to get update proof type from seal proof: %+v", err) @@ -187,6 +198,46 @@ func (m *Sealing) handleSubmitReplicaUpdate(ctx statemachine.Context, sector Sec return ctx.Send(SectorReplicaUpdateSubmitted{Message: mcid}) } +func (m *Sealing) handleWaitMutable(ctx statemachine.Context, sector SectorInfo) error { + immutable := true + for immutable { + ts, err := m.Api.ChainHead(ctx.Context()) + if err != nil { + log.Errorf("handleWaitMutable: api error, not proceeding: %+v", err) + return nil + } + + sl, err := m.Api.StateSectorPartition(ctx.Context(), m.maddr, sector.SectorNumber, ts.Key()) + if err != nil { + log.Errorf("handleWaitMutable: api error, not proceeding: %+v", err) + return nil + } + + dlinfo, err := m.Api.StateMinerProvingDeadline(ctx.Context(), m.maddr, ts.Key()) + if err != nil { + log.Errorf("handleWaitMutable: api error, not proceeding: %w", err) + return nil + } + + sectorDeadlineOpen := sl.Deadline == dlinfo.Index + sectorDeadlineNext := (dlinfo.Index+1)%dlinfo.WPoStPeriodDeadlines == sl.Deadline + immutable := sectorDeadlineOpen || sectorDeadlineNext + + // Sleep for immutable epochs + if immutable { + dlineEpochsRemaining := dlinfo.NextOpen() - ts.Height() + if sectorDeadlineOpen { + // sleep for remainder of deadline + time.Sleep(time.Duration(build.BlockDelaySecs) * time.Second * time.Duration(dlineEpochsRemaining)) + } else { + // sleep for remainder of deadline and next one + time.Sleep(time.Duration(build.BlockDelaySecs) * time.Second * time.Duration(dlineEpochsRemaining+dlinfo.WPoStChallengeWindow)) + } + } + } + return ctx.Send(SectorDeadlineMutable{}) +} + func (m *Sealing) handleReplicaUpdateWait(ctx statemachine.Context, sector SectorInfo) error { if sector.ReplicaUpdateMessage == nil { log.Errorf("handleReplicaUpdateWait: no replica update message cid recorded")