diff --git a/api/api_storage.go b/api/api_storage.go index 880259dd6..0df3d8658 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -2,8 +2,6 @@ package api import ( "context" - "fmt" - "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/lib/sectorbuilder" ) @@ -23,29 +21,19 @@ const ( Committing Proving - SectorNoUpdate = UndefinedSectorState + FailedUnrecoverable ) -func SectorStateStr(s SectorState) string { - switch s { - case UndefinedSectorState: - return "UndefinedSectorState" - case Empty: - return "Empty" - case Packing: - return "Packing" - case Unsealed: - return "Unsealed" - case PreCommitting: - return "PreCommitting" - case PreCommitted: - return "PreCommitted" - case Committing: - return "Committing" - case Proving: - return "Proving" - } - return fmt.Sprintf("", s) +var SectorStates = []string{ + UndefinedSectorState: "UndefinedSectorState", + Empty: "Empty", + Packing: "Packing", + Unsealed: "Unsealed", + PreCommitting: "PreCommitting", + PreCommitted: "PreCommitted", + Committing: "Committing", + Proving: "Proving", + FailedUnrecoverable: "FailedUnrecoverable", } // StorageMiner is a low-level interface to the Filecoin network storage miner node diff --git a/storage/sector_states.go b/storage/sector_states.go index d57f6275d..dfee5f14e 100644 --- a/storage/sector_states.go +++ b/storage/sector_states.go @@ -12,29 +12,24 @@ import ( "github.com/filecoin-project/lotus/lib/sectorbuilder" ) -type providerHandlerFunc func(ctx context.Context, deal SectorInfo) (func(*SectorInfo), error) +type providerHandlerFunc func(ctx context.Context, deal SectorInfo) *sectorUpdate -func (m *Miner) handleSectorUpdate(ctx context.Context, sector SectorInfo, cb providerHandlerFunc, next api.SectorState) { +func (m *Miner) handleSectorUpdate(ctx context.Context, sector SectorInfo, cb providerHandlerFunc) { go func() { - mut, err := cb(ctx, sector) + update := cb(ctx, sector) - if err == nil && next == api.SectorNoUpdate { - return + if update == nil { + return // async } select { - case m.sectorUpdated <- sectorUpdate{ - newState: next, - id: sector.SectorID, - err: err, - mut: mut, - }: + case m.sectorUpdated <- *update: case <-m.stop: } }() } -func (m *Miner) finishPacking(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { +func (m *Miner) handlePacking(ctx context.Context, sector SectorInfo) *sectorUpdate { log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID) var allocated uint64 @@ -45,12 +40,12 @@ func (m *Miner) finishPacking(ctx context.Context, sector SectorInfo) (func(*Sec ubytes := sectorbuilder.UserBytesForSectorSize(m.sb.SectorSize()) if allocated > ubytes { - return nil, xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes) + return sector.upd().fatal(xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes)) } fillerSizes, err := fillersFromRem(ubytes - allocated) if err != nil { - return nil, err + return sector.upd().fatal(err) } if len(fillerSizes) > 0 { @@ -59,27 +54,27 @@ func (m *Miner) finishPacking(ctx context.Context, sector SectorInfo) (func(*Sec pieces, err := m.storeGarbage(ctx, sector.SectorID, sector.existingPieces(), fillerSizes...) if err != nil { - return nil, xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err) + return sector.upd().fatal(xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)) } - return func(info *SectorInfo) { + return sector.upd().to(api.Unsealed).state(func(info *SectorInfo) { info.Pieces = append(info.Pieces, pieces...) - }, nil + }) } -func (m *Miner) sealPreCommit(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { +func (m *Miner) handleUnsealed(ctx context.Context, sector SectorInfo) *sectorUpdate { log.Infow("performing sector replication...", "sector", sector.SectorID) ticket, err := m.tktFn(ctx) if err != nil { - return nil, err + return sector.upd().fatal(err) } rspco, err := m.sb.SealPreCommit(sector.SectorID, *ticket, sector.pieceInfos()) if err != nil { - return nil, xerrors.Errorf("seal pre commit failed: %w", err) + return sector.upd().fatal(xerrors.Errorf("seal pre commit failed: %w", err)) } - return func(info *SectorInfo) { + return sector.upd().to(api.PreCommitting).state(func(info *SectorInfo) { info.CommC = rspco.CommC[:] info.CommD = rspco.CommD[:] info.CommR = rspco.CommR[:] @@ -88,10 +83,11 @@ func (m *Miner) sealPreCommit(ctx context.Context, sector SectorInfo) (func(*Sec BlockHeight: ticket.BlockHeight, TicketBytes: ticket.TicketBytes[:], } - }, nil + }) + } -func (m *Miner) preCommit(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { +func (m *Miner) handlePreCommitting(ctx context.Context, sector SectorInfo) *sectorUpdate { params := &actors.SectorPreCommitInfo{ SectorNumber: sector.SectorID, @@ -101,7 +97,7 @@ func (m *Miner) preCommit(ctx context.Context, sector SectorInfo) (func(*SectorI } enc, aerr := actors.SerializeParams(params) if aerr != nil { - return nil, xerrors.Errorf("could not serialize commit sector parameters: %w", aerr) + return sector.upd().fatal(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)) } msg := &types.Message{ @@ -117,26 +113,26 @@ func (m *Miner) preCommit(ctx context.Context, sector SectorInfo) (func(*SectorI log.Info("submitting precommit for sector: ", sector.SectorID) smsg, err := m.api.MpoolPushMessage(ctx, msg) if err != nil { - return nil, xerrors.Errorf("pushing message to mpool: %w", err) + return sector.upd().fatal(xerrors.Errorf("pushing message to mpool: %w", err)) } - return func(info *SectorInfo) { + return sector.upd().to(api.PreCommitted).state(func(info *SectorInfo) { mcid := smsg.Cid() info.PreCommitMessage = &mcid - }, nil + }) } -func (m *Miner) preCommitted(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { +func (m *Miner) handlePreCommitted(ctx context.Context, sector SectorInfo) *sectorUpdate { // would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts log.Info("Sector precommitted: ", sector.SectorID) mw, err := m.api.StateWaitMsg(ctx, *sector.PreCommitMessage) if err != nil { - return nil, err + return sector.upd().fatal(err) } if mw.Receipt.ExitCode != 0 { log.Error("sector precommit failed: ", mw.Receipt.ExitCode) - return nil, err + return sector.upd().fatal(err) } log.Info("precommit message landed on chain: ", sector.SectorID) @@ -146,19 +142,18 @@ func (m *Miner) preCommitted(ctx context.Context, sector SectorInfo) (func(*Sect err = m.events.ChainAt(func(ctx context.Context, ts *types.TipSet, curH uint64) error { rand, err := m.api.ChainGetRandomness(ctx, ts.Key(), int64(randHeight)) if err != nil { - return xerrors.Errorf("failed to get randomness for computing seal proof: %w", err) + err = xerrors.Errorf("failed to get randomness for computing seal proof: %w", err) + + m.sectorUpdated <- *sector.upd().fatal(err) + return err } - m.sectorUpdated <- sectorUpdate{ - newState: api.Committing, - id: sector.SectorID, - mut: func(info *SectorInfo) { - info.Seed = SealSeed{ - BlockHeight: randHeight, - TicketBytes: rand, - } - }, - } + m.sectorUpdated <- *sector.upd().to(api.Committing).state(func(info *SectorInfo) { + info.Seed = SealSeed{ + BlockHeight: randHeight, + TicketBytes: rand, + } + }) return nil }, func(ctx context.Context, ts *types.TipSet) error { @@ -169,15 +164,15 @@ func (m *Miner) preCommitted(ctx context.Context, sector SectorInfo) (func(*Sect log.Warn("waitForPreCommitMessage ChainAt errored: ", err) } - return nil, nil + return nil } -func (m *Miner) committing(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { +func (m *Miner) handleCommitting(ctx context.Context, sector SectorInfo) *sectorUpdate { log.Info("scheduling seal proof computation...") proof, err := m.sb.SealCommit(sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco()) if err != nil { - return nil, xerrors.Errorf("computing seal proof failed: %w", err) + return sector.upd().fatal(xerrors.Errorf("computing seal proof failed: %w", err)) } // TODO: Consider splitting states and persist proof for faster recovery @@ -190,7 +185,7 @@ func (m *Miner) committing(ctx context.Context, sector SectorInfo) (func(*Sector enc, aerr := actors.SerializeParams(params) if aerr != nil { - return nil, xerrors.Errorf("could not serialize commit sector parameters: %w", aerr) + return sector.upd().fatal(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)) } msg := &types.Message{ @@ -212,17 +207,17 @@ func (m *Miner) committing(ctx context.Context, sector SectorInfo) (func(*Sector mw, err := m.api.StateWaitMsg(ctx, smsg.Cid()) if err != nil { - return nil, xerrors.Errorf("failed to wait for porep inclusion: %w", err) + return sector.upd().fatal(xerrors.Errorf("failed to wait for porep inclusion: %w", err)) } if mw.Receipt.ExitCode != 0 { log.Errorf("UNHANDLED: submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, smsg.Cid(), sector.Ticket.TicketBytes, sector.Seed.TicketBytes, sector.Seed.BlockHeight, params.Proof) - return nil, xerrors.New("UNHANDLED: submitting sector proof failed") + return sector.upd().fatal(xerrors.New("UNHANDLED: submitting sector proof failed")) } - return func(info *SectorInfo) { + return sector.upd().to(api.Proving).state(func(info *SectorInfo) { mcid := smsg.Cid() info.CommitMessage = &mcid info.Proof = proof - }, nil + }) } diff --git a/storage/sector_types.go b/storage/sector_types.go index 67b394732..179a7bbc6 100644 --- a/storage/sector_types.go +++ b/storage/sector_types.go @@ -71,6 +71,10 @@ type SectorInfo struct { CommitMessage *cid.Cid } +func (t *SectorInfo) upd() *sectorUpdate { + return §orUpdate{id: t.SectorID} +} + func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo { out := make([]sectorbuilder.PublicPieceInfo, len(t.Pieces)) for i, piece := range t.Pieces { diff --git a/storage/sectors.go b/storage/sectors.go index 583fdc71d..7a7312520 100644 --- a/storage/sectors.go +++ b/storage/sectors.go @@ -18,6 +18,33 @@ type sectorUpdate struct { mut func(*SectorInfo) } +func (u *sectorUpdate) fatal(err error) *sectorUpdate { + return §orUpdate{ + newState: api.FailedUnrecoverable, + id: u.id, + err: err, + mut: u.mut, + } +} + +func (u *sectorUpdate) state(m func(*SectorInfo)) *sectorUpdate { + return §orUpdate{ + newState: u.newState, + id: u.id, + err: u.err, + mut: m, + } +} + +func (u *sectorUpdate) to(newState api.SectorState) *sectorUpdate { + return §orUpdate{ + newState: newState, + id: u.id, + err: u.err, + mut: u.mut, + } +} + func (m *Miner) sectorStateLoop(ctx context.Context) error { trackedSectors, err := m.ListSectors() if err != nil { @@ -97,9 +124,7 @@ func (m *Miner) onSectorIncoming(sector *SectorInfo) { } if err := m.sectors.Begin(sector.SectorID, sector); err != nil { - // We may have re-sent the proposal - log.Errorf("deal tracking failed: %s", err) - m.failSector(sector.SectorID, err) + log.Errorf("sector tracking failed: %s", err) return } @@ -116,7 +141,7 @@ func (m *Miner) onSectorIncoming(sector *SectorInfo) { } func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) { - log.Infof("Sector %d updated state to %s", update.id, api.SectorStateStr(update.newState)) + log.Infof("Sector %d updated state to %s", update.id, api.SectorStates[update.newState]) var sector SectorInfo err := m.sectors.Mutate(update.id, func(s *SectorInfo) error { s.State = update.newState @@ -127,39 +152,67 @@ func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) { return nil }) if update.err != nil { - log.Errorf("sector %d failed: %s", update.id, update.err) - m.failSector(update.id, update.err) - return + log.Errorf("sector %d failed: %+v", update.id, update.err) } if err != nil { - m.failSector(update.id, err) + log.Errorf("sector %d error: %+v", update.id, err) return } + /* + + *<- Empty + | | + | v + *<- Packing <- incoming + | | + | v + *<- Unsealed + | | + | v + *<- PreCommitting + | | + | v + *<- PreCommitted + | | + | v + *<- Committing + | | + | v + *<- Proving + | + v + FailedUnrecoverable + + UndefinedSectorState <- ¯\_(ツ)_/¯ + | ^ + *---------------------/ + + */ + switch update.newState { case api.Packing: - m.handleSectorUpdate(ctx, sector, m.finishPacking, api.Unsealed) + m.handleSectorUpdate(ctx, sector, m.handlePacking) case api.Unsealed: - m.handleSectorUpdate(ctx, sector, m.sealPreCommit, api.PreCommitting) + m.handleSectorUpdate(ctx, sector, m.handleUnsealed) case api.PreCommitting: - m.handleSectorUpdate(ctx, sector, m.preCommit, api.PreCommitted) + m.handleSectorUpdate(ctx, sector, m.handlePreCommitting) case api.PreCommitted: - m.handleSectorUpdate(ctx, sector, m.preCommitted, api.SectorNoUpdate) + m.handleSectorUpdate(ctx, sector, m.handlePreCommitted) case api.Committing: - m.handleSectorUpdate(ctx, sector, m.committing, api.Proving) + m.handleSectorUpdate(ctx, sector, m.handleCommitting) case api.Proving: // TODO: track sector health / expiration log.Infof("Proving sector %d", update.id) - case api.SectorNoUpdate: // noop + case api.UndefinedSectorState: + log.Error("sector update with undefined state!") + case api.FailedUnrecoverable: + log.Errorf("sector %d failed unrecoverably", update.id) default: log.Errorf("unexpected sector update state: %d", update.newState) } } -func (m *Miner) failSector(id uint64, err error) { - log.Errorf("sector %d error: %+v", id, err) -} - func (m *Miner) AllocatePiece(size uint64) (sectorID uint64, offset uint64, err error) { if padreader.PaddedSize(size) != size { return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")