storagefsm: Allow removing sectors in all states
This commit is contained in:
parent
deb013cecb
commit
15191ff80f
96
extern/storage-sealing/fsm.go
vendored
96
extern/storage-sealing/fsm.go
vendored
@ -17,9 +17,9 @@ import (
|
||||
)
|
||||
|
||||
func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error) {
|
||||
next, err := m.plan(events, user.(*SectorInfo))
|
||||
next, processed, err := m.plan(events, user.(*SectorInfo))
|
||||
if err != nil || next == nil {
|
||||
return nil, uint64(len(events)), err
|
||||
return nil, processed, err
|
||||
}
|
||||
|
||||
return func(ctx statemachine.Context, si SectorInfo) error {
|
||||
@ -30,10 +30,10 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface
|
||||
}
|
||||
|
||||
return nil
|
||||
}, uint64(len(events)), nil // TODO: This processed event count is not very correct
|
||||
}, processed, nil // TODO: This processed event count is not very correct
|
||||
}
|
||||
|
||||
var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) error{
|
||||
var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) (uint64, error){
|
||||
// Sealing
|
||||
|
||||
UndefinedSectorState: planOne(
|
||||
@ -119,7 +119,6 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
|
||||
Proving: planOne(
|
||||
on(SectorFaultReported{}, FaultReported),
|
||||
on(SectorFaulty{}, Faulty),
|
||||
on(SectorRemove{}, Removing),
|
||||
),
|
||||
Removing: planOne(
|
||||
on(SectorRemoved{}, Removed),
|
||||
@ -133,7 +132,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
|
||||
Removed: final,
|
||||
}
|
||||
|
||||
func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, error) {
|
||||
func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, uint64, error) {
|
||||
/////
|
||||
// First process all events
|
||||
|
||||
@ -170,11 +169,12 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
||||
|
||||
p := fsmPlanners[state.State]
|
||||
if p == nil {
|
||||
return nil, xerrors.Errorf("planner for state %s not found", state.State)
|
||||
return nil, 0, xerrors.Errorf("planner for state %s not found", state.State)
|
||||
}
|
||||
|
||||
if err := p(events, state); err != nil {
|
||||
return nil, xerrors.Errorf("running planner for state %s failed: %w", state.State, err)
|
||||
processed, err := p(events, state)
|
||||
if err != nil {
|
||||
return nil, 0, xerrors.Errorf("running planner for state %s failed: %w", state.State, err)
|
||||
}
|
||||
|
||||
/////
|
||||
@ -235,51 +235,51 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
||||
case WaitDeals:
|
||||
log.Infof("Waiting for deals %d", state.SectorNumber)
|
||||
case Packing:
|
||||
return m.handlePacking, nil
|
||||
return m.handlePacking, processed, nil
|
||||
case PreCommit1:
|
||||
return m.handlePreCommit1, nil
|
||||
return m.handlePreCommit1, processed, nil
|
||||
case PreCommit2:
|
||||
return m.handlePreCommit2, nil
|
||||
return m.handlePreCommit2, processed, nil
|
||||
case PreCommitting:
|
||||
return m.handlePreCommitting, nil
|
||||
return m.handlePreCommitting, processed, nil
|
||||
case PreCommitWait:
|
||||
return m.handlePreCommitWait, nil
|
||||
return m.handlePreCommitWait, processed, nil
|
||||
case WaitSeed:
|
||||
return m.handleWaitSeed, nil
|
||||
return m.handleWaitSeed, processed, nil
|
||||
case Committing:
|
||||
return m.handleCommitting, nil
|
||||
return m.handleCommitting, processed, nil
|
||||
case CommitWait:
|
||||
return m.handleCommitWait, nil
|
||||
return m.handleCommitWait, processed, nil
|
||||
case FinalizeSector:
|
||||
return m.handleFinalizeSector, nil
|
||||
return m.handleFinalizeSector, processed, nil
|
||||
|
||||
// Handled failure modes
|
||||
case SealPreCommit1Failed:
|
||||
return m.handleSealPrecommit1Failed, nil
|
||||
return m.handleSealPrecommit1Failed, processed, nil
|
||||
case SealPreCommit2Failed:
|
||||
return m.handleSealPrecommit2Failed, nil
|
||||
return m.handleSealPrecommit2Failed, processed, nil
|
||||
case PreCommitFailed:
|
||||
return m.handlePreCommitFailed, nil
|
||||
return m.handlePreCommitFailed, processed, nil
|
||||
case ComputeProofFailed:
|
||||
return m.handleComputeProofFailed, nil
|
||||
return m.handleComputeProofFailed, processed, nil
|
||||
case CommitFailed:
|
||||
return m.handleCommitFailed, nil
|
||||
return m.handleCommitFailed, processed, nil
|
||||
case FinalizeFailed:
|
||||
return m.handleFinalizeFailed, nil
|
||||
return m.handleFinalizeFailed, processed, nil
|
||||
|
||||
// Post-seal
|
||||
case Proving:
|
||||
return m.handleProvingSector, nil
|
||||
return m.handleProvingSector, processed, nil
|
||||
case Removing:
|
||||
return m.handleRemoving, nil
|
||||
return m.handleRemoving, processed, nil
|
||||
case Removed:
|
||||
return nil, nil
|
||||
return nil, processed, nil
|
||||
|
||||
// Faults
|
||||
case Faulty:
|
||||
return m.handleFaulty, nil
|
||||
return m.handleFaulty, processed, nil
|
||||
case FaultReported:
|
||||
return m.handleFaultReported, nil
|
||||
return m.handleFaultReported, processed, nil
|
||||
|
||||
// Fatal errors
|
||||
case UndefinedSectorState:
|
||||
@ -290,15 +290,15 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
||||
log.Errorf("unexpected sector update state: %s", state.State)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
return nil, processed, nil
|
||||
}
|
||||
|
||||
func planCommitting(events []statemachine.Event, state *SectorInfo) error {
|
||||
func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, error) {
|
||||
for _, event := range events {
|
||||
switch e := event.User.(type) {
|
||||
case globalMutator:
|
||||
if e.applyGlobal(state) {
|
||||
return nil
|
||||
return 1, nil
|
||||
}
|
||||
case SectorCommitted: // the normal case
|
||||
e.apply(state)
|
||||
@ -311,7 +311,7 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) error {
|
||||
log.Warnf("planCommitting: commit Seed changed")
|
||||
e.apply(state)
|
||||
state.State = Committing
|
||||
return nil
|
||||
return 1, nil
|
||||
case SectorComputeProofFailed:
|
||||
state.State = ComputeProofFailed
|
||||
case SectorSealPreCommit1Failed:
|
||||
@ -321,10 +321,10 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) error {
|
||||
case SectorRetryCommitWait:
|
||||
state.State = CommitWait
|
||||
default:
|
||||
return xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events)
|
||||
return 0, xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return 1, nil
|
||||
}
|
||||
|
||||
func (m *Sealing) restartSectors(ctx context.Context) error {
|
||||
@ -365,8 +365,8 @@ func (m *Sealing) ForceSectorState(ctx context.Context, id abi.SectorNumber, sta
|
||||
return m.sectors.Send(id, SectorForceState{state})
|
||||
}
|
||||
|
||||
func final(events []statemachine.Event, state *SectorInfo) error {
|
||||
return xerrors.Errorf("didn't expect any events in state %s, got %+v", state.State, events)
|
||||
func final(events []statemachine.Event, state *SectorInfo) (uint64, error) {
|
||||
return 0, xerrors.Errorf("didn't expect any events in state %s, got %+v", state.State, events)
|
||||
}
|
||||
|
||||
func on(mut mutator, next SectorState) func() (mutator, SectorState) {
|
||||
@ -375,21 +375,11 @@ func on(mut mutator, next SectorState) func() (mutator, SectorState) {
|
||||
}
|
||||
}
|
||||
|
||||
func planOne(ts ...func() (mut mutator, next SectorState)) func(events []statemachine.Event, state *SectorInfo) error {
|
||||
return func(events []statemachine.Event, state *SectorInfo) error {
|
||||
if len(events) != 1 {
|
||||
for _, event := range events {
|
||||
if gm, ok := event.User.(globalMutator); ok {
|
||||
gm.applyGlobal(state)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return xerrors.Errorf("planner for state %s only has a plan for a single event only, got %+v", state.State, events)
|
||||
}
|
||||
|
||||
func planOne(ts ...func() (mut mutator, next SectorState)) func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
|
||||
return func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
|
||||
if gm, ok := events[0].User.(globalMutator); ok {
|
||||
gm.applyGlobal(state)
|
||||
return nil
|
||||
return 1, nil
|
||||
}
|
||||
|
||||
for _, t := range ts {
|
||||
@ -405,14 +395,14 @@ func planOne(ts ...func() (mut mutator, next SectorState)) func(events []statema
|
||||
|
||||
events[0].User.(mutator).apply(state)
|
||||
state.State = next
|
||||
return nil
|
||||
return 1, nil
|
||||
}
|
||||
|
||||
_, ok := events[0].User.(Ignorable)
|
||||
if ok {
|
||||
return nil
|
||||
return 1, nil
|
||||
}
|
||||
|
||||
return xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, events[0].User, events[0])
|
||||
return 0, xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, events[0].User, events[0])
|
||||
}
|
||||
}
|
||||
|
5
extern/storage-sealing/fsm_events.go
vendored
5
extern/storage-sealing/fsm_events.go
vendored
@ -274,7 +274,10 @@ type SectorFaultedFinal struct{}
|
||||
|
||||
type SectorRemove struct{}
|
||||
|
||||
func (evt SectorRemove) apply(state *SectorInfo) {}
|
||||
func (evt SectorRemove) applyGlobal(state *SectorInfo) bool {
|
||||
state.State = Removing
|
||||
return true
|
||||
}
|
||||
|
||||
type SectorRemoved struct{}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user