lotus/fsm.go

321 lines
8.3 KiB
Go
Raw Normal View History

package sealing
import (
"bytes"
"context"
2020-03-22 20:44:27 +00:00
"encoding/json"
2020-01-22 20:29:19 +00:00
"fmt"
"reflect"
2020-01-22 20:29:19 +00:00
"time"
"golang.org/x/xerrors"
statemachine "github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/prometheus/common/log"
)
2020-03-06 18:59:08 +00:00
func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error) {
next, err := m.plan(events, user.(*SectorInfo))
if err != nil || next == nil {
2020-03-06 23:03:57 +00:00
return nil, uint64(len(events)), err
}
return func(ctx statemachine.Context, si SectorInfo) error {
err := next(ctx, si)
if err != nil {
log.Errorf("unhandled sector error (%d): %+v", si.SectorID, err)
return nil
}
return nil
2020-03-06 18:59:08 +00:00
}, uint64(len(events)), nil // TODO: This processed event count is not very correct
}
2020-04-03 16:24:45 +00:00
var fsmPlanners = map[api.SectorState]func(events []statemachine.Event, state *SectorInfo) error{
api.UndefinedSectorState: planOne(on(SectorStart{}, api.Packing)),
2020-04-03 16:54:01 +00:00
api.Packing: planOne(on(SectorPacked{}, api.PreCommit1)),
api.PreCommit1: planOne(
on(SectorPreCommit1{}, api.PreCommit2),
on(SectorSealPreCommitFailed{}, api.SealFailed),
on(SectorPackingFailed{}, api.PackingFailed),
),
api.PreCommit2: planOne(
on(SectorPreCommit2{}, api.PreCommitting),
on(SectorSealPreCommitFailed{}, api.SealFailed),
on(SectorPackingFailed{}, api.PackingFailed),
),
api.PreCommitting: planOne(
2020-04-03 16:54:01 +00:00
on(SectorSealPreCommitFailed{}, api.SealFailed),
2020-01-20 22:04:46 +00:00
on(SectorPreCommitted{}, api.WaitSeed),
2020-04-03 16:54:01 +00:00
on(SectorChainPreCommitFailed{}, api.PreCommitFailed),
),
2020-01-20 22:04:46 +00:00
api.WaitSeed: planOne(
on(SectorSeedReady{}, api.Committing),
2020-04-03 16:54:01 +00:00
on(SectorChainPreCommitFailed{}, api.PreCommitFailed),
),
Committing: planCommitting,
CommitWait: planOne(
on(SectorProving{}, FinalizeSector),
on(SectorCommitFailed{}, CommitFailed),
2020-01-20 08:23:56 +00:00
),
FinalizeSector: planOne(
on(SectorFinalized{}, Proving),
2020-01-29 21:25:06 +00:00
),
Proving: planOne(
on(SectorFaultReported{}, FaultReported),
on(SectorFaulty{}, Faulty),
),
api.SealFailed: planOne(
2020-04-03 16:54:01 +00:00
on(SectorRetrySeal{}, api.PreCommit1),
),
2020-01-23 17:34:04 +00:00
api.PreCommitFailed: planOne(
on(SectorRetryPreCommit{}, api.PreCommitting),
on(SectorRetryWaitSeed{}, api.WaitSeed),
2020-04-03 16:54:01 +00:00
on(SectorSealPreCommitFailed{}, api.SealFailed),
2020-01-23 17:34:04 +00:00
),
api.ComputeProofFailed: planOne(
on(SectorRetryComputeProof{}, api.Committing),
),
api.CommitFailed: planOne(
on(SectorSealPreCommitFailed{}, api.SealFailed),
on(SectorRetryWaitSeed{}, api.WaitSeed),
on(SectorRetryComputeProof{}, api.Committing),
2020-04-04 01:50:05 +00:00
on(SectorRetryInvalidProof{}, api.Committing),
),
Faulty: planOne(
on(SectorFaultReported{}, FaultReported),
),
FaultedFinal: final,
}
func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, error) {
/////
// First process all events
2020-01-22 20:29:19 +00:00
for _, event := range events {
2020-03-22 20:44:27 +00:00
e, err := json.Marshal(event)
if err != nil {
log.Errorf("marshaling event for logging: %+v", err)
continue
}
2020-01-22 20:29:19 +00:00
l := Log{
Timestamp: uint64(time.Now().Unix()),
2020-03-22 20:44:27 +00:00
Message: string(e),
2020-01-22 20:29:19 +00:00
Kind: fmt.Sprintf("event;%T", event.User),
}
if err, iserr := event.User.(xerrors.Formatter); iserr {
l.Trace = fmt.Sprintf("%+v", err)
}
state.Log = append(state.Log, l)
}
p := fsmPlanners[state.State]
if p == nil {
2020-04-03 16:24:45 +00:00
return nil, xerrors.Errorf("planner for state %s not found", state.State)
}
if err := p(events, state); err != nil {
2020-04-03 16:24:45 +00:00
return nil, xerrors.Errorf("running planner for state %s failed: %w", state.State, err)
}
/////
// Now decide what to do next
/*
* Empty
| |
| v
*<- Packing <- incoming
| |
| v
2020-04-03 16:54:01 +00:00
*<- PreCommit1 <--> SealFailed
| | ^^^
| v |||
*<- PreCommit2 -------/||
| | ||
| v /-------/|
* PreCommitting <-----+---> PreCommitFailed
| | | ^
| v | |
*<- WaitSeed -----------+-----/
| ||| ^ |
| ||| \--------*-----/
| ||| |
| vvv v----+----> ComputeProofFailed
*<- Committing |
| | ^--> CommitFailed
| v ^
*<- CommitWait ---/
| |
| v
*<- Proving
|
v
FailedUnrecoverable
UndefinedSectorState <- ¯\_()_/¯
| ^
*---------------------/
*/
switch state.State {
// Happy path
case Packing:
return m.handlePacking, nil
2020-04-03 16:54:01 +00:00
case api.PreCommit1:
return m.handlePreCommit1, nil
case api.PreCommit2:
return m.handlePreCommit2, nil
case api.PreCommitting:
return m.handlePreCommitting, nil
case WaitSeed:
2020-01-20 22:04:46 +00:00
return m.handleWaitSeed, nil
case Committing:
return m.handleCommitting, nil
case CommitWait:
return m.handleCommitWait, nil
case FinalizeSector:
2020-01-29 22:37:31 +00:00
return m.handleFinalizeSector, nil
case Proving:
// TODO: track sector health / expiration
log.Infof("Proving sector %d", state.SectorID)
// Handled failure modes
case SealFailed:
return m.handleSealFailed, nil
case PreCommitFailed:
2020-01-23 17:34:04 +00:00
return m.handlePreCommitFailed, nil
case api.ComputeProofFailed:
return m.handleComputeProofFailed, nil
case api.CommitFailed:
return m.handleCommitFailed, nil
// Faults
case Faulty:
return m.handleFaulty, nil
case FaultReported:
return m.handleFaultReported, nil
// Fatal errors
case UndefinedSectorState:
log.Error("sector update with undefined state!")
case FailedUnrecoverable:
log.Errorf("sector %d failed unrecoverably", state.SectorID)
default:
log.Errorf("unexpected sector update state: %d", state.State)
}
return nil, nil
}
func planCommitting(events []statemachine.Event, state *SectorInfo) error {
for _, event := range events {
switch e := event.User.(type) {
case globalMutator:
if e.applyGlobal(state) {
return nil
}
case SectorCommitted: // the normal case
e.apply(state)
state.State = CommitWait
case SectorSeedReady: // seed changed :/
if e.SeedEpoch == state.SeedEpoch && bytes.Equal(e.SeedValue, state.SeedValue) {
log.Warnf("planCommitting: got SectorSeedReady, but the seed didn't change")
continue // or it didn't!
}
log.Warnf("planCommitting: commit Seed changed")
e.apply(state)
state.State = Committing
return nil
2020-01-20 22:04:46 +00:00
case SectorComputeProofFailed:
state.State = api.ComputeProofFailed
2020-04-03 16:54:01 +00:00
case SectorSealPreCommitFailed:
state.State = api.CommitFailed
case SectorCommitFailed:
state.State = CommitFailed
default:
return xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events)
}
}
return nil
}
func (m *Sealing) restartSectors(ctx context.Context) error {
trackedSectors, err := m.ListSectors()
if err != nil {
log.Errorf("loading sector list: %+v", err)
}
for _, sector := range trackedSectors {
2020-02-23 00:47:47 +00:00
if err := m.sectors.Send(uint64(sector.SectorID), SectorRestart{}); err != nil {
log.Errorf("restarting sector %d: %+v", sector.SectorID, err)
}
}
// TODO: Grab on-chain sector set and diff with trackedSectors
return nil
}
// ForceSectorState sends a SectorForceState event carrying state to the state
// machine of sector id and returns the error from Send, if any. ctx is
// currently unused.
//
// The sector number is converted to uint64 before sending, which is how
// restartSectors addresses sectors in the same state group.
// NOTE(review): Send accepts both uint64 and abi.SectorNumber ids, so it
// presumably takes an interface-typed id; if machines are keyed by that value,
// an abi.SectorNumber id would not address the machine created under a uint64
// id — confirm against go-statemachine.
func (m *Sealing) ForceSectorState(ctx context.Context, id abi.SectorNumber, state SectorState) error {
	return m.sectors.Send(uint64(id), SectorForceState{state})
}
func final(events []statemachine.Event, state *SectorInfo) error {
2020-04-03 16:24:45 +00:00
return xerrors.Errorf("didn't expect any events in state %s, got %+v", state.State, events)
}
// on describes one transition for planOne: it returns a function yielding the
// event prototype mut together with next, the state to enter when an event of
// mut's type arrives.
func on(mut mutator, next SectorState) func() (mutator, SectorState) {
	transition := func() (mutator, SectorState) {
		return mut, next
	}
	return transition
}
func planOne(ts ...func() (mut mutator, next SectorState)) func(events []statemachine.Event, state *SectorInfo) error {
return func(events []statemachine.Event, state *SectorInfo) error {
if len(events) != 1 {
for _, event := range events {
if gm, ok := event.User.(globalMutator); ok {
gm.applyGlobal(state)
return nil
}
}
2020-04-03 16:24:45 +00:00
return xerrors.Errorf("planner for state %s only has a plan for a single event only, got %+v", state.State, events)
}
if gm, ok := events[0].User.(globalMutator); ok {
2020-01-22 18:30:56 +00:00
gm.applyGlobal(state)
return nil
}
for _, t := range ts {
mut, next := t()
if reflect.TypeOf(events[0].User) != reflect.TypeOf(mut) {
continue
}
2020-01-21 16:05:10 +00:00
if err, iserr := events[0].User.(error); iserr {
log.Warnf("sector %d got error event %T: %+v", state.SectorID, events[0].User, err)
}
events[0].User.(mutator).apply(state)
state.State = next
return nil
}
2020-04-03 16:24:45 +00:00
return xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, events[0].User, events[0])
}
}