storagefsm: Add stub AddPieceFailed state

This commit is contained in:
Łukasz Magiera 2021-01-20 18:42:22 +01:00
parent f96f12c836
commit b9a9f23204
5 changed files with 19 additions and 3 deletions

View File

@ -307,6 +307,7 @@ var stateList = []stateMeta{
{col: color.FgCyan, state: sealing.Removed}, {col: color.FgCyan, state: sealing.Removed},
{col: color.FgRed, state: sealing.FailedUnrecoverable}, {col: color.FgRed, state: sealing.FailedUnrecoverable},
{col: color.FgRed, state: sealing.AddPieceFailed},
{col: color.FgRed, state: sealing.SealPreCommit1Failed}, {col: color.FgRed, state: sealing.SealPreCommit1Failed},
{col: color.FgRed, state: sealing.SealPreCommit2Failed}, {col: color.FgRed, state: sealing.SealPreCommit2Failed},
{col: color.FgRed, state: sealing.PreCommitFailed}, {col: color.FgRed, state: sealing.PreCommitFailed},

View File

@ -51,6 +51,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
AddPiece: planOne( AddPiece: planOne(
on(SectorPieceAdded{}, WaitDeals), on(SectorPieceAdded{}, WaitDeals),
apply(SectorStartPacking{}), apply(SectorStartPacking{}),
on(SectorAddPieceFailed{}, AddPieceFailed),
), ),
Packing: planOne(on(SectorPacked{}, GetTicket)), Packing: planOne(on(SectorPacked{}, GetTicket)),
GetTicket: planOne( GetTicket: planOne(
@ -104,6 +105,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
// Sealing errors // Sealing errors
AddPieceFailed: planOne(),
SealPreCommit1Failed: planOne( SealPreCommit1Failed: planOne(
on(SectorRetrySealPreCommit1{}, PreCommit1), on(SectorRetrySealPreCommit1{}, PreCommit1),
), ),

View File

@ -95,6 +95,11 @@ func (evt SectorPieceAdded) apply(state *SectorInfo) {
state.Pieces = append(state.Pieces, evt.NewPieces...) state.Pieces = append(state.Pieces, evt.NewPieces...)
} }
type SectorAddPieceFailed struct{ error }
func (evt SectorAddPieceFailed) FormatError(xerrors.Printer) (next error) { return evt.error }
func (evt SectorAddPieceFailed) apply(si *SectorInfo) {}
type SectorStartPacking struct{} type SectorStartPacking struct{}
func (evt SectorStartPacking) apply(*SectorInfo) {} func (evt SectorStartPacking) apply(*SectorInfo) {}

View File

@ -117,7 +117,7 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er
} }
m.inputLk.Unlock() m.inputLk.Unlock()
if !ok { if !ok {
// nothing to do here // nothing to do here (might happen after a restart in AddPiece)
return ctx.Send(res) return ctx.Send(res)
} }
@ -166,7 +166,7 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er
if err != nil { if err != nil {
err = xerrors.Errorf("writing padding piece: %w", err) err = xerrors.Errorf("writing padding piece: %w", err)
deal.accepted(sector.SectorNumber, offset, err) deal.accepted(sector.SectorNumber, offset, err)
return err // todo failed state return ctx.Send(SectorAddPieceFailed{err})
} }
pieceSizes = append(pieceSizes, p.Unpadded()) pieceSizes = append(pieceSizes, p.Unpadded())
@ -183,7 +183,7 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er
if err != nil { if err != nil {
err = xerrors.Errorf("writing piece: %w", err) err = xerrors.Errorf("writing piece: %w", err)
deal.accepted(sector.SectorNumber, offset, err) deal.accepted(sector.SectorNumber, offset, err)
return err // todo failed state return ctx.Send(SectorAddPieceFailed{err})
} }
deal.accepted(sector.SectorNumber, offset, nil) deal.accepted(sector.SectorNumber, offset, nil)
@ -200,6 +200,12 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er
return ctx.Send(res) return ctx.Send(res)
} }
func (m *Sealing) handleAddPieceFailed(ctx statemachine.Context, sector SectorInfo) error {
log.Errorf("No recovery plan for AddPiece failing")
// todo: cleanup sector / just go retry (requires adding offset param to AddPiece in sector-storage for this to be safe)
return nil
}
func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, data storage.Data, deal DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, data storage.Data, deal DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) {
log.Infof("Adding piece for deal %d (publish msg: %s)", deal.DealID, deal.PublishCid) log.Infof("Adding piece for deal %d (publish msg: %s)", deal.DealID, deal.PublishCid)
if (padreader.PaddedSize(uint64(size))) != size { if (padreader.PaddedSize(uint64(size))) != size {

View File

@ -7,6 +7,7 @@ var ExistSectorStateList = map[SectorState]struct{}{
WaitDeals: {}, WaitDeals: {},
Packing: {}, Packing: {},
AddPiece: {}, AddPiece: {},
AddPieceFailed: {},
GetTicket: {}, GetTicket: {},
PreCommit1: {}, PreCommit1: {},
PreCommit2: {}, PreCommit2: {},
@ -61,6 +62,7 @@ const (
Proving SectorState = "Proving" Proving SectorState = "Proving"
// error modes // error modes
FailedUnrecoverable SectorState = "FailedUnrecoverable" FailedUnrecoverable SectorState = "FailedUnrecoverable"
AddPieceFailed SectorState = "AddPieceFailed"
SealPreCommit1Failed SectorState = "SealPreCommit1Failed" SealPreCommit1Failed SectorState = "SealPreCommit1Failed"
SealPreCommit2Failed SectorState = "SealPreCommit2Failed" SealPreCommit2Failed SectorState = "SealPreCommit2Failed"
PreCommitFailed SectorState = "PreCommitFailed" PreCommitFailed SectorState = "PreCommitFailed"