sealing: Handle seed changes more correctly

This commit is contained in:
Łukasz Magiera 2020-01-16 02:25:49 +01:00
parent 72ca563a1d
commit e3b05e51b1
4 changed files with 122 additions and 36 deletions

112
fsm.go
View File

@ -3,6 +3,7 @@ package sealing
import (
"context"
"fmt"
"reflect"
"golang.org/x/xerrors"
@ -28,22 +29,37 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface
}, nil
}
var fsmPlanners = []func(events []statemachine.Event, state *SectorInfo) error {
api.UndefinedSectorState: planOne(on(SectorStart{}, api.Packing)),
api.Packing: planOne(on(SectorPacked{}, api.Unsealed)),
api.Unsealed: planOne(on(SectorSealed{}, api.PreCommitting)),
api.PreCommitting: planOne(on(SectorPreCommitted{}, api.PreCommitted)),
api.PreCommitted: planOne(on(SectorSeedReady{}, api.Committing)),
api.Committing: planCommitting,
api.CommitWait: planOne(on(SectorProving{}, api.Proving)),
api.Proving: final,
}
func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, error) {
/////
// First process all events
p := fsmPlanners[state.State]
if p == nil {
return nil, xerrors.Errorf("planner for state %d not found", state.State)
}
if err := p(events, state); err != nil {
return nil, xerrors.Errorf("running planner for state %s failed: %w", api.SectorStates[state.State], err)
}
for _, event := range events {
if err, ok := event.User.(error); ok {
state.LastErr = fmt.Sprintf("%+v", err)
}
switch event := event.User.(type) {
case SectorStart:
// TODO: check if state is clean
state.SectorID = event.id
state.Pieces = event.pieces
state.State = api.Packing
case SectorRestart:
// noop
case SectorFatalError:
@ -56,38 +72,17 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
// // TODO: Incoming
// TODO: for those - look at dealIDs matching chain
// //
// Packing
case SectorPacked:
// TODO: assert state
state.Pieces = append(state.Pieces, event.pieces...)
state.State = api.Unsealed
// // Unsealed
case SectorSealFailed:
// TODO: try to find out the reason, maybe retry
state.State = api.SealFailed
case SectorSealed:
state.CommD = event.commD
state.CommR = event.commR
state.Ticket = event.ticket
state.State = api.PreCommitting
// // PreCommit
case SectorPreCommitFailed:
// TODO: try to find out the reason, maybe retry
state.State = api.PreCommitFailed
case SectorPreCommitted:
state.PreCommitMessage = &event.message
state.State = api.PreCommitted
case SectorSeedReady:
state.Seed = event.seed
state.State = api.Committing
// // Commit
@ -97,13 +92,6 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
case SectorCommitFailed:
// TODO: try to find out the reason, maybe retry
state.State = api.SealFailed
case SectorCommitted:
state.Proof = event.proof
state.CommitMessage = &event.message
state.State = api.CommitWait
case SectorProving:
state.State = api.Proving
case SectorFaultReported:
state.FaultReportMsg = &event.reportMsg
state.State = api.FaultReported
@ -199,6 +187,30 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
return nil, nil
}
func planCommitting(events []statemachine.Event, state *SectorInfo) error {
for _, event := range events {
switch e := event.User.(type) {
case SectorRestart:
// noop
case SectorCommitted: // the normal case
e.apply(state)
state.State = api.CommitWait
case SectorSeedReady: // seed changed :/
if e.seed.Equals(&state.Seed) {
log.Warnf("planCommitting: got SectorSeedReady, but the seed didn't change")
continue // or it didn't!
}
log.Warnf("planCommitting: commit Seed changed")
e.apply(state)
state.State = api.Committing
return nil
default:
return xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events)
}
}
return nil
}
func (m *Sealing) restartSectors(ctx context.Context) error {
trackedSectors, err := m.ListSectors()
if err != nil {
@ -219,3 +231,35 @@ func (m *Sealing) restartSectors(ctx context.Context) error {
func (m *Sealing) ForceSectorState(ctx context.Context, id uint64, state api.SectorState) error {
return m.sectors.Send(id, SectorForceState{state})
}
func final(events []statemachine.Event, state *SectorInfo) error {
return xerrors.Errorf("didn't expect any events in state %s, got %+v", api.SectorStates[state.State], events)
}
func on(mut mutator, next api.SectorState) func() (mutator, api.SectorState) {
return func() (mutator, api.SectorState) {
return mut, next
}
}
func planOne(ts ...func() (mut mutator, next api.SectorState)) func(events []statemachine.Event, state *SectorInfo) error {
return func(events []statemachine.Event, state *SectorInfo) error {
if len(events) != 1 {
return xerrors.Errorf("planner for state %s only has a plan for a single event only, got %+v", api.SectorStates[state.State], events)
}
for _, t := range ts {
mut, next := t()
if reflect.TypeOf(events[0].User) != reflect.TypeOf(mut) {
continue
}
events[0].User.(mutator).apply(state)
state.State = next
return nil
}
return xerrors.Errorf("planner for state %s received unexpected event %+v", events[0])
}
}

View File

@ -6,31 +6,56 @@ import (
"github.com/filecoin-project/lotus/api"
)
type mutator interface {
apply(state *SectorInfo)
}
type SectorStart struct {
id uint64
pieces []Piece
}
func (evt SectorStart) apply(state *SectorInfo) {
state.SectorID = evt.id
state.Pieces = evt.pieces
}
type SectorRestart struct{}
type SectorFatalError struct{ error }
type SectorPacked struct{ pieces []Piece }
func (evt SectorPacked) apply(state *SectorInfo) {
state.Pieces = append(state.Pieces, evt.pieces...)
}
type SectorSealed struct {
commR []byte
commD []byte
ticket SealTicket
}
func (evt SectorSealed) apply(state *SectorInfo) {
state.CommD = evt.commD
state.CommR = evt.commR
state.Ticket = evt.ticket
}
type SectorSealFailed struct{ error }
type SectorPreCommitFailed struct{ error }
type SectorPreCommitted struct {
message cid.Cid
}
func (evt SectorPreCommitted) apply(state *SectorInfo) {
state.PreCommitMessage = &evt.message
}
type SectorSeedReady struct {
seed SealSeed
}
func (evt SectorSeedReady) apply(state *SectorInfo) {
state.Seed = evt.seed
}
type SectorSealCommitFailed struct{ error }
type SectorCommitFailed struct{ error }
@ -38,8 +63,13 @@ type SectorCommitted struct {
message cid.Cid
proof []byte
}
func (evt SectorCommitted) apply(state *SectorInfo) {
state.Proof = evt.proof
state.CommitMessage = &evt.message
}
type SectorProving struct{}
func (evt SectorProving) apply(*SectorInfo) {}
type SectorFaultReported struct{ reportMsg cid.Cid }
type SectorFaultedFinal struct{}

View File

@ -3,12 +3,17 @@ package sealing
import (
"testing"
logging "github.com/ipfs/go-log/v2"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/lib/statemachine"
)
func init() {
_ = logging.SetLogLevel("*", "INFO")
}
func (t *test) planSingle(evt interface{}) {
_, err := t.s.plan([]statemachine.Event{{evt}}, t.state)
require.NoError(t.t, err)
@ -67,11 +72,12 @@ func TestSeedRevert(t *testing.T) {
m.planSingle(SectorSeedReady{})
require.Equal(m.t, m.state.State, api.Committing)
_, err := m.s.plan([]statemachine.Event{{SectorSeedReady{}}, {SectorCommitted{}}}, m.state)
_, err := m.s.plan([]statemachine.Event{{SectorSeedReady{seed:SealSeed{BlockHeight: 5,}}}, {SectorCommitted{}}}, m.state)
require.NoError(t, err)
require.Equal(m.t, m.state.State, api.Committing)
m.planSingle(SectorCommitted{})
// not changing the seed this time
_, err = m.s.plan([]statemachine.Event{{SectorSeedReady{seed:SealSeed{BlockHeight: 5,}}}, {SectorCommitted{}}}, m.state)
require.Equal(m.t, m.state.State, api.CommitWait)
m.planSingle(SectorProving{})

View File

@ -29,6 +29,10 @@ func (t *SealSeed) SB() sectorbuilder.SealSeed {
return out
}
func (t *SealSeed) Equals(o *SealSeed) bool {
return string(t.TicketBytes) == string(o.TicketBytes) && t.BlockHeight == o.BlockHeight
}
type Piece struct {
DealID uint64
@ -70,6 +74,8 @@ type SectorInfo struct {
// Debug
LastErr string
// TODO: Log []struct{ts, msg, trace string}
}
func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo {