sealing fsm: Separate precommit 1/2

This commit is contained in:
Łukasz Magiera 2020-04-03 18:54:01 +02:00
parent 147e18e28e
commit a63a0b3077
7 changed files with 80 additions and 53 deletions

View File

@ -23,7 +23,8 @@ const (
// happy path // happy path
Empty SectorState = "Empty" Empty SectorState = "Empty"
Packing SectorState = "Packing" // sector not in sealStore, and not on chain Packing SectorState = "Packing" // sector not in sealStore, and not on chain
Unsealed SectorState = "Unsealed" // sealing / queued PreCommit1 SectorState = "PreCommit1" // do PreCommit1
PreCommit2 SectorState = "PreCommit2" // do PreCommit1
PreCommitting SectorState = "PreCommitting" // on chain pre-commit PreCommitting SectorState = "PreCommitting" // on chain pre-commit
WaitSeed SectorState = "WaitSeed" // waiting for seed WaitSeed SectorState = "WaitSeed" // waiting for seed
Committing SectorState = "Committing" Committing SectorState = "Committing"

View File

@ -34,20 +34,25 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface
var fsmPlanners = map[api.SectorState]func(events []statemachine.Event, state *SectorInfo) error{ var fsmPlanners = map[api.SectorState]func(events []statemachine.Event, state *SectorInfo) error{
api.UndefinedSectorState: planOne(on(SectorStart{}, api.Packing)), api.UndefinedSectorState: planOne(on(SectorStart{}, api.Packing)),
api.Packing: planOne(on(SectorPacked{}, api.Unsealed)), api.Packing: planOne(on(SectorPacked{}, api.PreCommit1)),
api.Unsealed: planOne( api.PreCommit1: planOne(
on(SectorSealed{}, api.PreCommitting), on(SectorPreCommit1{}, api.PreCommit2),
on(SectorSealFailed{}, api.SealFailed), on(SectorSealPreCommitFailed{}, api.SealFailed),
on(SectorPackingFailed{}, api.PackingFailed),
),
api.PreCommit2: planOne(
on(SectorPreCommit2{}, api.PreCommitting),
on(SectorSealPreCommitFailed{}, api.SealFailed),
on(SectorPackingFailed{}, api.PackingFailed), on(SectorPackingFailed{}, api.PackingFailed),
), ),
api.PreCommitting: planOne( api.PreCommitting: planOne(
on(SectorSealFailed{}, api.SealFailed), on(SectorSealPreCommitFailed{}, api.SealFailed),
on(SectorPreCommitted{}, api.WaitSeed), on(SectorPreCommitted{}, api.WaitSeed),
on(SectorPreCommitFailed{}, api.PreCommitFailed), on(SectorChainPreCommitFailed{}, api.PreCommitFailed),
), ),
api.WaitSeed: planOne( api.WaitSeed: planOne(
on(SectorSeedReady{}, api.Committing), on(SectorSeedReady{}, api.Committing),
on(SectorPreCommitFailed{}, api.PreCommitFailed), on(SectorChainPreCommitFailed{}, api.PreCommitFailed),
), ),
api.Committing: planCommitting, api.Committing: planCommitting,
api.CommitWait: planOne( api.CommitWait: planOne(
@ -65,12 +70,12 @@ var fsmPlanners = map[api.SectorState]func(events []statemachine.Event, state *S
), ),
api.SealFailed: planOne( api.SealFailed: planOne(
on(SectorRetrySeal{}, api.Unsealed), on(SectorRetrySeal{}, api.PreCommit1),
), ),
api.PreCommitFailed: planOne( api.PreCommitFailed: planOne(
on(SectorRetryPreCommit{}, api.PreCommitting), on(SectorRetryPreCommit{}, api.PreCommitting),
on(SectorRetryWaitSeed{}, api.WaitSeed), on(SectorRetryWaitSeed{}, api.WaitSeed),
on(SectorSealFailed{}, api.SealFailed), on(SectorSealPreCommitFailed{}, api.SealFailed),
), ),
api.Faulty: planOne( api.Faulty: planOne(
@ -123,7 +128,7 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
*<- Packing <- incoming *<- Packing <- incoming
| | | |
| v | v
*<- Unsealed <--> SealFailed *<- PreCommit1 <--> SealFailed
| | | |
| v | v
* PreCommitting <--> PreCommitFailed * PreCommitting <--> PreCommitFailed
@ -153,8 +158,10 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
// Happy path // Happy path
case api.Packing: case api.Packing:
return m.handlePacking, nil return m.handlePacking, nil
case api.Unsealed: case api.PreCommit1:
return m.handleUnsealed, nil return m.handlePreCommit1, nil
case api.PreCommit2:
return m.handlePreCommit2, nil
case api.PreCommitting: case api.PreCommitting:
return m.handlePreCommitting, nil return m.handlePreCommitting, nil
case api.WaitSeed: case api.WaitSeed:
@ -218,7 +225,7 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) error {
return nil return nil
case SectorComputeProofFailed: case SectorComputeProofFailed:
state.State = api.SealCommitFailed state.State = api.SealCommitFailed
case SectorSealFailed: case SectorSealPreCommitFailed:
state.State = api.CommitFailed state.State = api.CommitFailed
case SectorCommitFailed: case SectorCommitFailed:
state.State = api.CommitFailed state.State = api.CommitFailed

View File

@ -2,6 +2,7 @@ package sealing
import ( import (
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -70,29 +71,37 @@ type SectorPackingFailed struct{ error }
func (evt SectorPackingFailed) apply(*SectorInfo) {} func (evt SectorPackingFailed) apply(*SectorInfo) {}
type SectorSealed struct { type SectorPreCommit1 struct {
Sealed cid.Cid PreCommit1Out storage.PreCommit1Out
Unsealed cid.Cid Ticket api.SealTicket
Ticket api.SealTicket
} }
func (evt SectorSealed) apply(state *SectorInfo) { func (evt SectorPreCommit1) apply(state *SectorInfo) {
state.PreCommit1Out = evt.PreCommit1Out
state.Ticket = evt.Ticket
}
type SectorPreCommit2 struct {
Sealed cid.Cid
Unsealed cid.Cid
}
func (evt SectorPreCommit2) apply(state *SectorInfo) {
commd := evt.Unsealed commd := evt.Unsealed
state.CommD = &commd state.CommD = &commd
commr := evt.Sealed commr := evt.Sealed
state.CommR = &commr state.CommR = &commr
state.Ticket = evt.Ticket
} }
type SectorSealFailed struct{ error } type SectorSealPreCommitFailed struct{ error }
func (evt SectorSealFailed) FormatError(xerrors.Printer) (next error) { return evt.error } func (evt SectorSealPreCommitFailed) FormatError(xerrors.Printer) (next error) { return evt.error }
func (evt SectorSealFailed) apply(*SectorInfo) {} func (evt SectorSealPreCommitFailed) apply(*SectorInfo) {}
type SectorPreCommitFailed struct{ error } type SectorChainPreCommitFailed struct{ error }
func (evt SectorPreCommitFailed) FormatError(xerrors.Printer) (next error) { return evt.error } func (evt SectorChainPreCommitFailed) FormatError(xerrors.Printer) (next error) { return evt.error }
func (evt SectorPreCommitFailed) apply(*SectorInfo) {} func (evt SectorChainPreCommitFailed) apply(*SectorInfo) {}
type SectorPreCommitted struct { type SectorPreCommitted struct {
Message cid.Cid Message cid.Cid

View File

@ -36,9 +36,9 @@ func TestHappyPath(t *testing.T) {
} }
m.planSingle(SectorPacked{}) m.planSingle(SectorPacked{})
require.Equal(m.t, m.state.State, api.Unsealed) require.Equal(m.t, m.state.State, api.PreCommit1)
m.planSingle(SectorSealed{}) m.planSingle(SectorPreCommit2{})
require.Equal(m.t, m.state.State, api.PreCommitting) require.Equal(m.t, m.state.State, api.PreCommitting)
m.planSingle(SectorPreCommitted{}) m.planSingle(SectorPreCommitted{})
@ -65,9 +65,9 @@ func TestSeedRevert(t *testing.T) {
} }
m.planSingle(SectorPacked{}) m.planSingle(SectorPacked{})
require.Equal(m.t, m.state.State, api.Unsealed) require.Equal(m.t, m.state.State, api.PreCommit1)
m.planSingle(SectorSealed{}) m.planSingle(SectorPreCommit2{})
require.Equal(m.t, m.state.State, api.PreCommitting) require.Equal(m.t, m.state.State, api.PreCommitting)
m.planSingle(SectorPreCommitted{}) m.planSingle(SectorPreCommitted{})

View File

@ -49,11 +49,11 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
return ctx.Send(SectorPacked{Pieces: pieces}) return ctx.Send(SectorPacked{Pieces: pieces})
} }
func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error {
if err := checkPieces(ctx.Context(), sector, m.api); err != nil { // Sanity check state if err := checkPieces(ctx.Context(), sector, m.api); err != nil { // Sanity check state
switch err.(type) { switch err.(type) {
case *ErrApi: case *ErrApi:
log.Errorf("handleUnsealed: api error, not proceeding: %+v", err) log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)
return nil return nil
case *ErrInvalidDeals: case *ErrInvalidDeals:
return ctx.Send(SectorPackingFailed{xerrors.Errorf("invalid deals in sector: %w", err)}) return ctx.Send(SectorPackingFailed{xerrors.Errorf("invalid deals in sector: %w", err)})
@ -67,23 +67,29 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er
log.Infow("performing sector replication...", "sector", sector.SectorID) log.Infow("performing sector replication...", "sector", sector.SectorID)
ticket, err := m.tktFn(ctx.Context()) ticket, err := m.tktFn(ctx.Context())
if err != nil { if err != nil {
return ctx.Send(SectorSealFailed{xerrors.Errorf("getting ticket failed: %w", err)}) return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("getting ticket failed: %w", err)})
} }
pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorID), ticket.Value, sector.pieceInfos()) pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorID), ticket.Value, sector.pieceInfos())
if err != nil { if err != nil {
return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit(1) failed: %w", err)}) return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
} }
cids, err := m.sealer.SealPreCommit2(ctx.Context(), m.minerSector(sector.SectorID), pc1o) return ctx.Send(SectorPreCommit1{
PreCommit1Out: pc1o,
Ticket: *ticket,
})
}
func (m *Sealing) handlePreCommit2(ctx statemachine.Context, sector SectorInfo) error {
cids, err := m.sealer.SealPreCommit2(ctx.Context(), m.minerSector(sector.SectorID), sector.PreCommit1Out)
if err != nil { if err != nil {
return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit(2) failed: %w", err)}) return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("seal pre commit(2) failed: %w", err)})
} }
return ctx.Send(SectorSealed{ return ctx.Send(SectorPreCommit2{
Unsealed: cids.Unsealed, Unsealed: cids.Unsealed,
Sealed: cids.Sealed, Sealed: cids.Sealed,
Ticket: *ticket,
}) })
} }
@ -93,10 +99,10 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
case *ErrApi: case *ErrApi:
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err) log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
return nil return nil
case *ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handleUnsealed will do that too) case *ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handlePreCommit1 will do that too)
return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)}) return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("bad CommD error: %w", err)})
case *ErrExpiredTicket: case *ErrExpiredTicket:
return ctx.Send(SectorSealFailed{xerrors.Errorf("ticket expired: %w", err)}) return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("ticket expired: %w", err)})
default: default:
return xerrors.Errorf("checkSeal sanity check error: %w", err) return xerrors.Errorf("checkSeal sanity check error: %w", err)
} }
@ -113,7 +119,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
} }
enc, aerr := actors.SerializeParams(params) enc, aerr := actors.SerializeParams(params)
if aerr != nil { if aerr != nil {
return ctx.Send(SectorPreCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)}) return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)})
} }
msg := &types.Message{ msg := &types.Message{
@ -129,7 +135,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
log.Info("submitting precommit for sector: ", sector.SectorID) log.Info("submitting precommit for sector: ", sector.SectorID)
smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg) smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg)
if err != nil { if err != nil {
return ctx.Send(SectorPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)}) return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
} }
return ctx.Send(SectorPreCommitted{Message: smsg.Cid()}) return ctx.Send(SectorPreCommitted{Message: smsg.Cid()})
@ -140,13 +146,13 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er
log.Info("Sector precommitted: ", sector.SectorID) log.Info("Sector precommitted: ", sector.SectorID)
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.PreCommitMessage) mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.PreCommitMessage)
if err != nil { if err != nil {
return ctx.Send(SectorPreCommitFailed{err}) return ctx.Send(SectorChainPreCommitFailed{err})
} }
if mw.Receipt.ExitCode != 0 { if mw.Receipt.ExitCode != 0 {
log.Error("sector precommit failed: ", mw.Receipt.ExitCode) log.Error("sector precommit failed: ", mw.Receipt.ExitCode)
err := xerrors.Errorf("sector precommit failed: %d", mw.Receipt.ExitCode) err := xerrors.Errorf("sector precommit failed: %d", mw.Receipt.ExitCode)
return ctx.Send(SectorPreCommitFailed{err}) return ctx.Send(SectorChainPreCommitFailed{err})
} }
log.Info("precommit message landed on chain: ", sector.SectorID) log.Info("precommit message landed on chain: ", sector.SectorID)

