sealing fsm: Separate precommit 1/2

This commit is contained in:
Łukasz Magiera 2020-04-03 18:54:01 +02:00
parent 5977db8833
commit 8c824dfaf7
6 changed files with 78 additions and 52 deletions

33
fsm.go
View File

@ -34,20 +34,25 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface
var fsmPlanners = map[api.SectorState]func(events []statemachine.Event, state *SectorInfo) error{
api.UndefinedSectorState: planOne(on(SectorStart{}, api.Packing)),
api.Packing: planOne(on(SectorPacked{}, api.Unsealed)),
api.Unsealed: planOne(
on(SectorSealed{}, api.PreCommitting),
on(SectorSealFailed{}, api.SealFailed),
api.Packing: planOne(on(SectorPacked{}, api.PreCommit1)),
api.PreCommit1: planOne(
on(SectorPreCommit1{}, api.PreCommit2),
on(SectorSealPreCommitFailed{}, api.SealFailed),
on(SectorPackingFailed{}, api.PackingFailed),
),
api.PreCommit2: planOne(
on(SectorPreCommit2{}, api.PreCommitting),
on(SectorSealPreCommitFailed{}, api.SealFailed),
on(SectorPackingFailed{}, api.PackingFailed),
),
api.PreCommitting: planOne(
on(SectorSealFailed{}, api.SealFailed),
on(SectorSealPreCommitFailed{}, api.SealFailed),
on(SectorPreCommitted{}, api.WaitSeed),
on(SectorPreCommitFailed{}, api.PreCommitFailed),
on(SectorChainPreCommitFailed{}, api.PreCommitFailed),
),
api.WaitSeed: planOne(
on(SectorSeedReady{}, api.Committing),
on(SectorPreCommitFailed{}, api.PreCommitFailed),
on(SectorChainPreCommitFailed{}, api.PreCommitFailed),
),
api.Committing: planCommitting,
api.CommitWait: planOne(
@ -65,12 +70,12 @@ var fsmPlanners = map[api.SectorState]func(events []statemachine.Event, state *S
),
api.SealFailed: planOne(
on(SectorRetrySeal{}, api.Unsealed),
on(SectorRetrySeal{}, api.PreCommit1),
),
api.PreCommitFailed: planOne(
on(SectorRetryPreCommit{}, api.PreCommitting),
on(SectorRetryWaitSeed{}, api.WaitSeed),
on(SectorSealFailed{}, api.SealFailed),
on(SectorSealPreCommitFailed{}, api.SealFailed),
),
api.Faulty: planOne(
@ -123,7 +128,7 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
*<- Packing <- incoming
| |
| v
*<- Unsealed <--> SealFailed
*<- PreCommit1 <--> SealFailed
| |
| v
* PreCommitting <--> PreCommitFailed
@ -153,8 +158,10 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
// Happy path
case api.Packing:
return m.handlePacking, nil
case api.Unsealed:
return m.handleUnsealed, nil
case api.PreCommit1:
return m.handlePreCommit1, nil
case api.PreCommit2:
return m.handlePreCommit2, nil
case api.PreCommitting:
return m.handlePreCommitting, nil
case api.WaitSeed:
@ -218,7 +225,7 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) error {
return nil
case SectorComputeProofFailed:
state.State = api.SealCommitFailed
case SectorSealFailed:
case SectorSealPreCommitFailed:
state.State = api.CommitFailed
case SectorCommitFailed:
state.State = api.CommitFailed

View File

@ -2,6 +2,7 @@ package sealing
import (
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
@ -70,29 +71,37 @@ type SectorPackingFailed struct{ error }
func (evt SectorPackingFailed) apply(*SectorInfo) {}
type SectorSealed struct {
Sealed cid.Cid
Unsealed cid.Cid
Ticket api.SealTicket
type SectorPreCommit1 struct {
PreCommit1Out storage.PreCommit1Out
Ticket api.SealTicket
}
func (evt SectorSealed) apply(state *SectorInfo) {
func (evt SectorPreCommit1) apply(state *SectorInfo) {
state.PreCommit1Out = evt.PreCommit1Out
state.Ticket = evt.Ticket
}
type SectorPreCommit2 struct {
Sealed cid.Cid
Unsealed cid.Cid
}
func (evt SectorPreCommit2) apply(state *SectorInfo) {
commd := evt.Unsealed
state.CommD = &commd
commr := evt.Sealed
state.CommR = &commr
state.Ticket = evt.Ticket
}
type SectorSealFailed struct{ error }
type SectorSealPreCommitFailed struct{ error }
func (evt SectorSealFailed) FormatError(xerrors.Printer) (next error) { return evt.error }
func (evt SectorSealFailed) apply(*SectorInfo) {}
func (evt SectorSealPreCommitFailed) FormatError(xerrors.Printer) (next error) { return evt.error }
func (evt SectorSealPreCommitFailed) apply(*SectorInfo) {}
type SectorPreCommitFailed struct{ error }
type SectorChainPreCommitFailed struct{ error }
func (evt SectorPreCommitFailed) FormatError(xerrors.Printer) (next error) { return evt.error }
func (evt SectorPreCommitFailed) apply(*SectorInfo) {}
func (evt SectorChainPreCommitFailed) FormatError(xerrors.Printer) (next error) { return evt.error }
func (evt SectorChainPreCommitFailed) apply(*SectorInfo) {}
type SectorPreCommitted struct {
Message cid.Cid

View File

@ -36,9 +36,9 @@ func TestHappyPath(t *testing.T) {
}
m.planSingle(SectorPacked{})
require.Equal(m.t, m.state.State, api.Unsealed)
require.Equal(m.t, m.state.State, api.PreCommit1)
m.planSingle(SectorSealed{})
m.planSingle(SectorPreCommit2{})
require.Equal(m.t, m.state.State, api.PreCommitting)
m.planSingle(SectorPreCommitted{})
@ -65,9 +65,9 @@ func TestSeedRevert(t *testing.T) {
}
m.planSingle(SectorPacked{})
require.Equal(m.t, m.state.State, api.Unsealed)
require.Equal(m.t, m.state.State, api.PreCommit1)
m.planSingle(SectorSealed{})
m.planSingle(SectorPreCommit2{})
require.Equal(m.t, m.state.State, api.PreCommitting)
m.planSingle(SectorPreCommitted{})

View File

@ -49,11 +49,11 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
return ctx.Send(SectorPacked{Pieces: pieces})
}
func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) error {
func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error {
if err := checkPieces(ctx.Context(), sector, m.api); err != nil { // Sanity check state
switch err.(type) {
case *ErrApi:
log.Errorf("handleUnsealed: api error, not proceeding: %+v", err)
log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)
return nil
case *ErrInvalidDeals:
return ctx.Send(SectorPackingFailed{xerrors.Errorf("invalid deals in sector: %w", err)})
@ -67,23 +67,29 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er
log.Infow("performing sector replication...", "sector", sector.SectorID)
ticket, err := m.tktFn(ctx.Context())
if err != nil {
return ctx.Send(SectorSealFailed{xerrors.Errorf("getting ticket failed: %w", err)})
return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("getting ticket failed: %w", err)})
}
pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorID), ticket.Value, sector.pieceInfos())
if err != nil {
return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
}
cids, err := m.sealer.SealPreCommit2(ctx.Context(), m.minerSector(sector.SectorID), pc1o)
return ctx.Send(SectorPreCommit1{
PreCommit1Out: pc1o,
Ticket: *ticket,
})
}
func (m *Sealing) handlePreCommit2(ctx statemachine.Context, sector SectorInfo) error {
cids, err := m.sealer.SealPreCommit2(ctx.Context(), m.minerSector(sector.SectorID), sector.PreCommit1Out)
if err != nil {
return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit(2) failed: %w", err)})
return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("seal pre commit(2) failed: %w", err)})
}
return ctx.Send(SectorSealed{
return ctx.Send(SectorPreCommit2{
Unsealed: cids.Unsealed,
Sealed: cids.Sealed,
Ticket: *ticket,
})
}
@ -93,10 +99,10 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
case *ErrApi:
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
return nil
case *ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handleUnsealed will do that too)
return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)})
case *ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handlePreCommit1 will do that too)
return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("bad CommD error: %w", err)})
case *ErrExpiredTicket:
return ctx.Send(SectorSealFailed{xerrors.Errorf("ticket expired: %w", err)})
return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("ticket expired: %w", err)})
default:
return xerrors.Errorf("checkSeal sanity check error: %w", err)
}
@ -113,7 +119,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return ctx.Send(SectorPreCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)})
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)})
}
msg := &types.Message{
@ -129,7 +135,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
log.Info("submitting precommit for sector: ", sector.SectorID)
smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg)
if err != nil {
return ctx.Send(SectorPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
}
return ctx.Send(SectorPreCommitted{Message: smsg.Cid()})
@ -140,13 +146,13 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er
log.Info("Sector precommitted: ", sector.SectorID)
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.PreCommitMessage)
if err != nil {
return ctx.Send(SectorPreCommitFailed{err})
return ctx.Send(SectorChainPreCommitFailed{err})
}
if mw.Receipt.ExitCode != 0 {
log.Error("sector precommit failed: ", mw.Receipt.ExitCode)
err := xerrors.Errorf("sector precommit failed: %d", mw.Receipt.ExitCode)
return ctx.Send(SectorPreCommitFailed{err})
return ctx.Send(SectorChainPreCommitFailed{err})
}
log.Info("precommit message landed on chain: ", sector.SectorID)

View File

@ -79,10 +79,10 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI
case *ErrApi:
log.Errorf("handlePreCommitFailed: api error, not proceeding: %+v", err)
return nil
case *ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handleUnsealed will do that too)
return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)})
case *ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handlePreCommit1 will do that too)
return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("bad CommD error: %w", err)})
case *ErrExpiredTicket:
return ctx.Send(SectorSealFailed{xerrors.Errorf("ticket expired error: %w", err)})
return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("ticket expired error: %w", err)})
default:
return xerrors.Errorf("checkSeal sanity check error: %w", err)
}

View File

@ -2,6 +2,7 @@ package sealing
import (
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/api"
@ -35,11 +36,14 @@ type SectorInfo struct {
Pieces []Piece
// PreCommit
CommD *cid.Cid
CommR *cid.Cid
Proof []byte
Ticket api.SealTicket
// PreCommit1
Ticket api.SealTicket
PreCommit1Out storage.PreCommit1Out
// PreCommit2
CommD *cid.Cid
CommR *cid.Cid
Proof []byte
PreCommitMessage *cid.Cid