sealing: Handle seed changes more correctly
This commit is contained in:
parent
e9ca6c0871
commit
ffdd436b52
@ -16,6 +16,8 @@ type Event struct {
|
|||||||
User interface{}
|
User interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: This probably should be returning an int indicating partial event processing
|
||||||
|
// (or something like errPartial(nEvents))
|
||||||
// returns func(ctx Context, st <T>) (func(*<T>), error), where <T> is the typeOf(User) param
|
// returns func(ctx Context, st <T>) (func(*<T>), error), where <T> is the typeOf(User) param
|
||||||
type Planner func(events []Event, user interface{}) (interface{}, error)
|
type Planner func(events []Event, user interface{}) (interface{}, error)
|
||||||
|
|
||||||
|
@ -50,8 +50,8 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/paych"
|
"github.com/filecoin-project/lotus/paych"
|
||||||
"github.com/filecoin-project/lotus/peermgr"
|
"github.com/filecoin-project/lotus/peermgr"
|
||||||
"github.com/filecoin-project/lotus/storage"
|
"github.com/filecoin-project/lotus/storage"
|
||||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
|
||||||
"github.com/filecoin-project/lotus/storage/sealing"
|
"github.com/filecoin-project/lotus/storage/sealing"
|
||||||
|
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||||
)
|
)
|
||||||
|
|
||||||
// special is a type used to give keys to modules which
|
// special is a type used to give keys to modules which
|
||||||
|
@ -3,6 +3,7 @@ package sealing
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
@ -28,22 +29,37 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var fsmPlanners = []func(events []statemachine.Event, state *SectorInfo) error {
|
||||||
|
api.UndefinedSectorState: planOne(on(SectorStart{}, api.Packing)),
|
||||||
|
api.Packing: planOne(on(SectorPacked{}, api.Unsealed)),
|
||||||
|
api.Unsealed: planOne(on(SectorSealed{}, api.PreCommitting)),
|
||||||
|
api.PreCommitting: planOne(on(SectorPreCommitted{}, api.PreCommitted)),
|
||||||
|
api.PreCommitted: planOne(on(SectorSeedReady{}, api.Committing)),
|
||||||
|
api.Committing: planCommitting,
|
||||||
|
api.CommitWait: planOne(on(SectorProving{}, api.Proving)),
|
||||||
|
|
||||||
|
api.Proving: final,
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, error) {
|
func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, error) {
|
||||||
/////
|
/////
|
||||||
// First process all events
|
// First process all events
|
||||||
|
|
||||||
|
p := fsmPlanners[state.State]
|
||||||
|
if p == nil {
|
||||||
|
return nil, xerrors.Errorf("planner for state %d not found", state.State)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := p(events, state); err != nil {
|
||||||
|
return nil, xerrors.Errorf("running planner for state %s failed: %w", api.SectorStates[state.State], err)
|
||||||
|
}
|
||||||
|
|
||||||
for _, event := range events {
|
for _, event := range events {
|
||||||
if err, ok := event.User.(error); ok {
|
if err, ok := event.User.(error); ok {
|
||||||
state.LastErr = fmt.Sprintf("%+v", err)
|
state.LastErr = fmt.Sprintf("%+v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
switch event := event.User.(type) {
|
switch event := event.User.(type) {
|
||||||
case SectorStart:
|
|
||||||
// TODO: check if state is clean
|
|
||||||
state.SectorID = event.id
|
|
||||||
state.Pieces = event.pieces
|
|
||||||
state.State = api.Packing
|
|
||||||
|
|
||||||
case SectorRestart:
|
case SectorRestart:
|
||||||
// noop
|
// noop
|
||||||
case SectorFatalError:
|
case SectorFatalError:
|
||||||
@ -56,38 +72,17 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
|||||||
// // TODO: Incoming
|
// // TODO: Incoming
|
||||||
// TODO: for those - look at dealIDs matching chain
|
// TODO: for those - look at dealIDs matching chain
|
||||||
|
|
||||||
// //
|
|
||||||
// Packing
|
|
||||||
|
|
||||||
case SectorPacked:
|
|
||||||
// TODO: assert state
|
|
||||||
state.Pieces = append(state.Pieces, event.pieces...)
|
|
||||||
state.State = api.Unsealed
|
|
||||||
|
|
||||||
// // Unsealed
|
// // Unsealed
|
||||||
|
|
||||||
case SectorSealFailed:
|
case SectorSealFailed:
|
||||||
// TODO: try to find out the reason, maybe retry
|
// TODO: try to find out the reason, maybe retry
|
||||||
state.State = api.SealFailed
|
state.State = api.SealFailed
|
||||||
|
|
||||||
case SectorSealed:
|
|
||||||
state.CommD = event.commD
|
|
||||||
state.CommR = event.commR
|
|
||||||
state.Ticket = event.ticket
|
|
||||||
state.State = api.PreCommitting
|
|
||||||
|
|
||||||
// // PreCommit
|
// // PreCommit
|
||||||
|
|
||||||
case SectorPreCommitFailed:
|
case SectorPreCommitFailed:
|
||||||
// TODO: try to find out the reason, maybe retry
|
// TODO: try to find out the reason, maybe retry
|
||||||
state.State = api.PreCommitFailed
|
state.State = api.PreCommitFailed
|
||||||
case SectorPreCommitted:
|
|
||||||
state.PreCommitMessage = &event.message
|
|
||||||
state.State = api.PreCommitted
|
|
||||||
|
|
||||||
case SectorSeedReady:
|
|
||||||
state.Seed = event.seed
|
|
||||||
state.State = api.Committing
|
|
||||||
|
|
||||||
// // Commit
|
// // Commit
|
||||||
|
|
||||||
@ -97,13 +92,6 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
|||||||
case SectorCommitFailed:
|
case SectorCommitFailed:
|
||||||
// TODO: try to find out the reason, maybe retry
|
// TODO: try to find out the reason, maybe retry
|
||||||
state.State = api.SealFailed
|
state.State = api.SealFailed
|
||||||
case SectorCommitted:
|
|
||||||
state.Proof = event.proof
|
|
||||||
state.CommitMessage = &event.message
|
|
||||||
state.State = api.CommitWait
|
|
||||||
case SectorProving:
|
|
||||||
state.State = api.Proving
|
|
||||||
|
|
||||||
case SectorFaultReported:
|
case SectorFaultReported:
|
||||||
state.FaultReportMsg = &event.reportMsg
|
state.FaultReportMsg = &event.reportMsg
|
||||||
state.State = api.FaultReported
|
state.State = api.FaultReported
|
||||||
@ -199,6 +187,30 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func planCommitting(events []statemachine.Event, state *SectorInfo) error {
|
||||||
|
for _, event := range events {
|
||||||
|
switch e := event.User.(type) {
|
||||||
|
case SectorRestart:
|
||||||
|
// noop
|
||||||
|
case SectorCommitted: // the normal case
|
||||||
|
e.apply(state)
|
||||||
|
state.State = api.CommitWait
|
||||||
|
case SectorSeedReady: // seed changed :/
|
||||||
|
if e.seed.Equals(&state.Seed) {
|
||||||
|
log.Warnf("planCommitting: got SectorSeedReady, but the seed didn't change")
|
||||||
|
continue // or it didn't!
|
||||||
|
}
|
||||||
|
log.Warnf("planCommitting: commit Seed changed")
|
||||||
|
e.apply(state)
|
||||||
|
state.State = api.Committing
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
return xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Sealing) restartSectors(ctx context.Context) error {
|
func (m *Sealing) restartSectors(ctx context.Context) error {
|
||||||
trackedSectors, err := m.ListSectors()
|
trackedSectors, err := m.ListSectors()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -219,3 +231,35 @@ func (m *Sealing) restartSectors(ctx context.Context) error {
|
|||||||
func (m *Sealing) ForceSectorState(ctx context.Context, id uint64, state api.SectorState) error {
|
func (m *Sealing) ForceSectorState(ctx context.Context, id uint64, state api.SectorState) error {
|
||||||
return m.sectors.Send(id, SectorForceState{state})
|
return m.sectors.Send(id, SectorForceState{state})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func final(events []statemachine.Event, state *SectorInfo) error {
|
||||||
|
return xerrors.Errorf("didn't expect any events in state %s, got %+v", api.SectorStates[state.State], events)
|
||||||
|
}
|
||||||
|
|
||||||
|
func on(mut mutator, next api.SectorState) func() (mutator, api.SectorState) {
|
||||||
|
return func() (mutator, api.SectorState) {
|
||||||
|
return mut, next
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func planOne(ts ...func() (mut mutator, next api.SectorState)) func(events []statemachine.Event, state *SectorInfo) error {
|
||||||
|
return func(events []statemachine.Event, state *SectorInfo) error {
|
||||||
|
if len(events) != 1 {
|
||||||
|
return xerrors.Errorf("planner for state %s only has a plan for a single event only, got %+v", api.SectorStates[state.State], events)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, t := range ts {
|
||||||
|
mut, next := t()
|
||||||
|
|
||||||
|
if reflect.TypeOf(events[0].User) != reflect.TypeOf(mut) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
events[0].User.(mutator).apply(state)
|
||||||
|
state.State = next
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return xerrors.Errorf("planner for state %s received unexpected event %+v", events[0])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -6,31 +6,56 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type mutator interface {
|
||||||
|
apply(state *SectorInfo)
|
||||||
|
}
|
||||||
|
|
||||||
type SectorStart struct {
|
type SectorStart struct {
|
||||||
id uint64
|
id uint64
|
||||||
pieces []Piece
|
pieces []Piece
|
||||||
}
|
}
|
||||||
|
func (evt SectorStart) apply(state *SectorInfo) {
|
||||||
|
state.SectorID = evt.id
|
||||||
|
state.Pieces = evt.pieces
|
||||||
|
}
|
||||||
|
|
||||||
type SectorRestart struct{}
|
type SectorRestart struct{}
|
||||||
|
|
||||||
type SectorFatalError struct{ error }
|
type SectorFatalError struct{ error }
|
||||||
|
|
||||||
type SectorPacked struct{ pieces []Piece }
|
type SectorPacked struct{ pieces []Piece }
|
||||||
|
func (evt SectorPacked) apply(state *SectorInfo) {
|
||||||
|
state.Pieces = append(state.Pieces, evt.pieces...)
|
||||||
|
}
|
||||||
|
|
||||||
type SectorSealed struct {
|
type SectorSealed struct {
|
||||||
commR []byte
|
commR []byte
|
||||||
commD []byte
|
commD []byte
|
||||||
ticket SealTicket
|
ticket SealTicket
|
||||||
}
|
}
|
||||||
|
func (evt SectorSealed) apply(state *SectorInfo) {
|
||||||
|
state.CommD = evt.commD
|
||||||
|
state.CommR = evt.commR
|
||||||
|
state.Ticket = evt.ticket
|
||||||
|
}
|
||||||
|
|
||||||
type SectorSealFailed struct{ error }
|
type SectorSealFailed struct{ error }
|
||||||
|
|
||||||
type SectorPreCommitFailed struct{ error }
|
type SectorPreCommitFailed struct{ error }
|
||||||
|
|
||||||
type SectorPreCommitted struct {
|
type SectorPreCommitted struct {
|
||||||
message cid.Cid
|
message cid.Cid
|
||||||
}
|
}
|
||||||
|
func (evt SectorPreCommitted) apply(state *SectorInfo) {
|
||||||
|
state.PreCommitMessage = &evt.message
|
||||||
|
}
|
||||||
|
|
||||||
type SectorSeedReady struct {
|
type SectorSeedReady struct {
|
||||||
seed SealSeed
|
seed SealSeed
|
||||||
}
|
}
|
||||||
|
func (evt SectorSeedReady) apply(state *SectorInfo) {
|
||||||
|
state.Seed = evt.seed
|
||||||
|
}
|
||||||
|
|
||||||
type SectorSealCommitFailed struct{ error }
|
type SectorSealCommitFailed struct{ error }
|
||||||
type SectorCommitFailed struct{ error }
|
type SectorCommitFailed struct{ error }
|
||||||
@ -38,8 +63,13 @@ type SectorCommitted struct {
|
|||||||
message cid.Cid
|
message cid.Cid
|
||||||
proof []byte
|
proof []byte
|
||||||
}
|
}
|
||||||
|
func (evt SectorCommitted) apply(state *SectorInfo) {
|
||||||
|
state.Proof = evt.proof
|
||||||
|
state.CommitMessage = &evt.message
|
||||||
|
}
|
||||||
|
|
||||||
type SectorProving struct{}
|
type SectorProving struct{}
|
||||||
|
func (evt SectorProving) apply(*SectorInfo) {}
|
||||||
|
|
||||||
type SectorFaultReported struct{ reportMsg cid.Cid }
|
type SectorFaultReported struct{ reportMsg cid.Cid }
|
||||||
type SectorFaultedFinal struct{}
|
type SectorFaultedFinal struct{}
|
||||||
|
@ -3,12 +3,17 @@ package sealing
|
|||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
logging "github.com/ipfs/go-log/v2"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/lib/statemachine"
|
"github.com/filecoin-project/lotus/lib/statemachine"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
_ = logging.SetLogLevel("*", "INFO")
|
||||||
|
}
|
||||||
|
|
||||||
func (t *test) planSingle(evt interface{}) {
|
func (t *test) planSingle(evt interface{}) {
|
||||||
_, err := t.s.plan([]statemachine.Event{{evt}}, t.state)
|
_, err := t.s.plan([]statemachine.Event{{evt}}, t.state)
|
||||||
require.NoError(t.t, err)
|
require.NoError(t.t, err)
|
||||||
@ -67,11 +72,12 @@ func TestSeedRevert(t *testing.T) {
|
|||||||
m.planSingle(SectorSeedReady{})
|
m.planSingle(SectorSeedReady{})
|
||||||
require.Equal(m.t, m.state.State, api.Committing)
|
require.Equal(m.t, m.state.State, api.Committing)
|
||||||
|
|
||||||
_, err := m.s.plan([]statemachine.Event{{SectorSeedReady{}}, {SectorCommitted{}}}, m.state)
|
_, err := m.s.plan([]statemachine.Event{{SectorSeedReady{seed:SealSeed{BlockHeight: 5,}}}, {SectorCommitted{}}}, m.state)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(m.t, m.state.State, api.Committing)
|
require.Equal(m.t, m.state.State, api.Committing)
|
||||||
|
|
||||||
m.planSingle(SectorCommitted{})
|
// not changing the seed this time
|
||||||
|
_, err = m.s.plan([]statemachine.Event{{SectorSeedReady{seed:SealSeed{BlockHeight: 5,}}}, {SectorCommitted{}}}, m.state)
|
||||||
require.Equal(m.t, m.state.State, api.CommitWait)
|
require.Equal(m.t, m.state.State, api.CommitWait)
|
||||||
|
|
||||||
m.planSingle(SectorProving{})
|
m.planSingle(SectorProving{})
|
||||||
|
@ -29,6 +29,10 @@ func (t *SealSeed) SB() sectorbuilder.SealSeed {
|
|||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *SealSeed) Equals(o *SealSeed) bool {
|
||||||
|
return string(t.TicketBytes) == string(o.TicketBytes) && t.BlockHeight == o.BlockHeight
|
||||||
|
}
|
||||||
|
|
||||||
type Piece struct {
|
type Piece struct {
|
||||||
DealID uint64
|
DealID uint64
|
||||||
|
|
||||||
@ -70,6 +74,8 @@ type SectorInfo struct {
|
|||||||
|
|
||||||
// Debug
|
// Debug
|
||||||
LastErr string
|
LastErr string
|
||||||
|
|
||||||
|
// TODO: Log []struct{ts, msg, trace string}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo {
|
func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo {
|
||||||
|
Loading…
Reference in New Issue
Block a user