storagefsm: Attempt to auto-recover from reorged DealIDs

This commit is contained in:
Łukasz Magiera 2020-08-27 21:04:43 +02:00
parent 7806a9885a
commit 489d5239a5
7 changed files with 198 additions and 20 deletions

View File

@ -475,7 +475,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{182}); err != nil {
if _, err := w.Write([]byte{183}); err != nil {
return err
}
@ -905,6 +905,29 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
}
}
// t.Return (sealing.ReturnState) (string)
if len("Return") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Return\" was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("Return"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("Return")); err != nil {
return err
}
if len(t.Return) > cbg.MaxLength {
return xerrors.Errorf("Value in field t.Return was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len(t.Return))); err != nil {
return err
}
if _, err := io.WriteString(w, string(t.Return)); err != nil {
return err
}
// t.LastErr (string) (string)
if len("LastErr") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"LastErr\" was too long")
@ -1407,6 +1430,17 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error {
}
}
// t.Return (sealing.ReturnState) (string)
case "Return":
{
sval, err := cbg.ReadStringBuf(br, scratch)
if err != nil {
return err
}
t.Return = ReturnState(sval)
}
// t.LastErr (string) (string)
case "LastErr":

View File

@ -50,6 +50,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorPreCommit1{}, PreCommit2),
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
on(SectorPackingFailed{}, PackingFailed),
on(SectorInvalidDealIDs{}, RecoverDealIDs),
),
PreCommit2: planOne(
on(SectorPreCommit2{}, PreCommitting),
@ -62,6 +63,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorChainPreCommitFailed{}, PreCommitFailed),
on(SectorPreCommitLanded{}, WaitSeed),
on(SectorDealsExpired{}, DealsExpired),
on(SectorInvalidDealIDs{}, RecoverDealIDs),
),
PreCommitWait: planOne(
on(SectorChainPreCommitFailed{}, PreCommitFailed),
@ -103,6 +105,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
on(SectorPreCommitLanded{}, WaitSeed),
on(SectorDealsExpired{}, DealsExpired),
on(SectorInvalidDealIDs{}, RecoverDealIDs),
),
ComputeProofFailed: planOne(
on(SectorRetryComputeProof{}, Committing),
@ -118,6 +121,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorRetryPreCommit{}, PreCommitting),
on(SectorRetryCommitWait{}, CommitWait),
on(SectorDealsExpired{}, DealsExpired),
on(SectorInvalidDealIDs{}, RecoverDealIDs),
),
FinalizeFailed: planOne(
on(SectorRetryFinalize{}, FinalizeSector),
@ -125,6 +129,9 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
DealsExpired: planOne(
// SectorRemove (global)
),
RecoverDealIDs: planOne(
onReturning(SectorUpdateDealIDs{}),
),
// Post-seal
@ -389,13 +396,30 @@ func final(events []statemachine.Event, state *SectorInfo) (uint64, error) {
return 0, xerrors.Errorf("didn't expect any events in state %s, got %+v", state.State, events)
}
func on(mut mutator, next SectorState) func() (mutator, SectorState) {
return func() (mutator, SectorState) {
return mut, next
// on builds a transition descriptor for planOne: it pairs the mutator mut
// with a step that unconditionally moves the sector into the fixed state next.
// The step itself never fails.
func on(mut mutator, next SectorState) func() (mutator, func(*SectorInfo) error) {
	// advance is the state-update step handed to planOne alongside mut.
	advance := func(state *SectorInfo) error {
		state.State = next
		return nil
	}

	return func() (mutator, func(*SectorInfo) error) {
		return mut, advance
	}
}
func planOne(ts ...func() (mut mutator, next SectorState)) func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
// onReturning builds a transition descriptor for planOne whose target is not
// fixed: the sector is sent back to whatever state is recorded in
// SectorInfo.Return, and that field is cleared afterwards. The step reports an
// error when no return state has been recorded.
func onReturning(mut mutator) func() (mutator, func(*SectorInfo) error) {
	// restore is the state-update step handed to planOne alongside mut.
	restore := func(state *SectorInfo) error {
		if state.Return == "" {
			return xerrors.Errorf("return state not set")
		}

		// Jump back to the recorded state and consume the record in one go.
		state.State, state.Return = SectorState(state.Return), ""
		return nil
	}

	return func() (mutator, func(*SectorInfo) error) {
		return mut, restore
	}
}
func planOne(ts ...func() (mut mutator, next func(*SectorInfo) error)) func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
return func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
if gm, ok := events[0].User.(globalMutator); ok {
gm.applyGlobal(state)
@ -414,8 +438,7 @@ func planOne(ts ...func() (mut mutator, next SectorState)) func(events []statema
}
events[0].User.(mutator).apply(state)
state.State = next
return 1, nil
return 1, next(state)
}
_, ok := events[0].User.(Ignorable)

View File

@ -271,6 +271,24 @@ type SectorRetryCommitWait struct{}
func (evt SectorRetryCommitWait) apply(state *SectorInfo) {}
// SectorInvalidDealIDs is the event sent when a sector's deals fail
// validation. Return names the state to resume once recovery is done.
type SectorInvalidDealIDs struct {
	Return ReturnState
}

// apply stores the event's Return value on the sector so it survives the
// detour through the recovery state.
func (e SectorInvalidDealIDs) apply(state *SectorInfo) {
	state.Return = e.Return
}
// SectorUpdateDealIDs carries corrected deal IDs for a sector. Updates maps a
// piece index (position in SectorInfo.Pieces) to the DealID that piece should
// reference.
type SectorUpdateDealIDs struct {
	Updates map[int]abi.DealID
}

// apply overwrites the DealID of every piece listed in Updates.
//
// Each key is used to index state.Pieces directly and the piece's DealInfo is
// dereferenced without a check, so every key must be a valid index of a piece
// that has deal info; anything else panics.
func (e SectorUpdateDealIDs) apply(state *SectorInfo) {
	for pieceIdx, dealID := range e.Updates {
		info := state.Pieces[pieceIdx].DealInfo
		info.DealID = dealID
	}
}
// Faults
type SectorFaulty struct{}

View File

@ -29,6 +29,7 @@ const (
PackingFailed SectorState = "PackingFailed"
FinalizeFailed SectorState = "FinalizeFailed"
DealsExpired SectorState = "DealsExpired"
RecoverDealIDs SectorState = "RecoverDealIDs"
Faulty SectorState = "Faulty" // sector is corrupted or gone for some reason
FaultReported SectorState = "FaultReported" // sector has been declared as a fault on chain

View File

@ -1,12 +1,18 @@
package sealing
import (
"bytes"
"time"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/runtime/exitcode"
"github.com/filecoin-project/lotus/extern/sector-storage/zerocomm"
)
// minRetryTime is a fixed one-minute duration.
// NOTE(review): presumably the minimum wait before a failed sealing step is
// retried; its users are outside this view, so confirm against them.
const minRetryTime = 1 * time.Minute
@ -82,9 +88,8 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI
case *ErrBadTicket:
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad expired: %w", err)})
case *ErrInvalidDeals:
// TODO: Deals got reorged, figure out what to do about this
// (this will probably require tracking the deal submit message CID, and re-checking what's on chain)
return xerrors.Errorf("invalid deals in sector %d: %w", sector.SectorNumber, err)
log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err)
return ctx.Send(SectorInvalidDealIDs{ Return: RetPreCommitFailed })
case *ErrExpiredDeals:
return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)})
case *ErrNoPrecommit:
@ -166,9 +171,8 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo
case *ErrBadTicket:
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)})
case *ErrInvalidDeals:
// TODO: Deals got reorged, figure out what to do about this
// (this will probably require tracking the deal submit message CID, and re-checking what's on chain)
return xerrors.Errorf("invalid deals in sector %d: %w", sector.SectorNumber, err)
log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err)
return ctx.Send(SectorInvalidDealIDs{ Return: RetCommitFailed })
case *ErrExpiredDeals:
return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)})
case nil:
@ -206,9 +210,8 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo
case *ErrNoPrecommit:
return ctx.Send(SectorRetryPreCommit{})
case *ErrInvalidDeals:
// TODO: Deals got reorged, figure out what to do about this
// (this will probably require tracking the deal submit message CID, and re-checking what's on chain)
return xerrors.Errorf("invalid deals in sector %d: %w", sector.SectorNumber, err)
log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err)
return ctx.Send(SectorInvalidDealIDs{ Return: RetCommitFailed })
case *ErrExpiredDeals:
return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)})
case *ErrCommitWaitFailed:
@ -261,3 +264,91 @@ func (m *Sealing) handleDealsExpired(ctx statemachine.Context, sector SectorInfo
// Not much to do here, we can't go back in time to commit this sector
return ctx.Send(SectorRemove{})
}
// handleRecoverDealIDs re-validates every deal-backed piece of the sector
// against the market state at the current chain head and, for each piece whose
// recorded DealID no longer matches, recovers the deal ID from the receipt of
// the piece's publish-deals message.
//
// Outcomes:
//   - SectorUpdateDealIDs is sent with the piece-index -> DealID corrections
//     (possibly empty) when every mismatching piece could be resolved;
//   - SectorRemove is sent when a mismatching piece has no PublishCid to
//     recover from;
//   - an error is returned when the chain head or publish message can't be
//     fetched, when a piece without deal info has a non-zero PieceCID, when a
//     matching deal's StartEpoch has already been reached, or when the publish
//     message receipt is unusable (non-ok exit code, undecodable return value,
//     or not exactly one deal ID).
func (m *Sealing) handleRecoverDealIDs(ctx statemachine.Context, sector SectorInfo) error {
	tok, height, err := m.api.ChainHead(ctx.Context())
	if err != nil {
		return xerrors.Errorf("getting chain head: %w", err)
	}

	// Indexes (into sector.Pieces) of pieces whose DealID needs to be replaced.
	var toFix []int

	for i, p := range sector.Pieces {
		// if no deal is associated with the piece, ensure that we added it as
		// filler (i.e. ensure that it has a zero PieceCID)
		if p.DealInfo == nil {
			exp := zerocomm.ZeroPieceCommitment(p.Piece.Size.Unpadded())
			if !p.Piece.PieceCID.Equals(exp) {
				return xerrors.Errorf("sector %d piece %d had non-zero PieceCID %+v", sector.SectorNumber, i, p.Piece.PieceCID)
			}
			continue
		}

		// Any lookup failure or mismatch below means the recorded DealID does
		// not identify our deal at this head, so the piece is queued for fixing.
		proposal, err := m.api.StateMarketStorageDeal(ctx.Context(), p.DealInfo.DealID, tok)
		if err != nil {
			log.Warnf("getting deal %d for piece %d: %+v", p.DealInfo.DealID, i, err)
			toFix = append(toFix, i)
			continue
		}

		if proposal.Provider != m.maddr {
			log.Warnf("piece %d (of %d) of sector %d refers deal %d with wrong provider: %s != %s", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, proposal.Provider, m.maddr)
			toFix = append(toFix, i)
			continue
		}

		if proposal.PieceCID != p.Piece.PieceCID {
			log.Warnf("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %x != %x", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, p.Piece.PieceCID, proposal.PieceCID)
			toFix = append(toFix, i)
			continue
		}

		if p.Piece.Size != proposal.PieceSize {
			log.Warnf("piece %d (of %d) of sector %d refers deal %d with different size: %d != %d", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, p.Piece.Size, proposal.PieceSize)
			toFix = append(toFix, i)
			continue
		}

		// The deal matches this piece, but its start epoch has been reached.
		if height >= proposal.StartEpoch {
			// TODO: check if we are in an early enough state (before precommit), try to remove the offending pieces
			//  (tricky as we have to 'defragment' the sector while doing that, and update piece references for retrieval)
			return xerrors.Errorf("can't fix sector deals: piece %d (of %d) of sector %d refers expired deal %d - should start at %d, head %d", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, proposal.StartEpoch, height)
		}
	}

	updates := make(map[int]abi.DealID, len(toFix))
	for _, i := range toFix {
		p := sector.Pieces[i]

		if p.DealInfo.PublishCid == nil {
			// TODO: check if we are in an early enough state try to remove this piece
			log.Errorf("can't fix sector deals: piece %d (of %d) of sector %d has nil DealInfo.PublishCid (refers to deal %d)", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID)
			// Not much to do here (and this can only happen for old spacerace sectors)
			return ctx.Send(SectorRemove{})
		}

		// NOTE(review): if StateSearchMsg can return a nil lookup together with
		// a nil error (message not found), the ml.Receipt access below would
		// panic - confirm against the API implementation.
		ml, err := m.api.StateSearchMsg(ctx.Context(), *p.DealInfo.PublishCid)
		if err != nil {
			return xerrors.Errorf("looking for publish deal message %s (sector %d, piece %d): %w", *p.DealInfo.PublishCid, sector.SectorNumber, i, err)
		}

		if ml.Receipt.ExitCode != exitcode.Ok {
			return xerrors.Errorf("looking for publish deal message %s (sector %d, piece %d): non-ok exit code: %s", *p.DealInfo.PublishCid, sector.SectorNumber, i, ml.Receipt.ExitCode)
		}

		var retval market.PublishStorageDealsReturn
		if err := retval.UnmarshalCBOR(bytes.NewReader(ml.Receipt.Return)); err != nil {
			return xerrors.Errorf("looking for publish deal message: unmarshaling message return: %w", err)
		}

		if len(retval.IDs) != 1 {
			// market currently only ever sends messages with 1 deal; with any
			// other count we can't tell which returned ID belongs to this piece
			return xerrors.Errorf("can't recover dealIDs from publish deal message with %d deals (expected exactly 1)", len(retval.IDs))
		}

		updates[i] = retval.IDs[0]
	}

	// Hand the corrected deal IDs to the state machine so they get applied to
	// the sector's pieces.
	return ctx.Send(SectorUpdateDealIDs{Updates: updates})
}

