From 8c824dfaf7af2bc39b75bbb86ff659ecdc297cc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 3 Apr 2020 18:54:01 +0200 Subject: [PATCH] sealing fsm: Separate precommit 1/2 --- fsm.go | 33 ++++++++++++++++++++------------- fsm_events.go | 33 +++++++++++++++++++++------------ fsm_test.go | 8 ++++---- states.go | 36 +++++++++++++++++++++--------------- states_failed.go | 6 +++--- types.go | 14 +++++++++----- 6 files changed, 78 insertions(+), 52 deletions(-) diff --git a/fsm.go b/fsm.go index a2951d807..93aa4c1e9 100644 --- a/fsm.go +++ b/fsm.go @@ -34,20 +34,25 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface var fsmPlanners = map[api.SectorState]func(events []statemachine.Event, state *SectorInfo) error{ api.UndefinedSectorState: planOne(on(SectorStart{}, api.Packing)), - api.Packing: planOne(on(SectorPacked{}, api.Unsealed)), - api.Unsealed: planOne( - on(SectorSealed{}, api.PreCommitting), - on(SectorSealFailed{}, api.SealFailed), + api.Packing: planOne(on(SectorPacked{}, api.PreCommit1)), + api.PreCommit1: planOne( + on(SectorPreCommit1{}, api.PreCommit2), + on(SectorSealPreCommitFailed{}, api.SealFailed), + on(SectorPackingFailed{}, api.PackingFailed), + ), + api.PreCommit2: planOne( + on(SectorPreCommit2{}, api.PreCommitting), + on(SectorSealPreCommitFailed{}, api.SealFailed), on(SectorPackingFailed{}, api.PackingFailed), ), api.PreCommitting: planOne( - on(SectorSealFailed{}, api.SealFailed), + on(SectorSealPreCommitFailed{}, api.SealFailed), on(SectorPreCommitted{}, api.WaitSeed), - on(SectorPreCommitFailed{}, api.PreCommitFailed), + on(SectorChainPreCommitFailed{}, api.PreCommitFailed), ), api.WaitSeed: planOne( on(SectorSeedReady{}, api.Committing), - on(SectorPreCommitFailed{}, api.PreCommitFailed), + on(SectorChainPreCommitFailed{}, api.PreCommitFailed), ), api.Committing: planCommitting, api.CommitWait: planOne( @@ -65,12 +70,12 @@ var fsmPlanners = map[api.SectorState]func(events []statemachine.Event, state *S ), api.SealFailed: planOne( - on(SectorRetrySeal{}, api.Unsealed), + on(SectorRetrySeal{}, api.PreCommit1), ), api.PreCommitFailed: planOne( on(SectorRetryPreCommit{}, api.PreCommitting), on(SectorRetryWaitSeed{}, api.WaitSeed), - on(SectorSealFailed{}, api.SealFailed), + on(SectorSealPreCommitFailed{}, api.SealFailed), ), api.Faulty: planOne( @@ -123,7 +128,7 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta *<- Packing <- incoming | | | v - *<- Unsealed <--> SealFailed + *<- PreCommit1 <--> SealFailed | | | v * PreCommitting <--> PreCommitFailed @@ -153,8 +158,10 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta // Happy path case api.Packing: return m.handlePacking, nil - case api.Unsealed: - return m.handleUnsealed, nil + case api.PreCommit1: + return m.handlePreCommit1, nil + case api.PreCommit2: + return m.handlePreCommit2, nil case api.PreCommitting: return m.handlePreCommitting, nil case api.WaitSeed: @@ -218,7 +225,7 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) error { return nil case SectorComputeProofFailed: state.State = api.SealCommitFailed - case SectorSealFailed: + case SectorSealPreCommitFailed: state.State = api.CommitFailed case SectorCommitFailed: state.State = api.CommitFailed diff --git a/fsm_events.go b/fsm_events.go index d433765ee..e7c9a69b7 100644 --- a/fsm_events.go +++ b/fsm_events.go @@ -2,6 +2,7 @@ package sealing import ( "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-storage/storage" "github.com/ipfs/go-cid" "golang.org/x/xerrors" @@ -70,29 +71,37 @@ type SectorPackingFailed struct{ error } func (evt SectorPackingFailed) apply(*SectorInfo) {} -type SectorSealed struct { - Sealed cid.Cid - Unsealed cid.Cid - Ticket api.SealTicket +type SectorPreCommit1 struct { + PreCommit1Out storage.PreCommit1Out + Ticket api.SealTicket } -func (evt SectorSealed) apply(state *SectorInfo) { +func (evt SectorPreCommit1) apply(state *SectorInfo) { + state.PreCommit1Out = evt.PreCommit1Out + state.Ticket = evt.Ticket +} + +type SectorPreCommit2 struct { + Sealed cid.Cid + Unsealed cid.Cid +} + +func (evt SectorPreCommit2) apply(state *SectorInfo) { commd := evt.Unsealed state.CommD = &commd commr := evt.Sealed state.CommR = &commr - state.Ticket = evt.Ticket } -type SectorSealFailed struct{ error } +type SectorSealPreCommitFailed struct{ error } -func (evt SectorSealFailed) FormatError(xerrors.Printer) (next error) { return evt.error } -func (evt SectorSealFailed) apply(*SectorInfo) {} +func (evt SectorSealPreCommitFailed) FormatError(xerrors.Printer) (next error) { return evt.error } +func (evt SectorSealPreCommitFailed) apply(*SectorInfo) {} -type SectorPreCommitFailed struct{ error } +type SectorChainPreCommitFailed struct{ error } -func (evt SectorPreCommitFailed) FormatError(xerrors.Printer) (next error) { return evt.error } -func (evt SectorPreCommitFailed) apply(*SectorInfo) {} +func (evt SectorChainPreCommitFailed) FormatError(xerrors.Printer) (next error) { return evt.error } +func (evt SectorChainPreCommitFailed) apply(*SectorInfo) {} type SectorPreCommitted struct { Message cid.Cid diff --git a/fsm_test.go b/fsm_test.go index 490b01771..57e478e6f 100644 --- a/fsm_test.go +++ b/fsm_test.go @@ -36,9 +36,9 @@ func TestHappyPath(t *testing.T) { } m.planSingle(SectorPacked{}) - require.Equal(m.t, m.state.State, api.Unsealed) + require.Equal(m.t, m.state.State, api.PreCommit1) - m.planSingle(SectorSealed{}) + m.planSingle(SectorPreCommit2{}) require.Equal(m.t, m.state.State, api.PreCommitting) m.planSingle(SectorPreCommitted{}) @@ -65,9 +65,9 @@ func TestSeedRevert(t *testing.T) { } m.planSingle(SectorPacked{}) - require.Equal(m.t, m.state.State, api.Unsealed) + require.Equal(m.t, m.state.State, api.PreCommit1) - m.planSingle(SectorSealed{}) + m.planSingle(SectorPreCommit2{}) require.Equal(m.t, m.state.State, api.PreCommitting) m.planSingle(SectorPreCommitted{}) diff --git a/states.go b/states.go index 3e6037b6d..69f36d314 100644 --- a/states.go +++ b/states.go @@ -49,11 +49,11 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err return ctx.Send(SectorPacked{Pieces: pieces}) } -func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) error { +func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error { if err := checkPieces(ctx.Context(), sector, m.api); err != nil { // Sanity check state switch err.(type) { case *ErrApi: - log.Errorf("handleUnsealed: api error, not proceeding: %+v", err) + log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err) return nil case *ErrInvalidDeals: return ctx.Send(SectorPackingFailed{xerrors.Errorf("invalid deals in sector: %w", err)}) @@ -67,23 +67,29 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er log.Infow("performing sector replication...", "sector", sector.SectorID) ticket, err := m.tktFn(ctx.Context()) if err != nil { - return ctx.Send(SectorSealFailed{xerrors.Errorf("getting ticket failed: %w", err)}) + return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("getting ticket failed: %w", err)}) } pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorID), ticket.Value, sector.pieceInfos()) if err != nil { - return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit(1) failed: %w", err)}) + return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("seal pre commit(1) failed: %w", err)}) } - cids, err := m.sealer.SealPreCommit2(ctx.Context(), m.minerSector(sector.SectorID), pc1o) + return ctx.Send(SectorPreCommit1{ + PreCommit1Out: pc1o, + Ticket: *ticket, + }) +} + +func (m *Sealing) handlePreCommit2(ctx statemachine.Context, sector SectorInfo) error { + cids, err := m.sealer.SealPreCommit2(ctx.Context(), m.minerSector(sector.SectorID), sector.PreCommit1Out) if err != nil { - return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit(2) failed: %w", err)}) + return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("seal pre commit(2) failed: %w", err)}) } - return ctx.Send(SectorSealed{ + return ctx.Send(SectorPreCommit2{ Unsealed: cids.Unsealed, Sealed: cids.Sealed, - Ticket: *ticket, }) } @@ -93,10 +99,10 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf case *ErrApi: log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err) return nil - case *ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handleUnsealed will do that too) - return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)}) + case *ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handlePreCommit1 will do that too) + return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("bad CommD error: %w", err)}) case *ErrExpiredTicket: - return ctx.Send(SectorSealFailed{xerrors.Errorf("ticket expired: %w", err)}) + return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("ticket expired: %w", err)}) default: return xerrors.Errorf("checkSeal sanity check error: %w", err) } @@ -113,7 +119,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf } enc, aerr := actors.SerializeParams(params) if aerr != nil { - return ctx.Send(SectorPreCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)}) + return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)}) } msg := &types.Message{ @@ -129,7 +135,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf log.Info("submitting precommit for sector: ", sector.SectorID) smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg) if err != nil { - return ctx.Send(SectorPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)}) + return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)}) } return ctx.Send(SectorPreCommitted{Message: smsg.Cid()}) @@ -140,13 +146,13 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er log.Info("Sector precommitted: ", sector.SectorID) mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.PreCommitMessage) if err != nil { - return ctx.Send(SectorPreCommitFailed{err}) + return ctx.Send(SectorChainPreCommitFailed{err}) } if mw.Receipt.ExitCode != 0 { log.Error("sector precommit failed: ", mw.Receipt.ExitCode) err := xerrors.Errorf("sector precommit failed: %d", mw.Receipt.ExitCode) - return ctx.Send(SectorPreCommitFailed{err}) + return ctx.Send(SectorChainPreCommitFailed{err}) } log.Info("precommit message landed on chain: ", sector.SectorID) diff --git a/states_failed.go b/states_failed.go index da5227683..8c0cc46f8 100644 --- a/states_failed.go +++ b/states_failed.go @@ -79,10 +79,10 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI case *ErrApi: log.Errorf("handlePreCommitFailed: api error, not proceeding: %+v", err) return nil - case *ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handleUnsealed will do that too) - return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)}) + case *ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handlePreCommit1 will do that too) + return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("bad CommD error: %w", err)}) case *ErrExpiredTicket: - return ctx.Send(SectorSealFailed{xerrors.Errorf("ticket expired error: %w", err)}) + return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("ticket expired error: %w", err)}) default: return xerrors.Errorf("checkSeal sanity check error: %w", err) } diff --git a/types.go b/types.go index 4e9e0dccc..197bf6e15 100644 --- a/types.go +++ b/types.go @@ -2,6 +2,7 @@ package sealing import ( "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-storage/storage" "github.com/ipfs/go-cid" "github.com/filecoin-project/lotus/api" @@ -35,11 +36,14 @@ type SectorInfo struct { Pieces []Piece - // PreCommit - CommD *cid.Cid - CommR *cid.Cid - Proof []byte - Ticket api.SealTicket + // PreCommit1 + Ticket api.SealTicket + PreCommit1Out storage.PreCommit1Out + + // PreCommit2 + CommD *cid.Cid + CommR *cid.Cid + Proof []byte PreCommitMessage *cid.Cid