sealing: Handle events based on current state

This commit is contained in:
Łukasz Magiera 2020-01-16 03:53:59 +01:00
parent ffdd436b52
commit cc424d64fe
3 changed files with 78 additions and 67 deletions

View File

@ -2,7 +2,6 @@ package sealing
import ( import (
"context" "context"
"fmt"
"reflect" "reflect"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -32,13 +31,30 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface
var fsmPlanners = []func(events []statemachine.Event, state *SectorInfo) error { var fsmPlanners = []func(events []statemachine.Event, state *SectorInfo) error {
api.UndefinedSectorState: planOne(on(SectorStart{}, api.Packing)), api.UndefinedSectorState: planOne(on(SectorStart{}, api.Packing)),
api.Packing: planOne(on(SectorPacked{}, api.Unsealed)), api.Packing: planOne(on(SectorPacked{}, api.Unsealed)),
api.Unsealed: planOne(on(SectorSealed{}, api.PreCommitting)), api.Unsealed: planOne(
api.PreCommitting: planOne(on(SectorPreCommitted{}, api.PreCommitted)), on(SectorSealed{}, api.PreCommitting),
api.PreCommitted: planOne(on(SectorSeedReady{}, api.Committing)), on(SectorSealFailed{}, api.SealFailed),
),
api.PreCommitting: planOne(
on(SectorPreCommitted{}, api.PreCommitted),
on(SectorPreCommitFailed{}, api.PreCommitFailed),
),
api.PreCommitted: planOne(
on(SectorSeedReady{}, api.Committing),
on(SectorPreCommitFailed{}, api.PreCommitFailed),
),
api.Committing: planCommitting, api.Committing: planCommitting,
api.CommitWait: planOne(on(SectorProving{}, api.Proving)), api.CommitWait: planOne(on(SectorProving{}, api.Proving)),
api.Proving: final, api.Proving: planOne(
on(SectorFaultReported{}, api.FaultReported),
on(SectorFaulty{}, api.Faulty),
),
api.Faulty: planOne(
on(SectorFaultReported{}, api.FaultReported),
),
api.FaultedFinal: final,
} }
func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, error) { func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, error) {
@ -54,56 +70,6 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
return nil, xerrors.Errorf("running planner for state %s failed: %w", api.SectorStates[state.State], err) return nil, xerrors.Errorf("running planner for state %s failed: %w", api.SectorStates[state.State], err)
} }
for _, event := range events {
if err, ok := event.User.(error); ok {
state.LastErr = fmt.Sprintf("%+v", err)
}
switch event := event.User.(type) {
case SectorRestart:
// noop
case SectorFatalError:
log.Errorf("Fatal error on sector %d: %+v", state.SectorID, event.error)
// TODO: Do we want to mark the state as unrecoverable?
// I feel like this should be a softer error, where the user would
// be able to send a retry event of some kind
return nil, nil
// // TODO: Incoming
// TODO: for those - look at dealIDs matching chain
// // Unsealed
case SectorSealFailed:
// TODO: try to find out the reason, maybe retry
state.State = api.SealFailed
// // PreCommit
case SectorPreCommitFailed:
// TODO: try to find out the reason, maybe retry
state.State = api.PreCommitFailed
// // Commit
case SectorSealCommitFailed:
// TODO: try to find out the reason, maybe retry
state.State = api.SealCommitFailed
case SectorCommitFailed:
// TODO: try to find out the reason, maybe retry
state.State = api.SealFailed
case SectorFaultReported:
state.FaultReportMsg = &event.reportMsg
state.State = api.FaultReported
case SectorFaultedFinal:
state.State = api.FaultedFinal
// // Debug triggers
case SectorForceState:
state.State = event.state
}
}
///// /////
// Now decide what to do next // Now decide what to do next
@ -190,8 +156,10 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
func planCommitting(events []statemachine.Event, state *SectorInfo) error { func planCommitting(events []statemachine.Event, state *SectorInfo) error {
for _, event := range events { for _, event := range events {
switch e := event.User.(type) { switch e := event.User.(type) {
case SectorRestart: case globalMutator:
// noop if e.applyGlobal(state) {
return nil
}
case SectorCommitted: // the normal case case SectorCommitted: // the normal case
e.apply(state) e.apply(state)
state.State = api.CommitWait state.State = api.CommitWait
@ -204,6 +172,10 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) error {
e.apply(state) e.apply(state)
state.State = api.Committing state.State = api.Committing
return nil return nil
case SectorSealCommitFailed:
state.State = api.SealCommitFailed
case SectorSealFailed:
state.State = api.CommitFailed
default: default:
return xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events) return xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events)
} }
@ -245,6 +217,12 @@ func on(mut mutator, next api.SectorState) func() (mutator, api.SectorState) {
func planOne(ts ...func() (mut mutator, next api.SectorState)) func(events []statemachine.Event, state *SectorInfo) error { func planOne(ts ...func() (mut mutator, next api.SectorState)) func(events []statemachine.Event, state *SectorInfo) error {
return func(events []statemachine.Event, state *SectorInfo) error { return func(events []statemachine.Event, state *SectorInfo) error {
if len(events) != 1 { if len(events) != 1 {
for _, event := range events {
if gm, ok := event.User.(globalMutator); !ok {
gm.applyGlobal(state)
return nil
}
}
return xerrors.Errorf("planner for state %s only has a plan for a single event only, got %+v", api.SectorStates[state.State], events) return xerrors.Errorf("planner for state %s only has a plan for a single event only, got %+v", api.SectorStates[state.State], events)
} }

View File

@ -1,15 +1,45 @@
package sealing package sealing
import ( import (
"github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/ipfs/go-cid"
) )
type mutator interface { type mutator interface {
apply(state *SectorInfo) apply(state *SectorInfo)
} }
// globalMutator is an event which can apply in every state
type globalMutator interface {
// applyGlobal applies the event to the state. If if returns true,
// event processing should be interrupted
applyGlobal(state *SectorInfo) bool
}
// Global events
type SectorRestart struct{}
func (evt SectorRestart) applyGlobal(*SectorInfo) bool { return false }
type SectorFatalError struct{ error }
func (evt SectorFatalError) applyGlobal(state *SectorInfo) bool {
log.Errorf("Fatal error on sector %d: %+v", state.SectorID, evt.error)
// TODO: Do we want to mark the state as unrecoverable?
// I feel like this should be a softer error, where the user would
// be able to send a retry event of some kind
return true
}
type SectorForceState struct {
state api.SectorState
}
func (evt SectorForceState) applyGlobal(state *SectorInfo) bool {
state.State = evt.state
return true
}
// Normal path
type SectorStart struct { type SectorStart struct {
id uint64 id uint64
pieces []Piece pieces []Piece
@ -19,10 +49,6 @@ func (evt SectorStart) apply(state *SectorInfo) {
state.Pieces = evt.pieces state.Pieces = evt.pieces
} }
type SectorRestart struct{}
type SectorFatalError struct{ error }
type SectorPacked struct{ pieces []Piece } type SectorPacked struct{ pieces []Piece }
func (evt SectorPacked) apply(state *SectorInfo) { func (evt SectorPacked) apply(state *SectorInfo) {
state.Pieces = append(state.Pieces, evt.pieces...) state.Pieces = append(state.Pieces, evt.pieces...)
@ -40,8 +66,10 @@ func (evt SectorSealed) apply(state *SectorInfo) {
} }
type SectorSealFailed struct{ error } type SectorSealFailed struct{ error }
func (evt SectorSealFailed) apply(*SectorInfo) {}
type SectorPreCommitFailed struct{ error } type SectorPreCommitFailed struct{ error }
func (evt SectorPreCommitFailed) apply(*SectorInfo) {}
type SectorPreCommitted struct { type SectorPreCommitted struct {
message cid.Cid message cid.Cid
@ -71,9 +99,12 @@ func (evt SectorCommitted) apply(state *SectorInfo) {
type SectorProving struct{} type SectorProving struct{}
func (evt SectorProving) apply(*SectorInfo) {} func (evt SectorProving) apply(*SectorInfo) {}
type SectorFaultReported struct{ reportMsg cid.Cid } type SectorFaulty struct{}
type SectorFaultedFinal struct{} func (evt SectorFaulty) apply(state *SectorInfo) {}
type SectorForceState struct { type SectorFaultReported struct{ reportMsg cid.Cid }
state api.SectorState func (evt SectorFaultReported) apply(state *SectorInfo) {
state.FaultReportMsg = &evt.reportMsg
} }
type SectorFaultedFinal struct{}

View File

@ -173,6 +173,8 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
GasPrice: types.NewInt(1), GasPrice: types.NewInt(1),
} }
// TODO: check seed / ticket are up to date
smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg) smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg)
if err != nil { if err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)}) return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})