View File

@ -86,7 +86,8 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo)
log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)
return nil
case *ErrInvalidDeals:
return ctx.Send(SectorPackingFailed{xerrors.Errorf("invalid dealIDs in sector: %w", err)})
log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err)
return ctx.Send(SectorInvalidDealIDs{ Return: RetPreCommit1 })
case *ErrExpiredDeals: // Probably not much we can do here, maybe re-pack the sector?
return ctx.Send(SectorPackingFailed{xerrors.Errorf("expired dealIDs in sector: %w", err)})
default:
@ -157,9 +158,8 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
case *ErrBadTicket:
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)})
case *ErrInvalidDeals:
// TODO: Deals got reorged, figure out what to do about this
// (this will probably require tracking the deal submit message CID, and re-checking what's on chain)
return xerrors.Errorf("invalid deals in sector %d: %w", sector.SectorNumber, err)
log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err)
return ctx.Send(SectorInvalidDealIDs{ Return: RetPreCommitting })
case *ErrExpiredDeals:
return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)})
case *ErrPrecommitOnChain:

View File

@ -54,6 +54,14 @@ type Log struct {
Kind string
}
// ReturnState is the string form of a sector state that is recorded so the
// sector can be sent back to it later.
type ReturnState string

// Return states available to callers; each is the ReturnState conversion of
// the SectorState constant of the same name.
const (
	RetPreCommit1      ReturnState = ReturnState(PreCommit1)
	RetPreCommitting   ReturnState = ReturnState(PreCommitting)
	RetPreCommitFailed ReturnState = ReturnState(PreCommitFailed)
	RetCommitFailed    ReturnState = ReturnState(CommitFailed)
)
type SectorInfo struct {
State SectorState
SectorNumber abi.SectorNumber
@ -91,6 +99,9 @@ type SectorInfo struct {
// Faults
FaultReportMsg *cid.Cid
// Recovery
Return ReturnState
// Debug
LastErr string