View File

@ -79,10 +79,10 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI
case *ErrApi: case *ErrApi:
log.Errorf("handlePreCommitFailed: api error, not proceeding: %+v", err) log.Errorf("handlePreCommitFailed: api error, not proceeding: %+v", err)
return nil return nil
case *ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handleUnsealed will do that too) case *ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handlePreCommit1 will do that too)
return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)}) return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("bad CommD error: %w", err)})
case *ErrExpiredTicket: case *ErrExpiredTicket:
return ctx.Send(SectorSealFailed{xerrors.Errorf("ticket expired error: %w", err)}) return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("ticket expired error: %w", err)})
default: default:
return xerrors.Errorf("checkSeal sanity check error: %w", err) return xerrors.Errorf("checkSeal sanity check error: %w", err)
} }

View File

@ -2,6 +2,7 @@ package sealing
import ( import (
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
@ -35,11 +36,14 @@ type SectorInfo struct {
Pieces []Piece Pieces []Piece
// PreCommit // PreCommit1
CommD *cid.Cid Ticket api.SealTicket
CommR *cid.Cid PreCommit1Out storage.PreCommit1Out
Proof []byte
Ticket api.SealTicket // PreCommit2
CommD *cid.Cid
CommR *cid.Cid
Proof []byte
PreCommitMessage *cid.Cid PreCommitMessage *cid.Cid