diff --git a/lib/statemachine/machine.go b/lib/statemachine/machine.go index 7302a52fe..d30c9e7d4 100644 --- a/lib/statemachine/machine.go +++ b/lib/statemachine/machine.go @@ -16,6 +16,8 @@ type Event struct { User interface{} } +// TODO: This probably should be returning an int indicating partial event processing +// (or something like errPartial(nEvents)) // returns func(ctx Context, st ) (func(*), error), where is the typeOf(User) param type Planner func(events []Event, user interface{}) (interface{}, error) diff --git a/node/builder.go b/node/builder.go index d17d6beb4..5ec6e4183 100644 --- a/node/builder.go +++ b/node/builder.go @@ -50,8 +50,8 @@ import ( "github.com/filecoin-project/lotus/paych" "github.com/filecoin-project/lotus/peermgr" "github.com/filecoin-project/lotus/storage" - "github.com/filecoin-project/lotus/storage/sectorblocks" "github.com/filecoin-project/lotus/storage/sealing" + "github.com/filecoin-project/lotus/storage/sectorblocks" ) // special is a type used to give keys to modules which diff --git a/storage/sealing/fsm.go b/storage/sealing/fsm.go index e984b883b..56c7fa6b7 100644 --- a/storage/sealing/fsm.go +++ b/storage/sealing/fsm.go @@ -3,6 +3,7 @@ package sealing import ( "context" "fmt" + "reflect" "golang.org/x/xerrors" @@ -28,22 +29,37 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface }, nil } +var fsmPlanners = []func(events []statemachine.Event, state *SectorInfo) error { + api.UndefinedSectorState: planOne(on(SectorStart{}, api.Packing)), + api.Packing: planOne(on(SectorPacked{}, api.Unsealed)), + api.Unsealed: planOne(on(SectorSealed{}, api.PreCommitting)), + api.PreCommitting: planOne(on(SectorPreCommitted{}, api.PreCommitted)), + api.PreCommitted: planOne(on(SectorSeedReady{}, api.Committing)), + api.Committing: planCommitting, + api.CommitWait: planOne(on(SectorProving{}, api.Proving)), + + api.Proving: final, +} + func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, error) { ///// // First process all events + p := fsmPlanners[state.State] + if p == nil { + return nil, xerrors.Errorf("planner for state %d not found", state.State) + } + + if err := p(events, state); err != nil { + return nil, xerrors.Errorf("running planner for state %s failed: %w", api.SectorStates[state.State], err) + } + for _, event := range events { if err, ok := event.User.(error); ok { state.LastErr = fmt.Sprintf("%+v", err) } switch event := event.User.(type) { - case SectorStart: - // TODO: check if state is clean - state.SectorID = event.id - state.Pieces = event.pieces - state.State = api.Packing - case SectorRestart: // noop case SectorFatalError: @@ -56,38 +72,17 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta // // TODO: Incoming // TODO: for those - look at dealIDs matching chain - // // - // Packing - - case SectorPacked: - // TODO: assert state - state.Pieces = append(state.Pieces, event.pieces...) - state.State = api.Unsealed - // // Unsealed case SectorSealFailed: // TODO: try to find out the reason, maybe retry state.State = api.SealFailed - case SectorSealed: - state.CommD = event.commD - state.CommR = event.commR - state.Ticket = event.ticket - state.State = api.PreCommitting - // // PreCommit case SectorPreCommitFailed: // TODO: try to find out the reason, maybe retry state.State = api.PreCommitFailed - case SectorPreCommitted: - state.PreCommitMessage = &event.message - state.State = api.PreCommitted - - case SectorSeedReady: - state.Seed = event.seed - state.State = api.Committing // // Commit @@ -97,13 +92,6 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta case SectorCommitFailed: // TODO: try to find out the reason, maybe retry state.State = api.SealFailed - case SectorCommitted: - state.Proof = event.proof - state.CommitMessage = &event.message - state.State = api.CommitWait - case SectorProving: - state.State = api.Proving - case SectorFaultReported: state.FaultReportMsg = &event.reportMsg state.State = api.FaultReported @@ -199,6 +187,30 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta return nil, nil } +func planCommitting(events []statemachine.Event, state *SectorInfo) error { + for _, event := range events { + switch e := event.User.(type) { + case SectorRestart: + // noop + case SectorCommitted: // the normal case + e.apply(state) + state.State = api.CommitWait + case SectorSeedReady: // seed changed :/ + if e.seed.Equals(&state.Seed) { + log.Warnf("planCommitting: got SectorSeedReady, but the seed didn't change") + continue // or it didn't! + } + log.Warnf("planCommitting: commit Seed changed") + e.apply(state) + state.State = api.Committing + return nil + default: + return xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events) + } + } + return nil +} + func (m *Sealing) restartSectors(ctx context.Context) error { trackedSectors, err := m.ListSectors() if err != nil { @@ -219,3 +231,35 @@ func (m *Sealing) restartSectors(ctx context.Context) error { func (m *Sealing) ForceSectorState(ctx context.Context, id uint64, state api.SectorState) error { return m.sectors.Send(id, SectorForceState{state}) } + +func final(events []statemachine.Event, state *SectorInfo) error { + return xerrors.Errorf("didn't expect any events in state %s, got %+v", api.SectorStates[state.State], events) +} + +func on(mut mutator, next api.SectorState) func() (mutator, api.SectorState) { + return func() (mutator, api.SectorState) { + return mut, next + } +} + +func planOne(ts ...func() (mut mutator, next api.SectorState)) func(events []statemachine.Event, state *SectorInfo) error { + return func(events []statemachine.Event, state *SectorInfo) error { + if len(events) != 1 { + return xerrors.Errorf("planner for state %s only has a plan for a single event only, got %+v", api.SectorStates[state.State], events) + } + + for _, t := range ts { + mut, next := t() + + if reflect.TypeOf(events[0].User) != reflect.TypeOf(mut) { + continue + } + + events[0].User.(mutator).apply(state) + state.State = next + return nil + } + + return xerrors.Errorf("planner for state %s received unexpected event %+v", events[0]) + } +} diff --git a/storage/sealing/fsm_events.go b/storage/sealing/fsm_events.go index 39d0638d3..3e230d02d 100644 --- a/storage/sealing/fsm_events.go +++ b/storage/sealing/fsm_events.go @@ -6,31 +6,56 @@ import ( "github.com/filecoin-project/lotus/api" ) +type mutator interface { + apply(state *SectorInfo) +} + type SectorStart struct { id uint64 pieces []Piece } +func (evt SectorStart) apply(state *SectorInfo) { + state.SectorID = evt.id + state.Pieces = evt.pieces +} + type SectorRestart struct{} type SectorFatalError struct{ error } type SectorPacked struct{ pieces []Piece } +func (evt SectorPacked) apply(state *SectorInfo) { + state.Pieces = append(state.Pieces, evt.pieces...) +} type SectorSealed struct { commR []byte commD []byte ticket SealTicket } +func (evt SectorSealed) apply(state *SectorInfo) { + state.CommD = evt.commD + state.CommR = evt.commR + state.Ticket = evt.ticket +} + type SectorSealFailed struct{ error } type SectorPreCommitFailed struct{ error } + type SectorPreCommitted struct { message cid.Cid } +func (evt SectorPreCommitted) apply(state *SectorInfo) { + state.PreCommitMessage = &evt.message +} type SectorSeedReady struct { seed SealSeed } +func (evt SectorSeedReady) apply(state *SectorInfo) { + state.Seed = evt.seed +} type SectorSealCommitFailed struct{ error } type SectorCommitFailed struct{ error } @@ -38,8 +63,13 @@ type SectorCommitted struct { message cid.Cid proof []byte } +func (evt SectorCommitted) apply(state *SectorInfo) { + state.Proof = evt.proof + state.CommitMessage = &evt.message +} type SectorProving struct{} +func (evt SectorProving) apply(*SectorInfo) {} type SectorFaultReported struct{ reportMsg cid.Cid } type SectorFaultedFinal struct{} diff --git a/storage/sealing/fsm_test.go b/storage/sealing/fsm_test.go index 10b9723b9..fb31f5192 100644 --- a/storage/sealing/fsm_test.go +++ b/storage/sealing/fsm_test.go @@ -3,12 +3,17 @@ package sealing import ( "testing" + logging "github.com/ipfs/go-log/v2" "github.com/stretchr/testify/require" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/lib/statemachine" ) +func init() { + _ = logging.SetLogLevel("*", "INFO") +} + func (t *test) planSingle(evt interface{}) { _, err := t.s.plan([]statemachine.Event{{evt}}, t.state) require.NoError(t.t, err) @@ -67,11 +72,12 @@ func TestSeedRevert(t *testing.T) { m.planSingle(SectorSeedReady{}) require.Equal(m.t, m.state.State, api.Committing) - _, err := m.s.plan([]statemachine.Event{{SectorSeedReady{}}, {SectorCommitted{}}}, m.state) + _, err := m.s.plan([]statemachine.Event{{SectorSeedReady{seed:SealSeed{BlockHeight: 5,}}}, {SectorCommitted{}}}, m.state) require.NoError(t, err) require.Equal(m.t, m.state.State, api.Committing) - m.planSingle(SectorCommitted{}) + // not changing the seed this time + _, err = m.s.plan([]statemachine.Event{{SectorSeedReady{seed:SealSeed{BlockHeight: 5,}}}, {SectorCommitted{}}}, m.state) require.Equal(m.t, m.state.State, api.CommitWait) m.planSingle(SectorProving{}) diff --git a/storage/sealing/types.go b/storage/sealing/types.go index c2d6b3359..6e248050e 100644 --- a/storage/sealing/types.go +++ b/storage/sealing/types.go @@ -29,6 +29,10 @@ func (t *SealSeed) SB() sectorbuilder.SealSeed { return out } +func (t *SealSeed) Equals(o *SealSeed) bool { + return string(t.TicketBytes) == string(o.TicketBytes) && t.BlockHeight == o.BlockHeight +} + type Piece struct { DealID uint64 @@ -70,6 +74,8 @@ type SectorInfo struct { // Debug LastErr string + + // TODO: Log []struct{ts, msg, trace string} } func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo {