diff --git a/lib/evtsm/evtsm_test.go b/lib/evtsm/evtsm_test.go index bcf4aab69..906b34c72 100644 --- a/lib/evtsm/evtsm_test.go +++ b/lib/evtsm/evtsm_test.go @@ -24,7 +24,7 @@ func (t *testHandler) Plan(events []Event, state interface{}) (interface{}, erro return t.plan(events, state.(*TestState)) } -func (t *testHandler) plan(events []Event, state *TestState) (interface{}, error) { +func (t *testHandler) plan(events []Event, state *TestState) (func(Context, TestState) error, error) { for _, event := range events { e := event.User.(*TestEvent) switch e.A { diff --git a/lib/evtsm/sched.go b/lib/evtsm/sched.go index 39dd3027e..cdf375ced 100644 --- a/lib/evtsm/sched.go +++ b/lib/evtsm/sched.go @@ -97,3 +97,11 @@ func (s *Sched) Stop(ctx context.Context) error { return nil } + +func (s *Sched) List(out interface{}) error { + return s.sts.List(out) +} + +func (s *Sched) Get(i interface{}) *statestore.StoredState { + return s.sts.Get(i) +} diff --git a/node/impl/storminer.go b/node/impl/storminer.go index dd6e6825e..ea6caf023 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -205,7 +205,7 @@ func (sm *StorageMinerAPI) SectorsRefs(context.Context) (map[string][]api.Sealed } func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id uint64, state api.SectorState) error { - return sm.Miner.UpdateSectorState(ctx, id, storage.NonceIncrement, state) + return sm.Miner.ForceSectorState(ctx, id, state) } func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) { diff --git a/storage/miner.go b/storage/miner.go index 2a0b267f5..18d011534 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -3,11 +3,13 @@ package storage import ( "context" "errors" + "github.com/filecoin-project/lotus/lib/evtsm" + "github.com/ipfs/go-datastore/namespace" + "reflect" "time" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" "golang.org/x/xerrors" @@ -20,7 +22,6 @@ import ( "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/sectorbuilder" - "github.com/filecoin-project/lotus/lib/statestore" ) var log = logging.Logger("storageminer") @@ -37,11 +38,10 @@ type Miner struct { // Sealing sb *sectorbuilder.SectorBuilder - sectors *statestore.StateStore + sectors *evtsm.Sched tktFn TicketFn sectorIncoming chan *SectorInfo - sectorUpdated chan sectorUpdate stop chan struct{} stopped chan struct{} } @@ -72,7 +72,7 @@ type storageMinerApi interface { } func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, sb *sectorbuilder.SectorBuilder, tktFn TicketFn) (*Miner, error) { - return &Miner{ + m := &Miner{ api: api, maddr: addr, @@ -80,13 +80,15 @@ func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datasto sb: sb, tktFn: tktFn, - sectors: statestore.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix))), - sectorIncoming: make(chan *SectorInfo), - sectorUpdated: make(chan sectorUpdate), stop: make(chan struct{}), stopped: make(chan struct{}), - }, nil + } + + // TODO: separate sector stuff from miner struct + m.sectors = evtsm.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), m, reflect.TypeOf(SectorInfo{})) + + return m, nil } func (m *Miner) Run(ctx context.Context) error { @@ -104,15 +106,17 @@ func (m *Miner) Run(ctx context.Context) error { } go fps.run(ctx) - if err := m.sectorStateLoop(ctx); err != nil { + if err := m.restartSectors(ctx); err != nil { log.Errorf("%+v", err) - return xerrors.Errorf("failed to startup sector state loop: %w", err) + return xerrors.Errorf("failed load sector states: %w", err) } return nil } func (m *Miner) Stop(ctx context.Context) error { + defer m.sectors.Stop(ctx) + close(m.stop) select { case <-m.stopped: diff --git a/storage/sector_2.go b/storage/sector_2.go new file mode 100644 index 000000000..25f0fad9c --- /dev/null +++ b/storage/sector_2.go @@ -0,0 +1,263 @@ +package storage + +import ( + "context" + "fmt" + + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/lib/evtsm" +) + +type SectorStart struct { + id uint64 + pieces []Piece +} +type SectorRestart struct{} + +type SectorFatalError struct{ error } + +type SectorPacked struct{ pieces []Piece } + +type SectorSealed struct { + commR []byte + commD []byte + ticket SealTicket +} +type SectorSealFailed struct{ error } + +type SectorPreCommitFailed struct{ error } +type SectorPreCommitted struct { + message cid.Cid +} + +type SectorSeedReady struct { + seed SealSeed +} + +type SectorSealCommitFailed struct{ error } +type SectorCommitFailed struct{ error } +type SectorCommitted struct { + message cid.Cid + proof []byte +} + +type SectorProving struct{} + +type SectorFaultReported struct{ reportMsg cid.Cid } +type SectorFaultedFinal struct{} + +type SectorForceState struct { + state api.SectorState +} + +func (m *Miner) Plan(events []evtsm.Event, user interface{}) (interface{}, error) { + next, err := m.plan(events, user.(*SectorInfo)) + if err != nil || next == nil { + return nil, err + } + + return func(ctx evtsm.Context, si SectorInfo) error { + err := next(ctx, si) + if err != nil { + if err := ctx.Send(SectorFatalError{error: err}); err != nil { + return xerrors.Errorf("error while sending error: reporting %+v: %w", err, err) + } + } + + return nil + }, nil +} + +func (m *Miner) plan(events []evtsm.Event, state *SectorInfo) (func(evtsm.Context, SectorInfo) error, error) { + ///// + // First process all events + + for _, event := range events { + if err, ok := event.User.(error); ok { + state.LastErr = fmt.Sprintf("%+v", err) + } + + switch event := event.User.(type) { + case SectorStart: + // TODO: check if state is clean + state.SectorID = event.id + state.Pieces = event.pieces + state.State = api.Packing + + case SectorRestart: + // noop + case SectorFatalError: + log.Errorf("Fatal error on sector %d: %+v", state.SectorID, event.error) + // TODO: Do we want to mark the state as unrecoverable? + // I feel like this should be a softer error, where the user would + // be able to send a retry event of some kind + return nil, nil + + // // TODO: Incoming + // TODO: for those - look at dealIDs matching chain + + // // + // Packing + + case SectorPacked: + // TODO: assert state + state.Pieces = append(state.Pieces, event.pieces...) + state.State = api.Unsealed + + // // Unsealed + + case SectorSealFailed: + // TODO: try to find out the reason, maybe retry + state.State = api.SealFailed + + case SectorSealed: + state.CommD = event.commD + state.CommR = event.commR + state.Ticket = event.ticket + state.State = api.PreCommitting + + // // PreCommit + + case SectorPreCommitFailed: + // TODO: try to find out the reason, maybe retry + state.State = api.PreCommitFailed + case SectorPreCommitted: + state.PreCommitMessage = &event.message + state.State = api.PreCommitted + + case SectorSeedReady: + state.Seed = event.seed + state.State = api.Committing + + // // Commit + + case SectorSealCommitFailed: + // TODO: try to find out the reason, maybe retry + state.State = api.SealCommitFailed + case SectorCommitFailed: + // TODO: try to find out the reason, maybe retry + state.State = api.SealFailed + case SectorCommitted: + state.Proof = event.proof + state.State = api.CommitWait + case SectorProving: + state.State = api.Proving + + case SectorFaultReported: + state.FaultReportMsg = &event.reportMsg + state.State = api.FaultReported + case SectorFaultedFinal: + state.State = api.FaultedFinal + + // // Debug triggers + case SectorForceState: + state.State = event.state + } + } + + ///// + // Now decide what to do next + + /* + + * Empty + | | + | v + *<- Packing <- incoming + | | + | v + *<- Unsealed <--> SealFailed + | | + | v + * PreCommitting <--> PreCommitFailed + | | ^ + | v | + *<- PreCommitted ------/ + | ||| + | vvv v--> SealCommitFailed + *<- Committing + | | ^--> CommitFailed + | v ^ + *<- CommitWait ---/ + | | + | v + *<- Proving + | + v + FailedUnrecoverable + + UndefinedSectorState <- ¯\_(ツ)_/¯ + | ^ + *---------------------/ + + */ + + switch state.State { + // Happy path + case api.Packing: + return m.handlePacking, nil + case api.Unsealed: + return m.handleUnsealed, nil + case api.PreCommitting: + return m.handlePreCommitting, nil + case api.PreCommitted: + return m.handlePreCommitted, nil + case api.Committing: + return m.handleCommitting, nil + case api.CommitWait: + return m.handleCommitWait, nil + case api.Proving: + // TODO: track sector health / expiration + log.Infof("Proving sector %d", state.SectorID) + + // Handled failure modes + case api.SealFailed: + log.Warnf("sector %d entered unimplemented state 'SealFailed'", state.SectorID) + case api.PreCommitFailed: + log.Warnf("sector %d entered unimplemented state 'PreCommitFailed'", state.SectorID) + case api.SealCommitFailed: + log.Warnf("sector %d entered unimplemented state 'SealCommitFailed'", state.SectorID) + case api.CommitFailed: + log.Warnf("sector %d entered unimplemented state 'CommitFailed'", state.SectorID) + + // Faults + case api.Faulty: + return m.handleFaulty, nil + case api.FaultReported: + return m.handleFaultReported, nil + + // Fatal errors + case api.UndefinedSectorState: + log.Error("sector update with undefined state!") + case api.FailedUnrecoverable: + log.Errorf("sector %d failed unrecoverably", state.SectorID) + default: + log.Errorf("unexpected sector update state: %d", state.State) + } + + return nil, nil +} + +func (m *Miner) restartSectors(ctx context.Context) error { + trackedSectors, err := m.ListSectors() + if err != nil { + log.Errorf("loading sector list: %+v", err) + } + + for _, sector := range trackedSectors { + if err := m.sectors.Send(sector.SectorID, SectorRestart{}); err != nil { + log.Errorf("restarting sector %d: %+v", sector.SectorID, err) + } + } + + // TODO: Grab on-chain sector set and diff with trackedSectors + + return nil +} + +func (m *Miner) ForceSectorState(ctx context.Context, id uint64, state api.SectorState) error { + return m.sectors.Send(id, SectorForceState{state}) +} diff --git a/storage/sector_states.go b/storage/sector_states.go index ffad0119e..bf3dc231f 100644 --- a/storage/sector_states.go +++ b/storage/sector_states.go @@ -5,31 +5,14 @@ import ( "golang.org/x/xerrors" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/evtsm" "github.com/filecoin-project/lotus/lib/sectorbuilder" ) -type providerHandlerFunc func(ctx context.Context, deal SectorInfo) *sectorUpdate - -func (m *Miner) handleSectorUpdate(ctx context.Context, sector SectorInfo, cb providerHandlerFunc) { - go func() { - update := cb(ctx, sector) - - if update == nil { - return // async - } - - select { - case m.sectorUpdated <- *update: - case <-m.stop: - } - }() -} - -func (m *Miner) handlePacking(ctx context.Context, sector SectorInfo) *sectorUpdate { +func (m *Miner) handlePacking(ctx evtsm.Context, sector SectorInfo) error { log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID) var allocated uint64 @@ -40,51 +23,49 @@ func (m *Miner) handlePacking(ctx context.Context, sector SectorInfo) *sectorUpd ubytes := sectorbuilder.UserBytesForSectorSize(m.sb.SectorSize()) if allocated > ubytes { - return sector.upd().fatal(xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes)) + return xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes) } fillerSizes, err := fillersFromRem(ubytes - allocated) if err != nil { - return sector.upd().fatal(err) + return err } if len(fillerSizes) > 0 { log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorID) } - pieces, err := m.pledgeSector(ctx, sector.SectorID, sector.existingPieces(), fillerSizes...) + pieces, err := m.pledgeSector(ctx.Context(), sector.SectorID, sector.existingPieces(), fillerSizes...) if err != nil { - return sector.upd().fatal(xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)) + return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err) } - return sector.upd().to(api.Unsealed).state(func(info *SectorInfo) { - info.Pieces = append(info.Pieces, pieces...) - }) + return ctx.Send(SectorPacked{pieces: pieces}) } -func (m *Miner) handleUnsealed(ctx context.Context, sector SectorInfo) *sectorUpdate { +func (m *Miner) handleUnsealed(ctx evtsm.Context, sector SectorInfo) error { log.Infow("performing sector replication...", "sector", sector.SectorID) - ticket, err := m.tktFn(ctx) + ticket, err := m.tktFn(ctx.Context()) if err != nil { - return sector.upd().fatal(err) + return err } rspco, err := m.sb.SealPreCommit(sector.SectorID, *ticket, sector.pieceInfos()) if err != nil { - return sector.upd().to(api.SealFailed).error(xerrors.Errorf("seal pre commit failed: %w", err)) + return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)}) } - return sector.upd().to(api.PreCommitting).state(func(info *SectorInfo) { - info.CommD = rspco.CommD[:] - info.CommR = rspco.CommR[:] - info.Ticket = SealTicket{ + return ctx.Send(SectorSealed{ + commD: rspco.CommD[:], + commR: rspco.CommR[:], + ticket: SealTicket{ BlockHeight: ticket.BlockHeight, TicketBytes: ticket.TicketBytes[:], - } + }, }) } -func (m *Miner) handlePreCommitting(ctx context.Context, sector SectorInfo) *sectorUpdate { +func (m *Miner) handlePreCommitting(ctx evtsm.Context, sector SectorInfo) error { params := &actors.SectorPreCommitInfo{ SectorNumber: sector.SectorID, @@ -94,7 +75,7 @@ func (m *Miner) handlePreCommitting(ctx context.Context, sector SectorInfo) *sec } enc, aerr := actors.SerializeParams(params) if aerr != nil { - return sector.upd().to(api.PreCommitFailed).error(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)) + return ctx.Send(SectorPreCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)}) } msg := &types.Message{ @@ -108,54 +89,45 @@ func (m *Miner) handlePreCommitting(ctx context.Context, sector SectorInfo) *sec } log.Info("submitting precommit for sector: ", sector.SectorID) - smsg, err := m.api.MpoolPushMessage(ctx, msg) + smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg) if err != nil { - return sector.upd().to(api.PreCommitFailed).error(xerrors.Errorf("pushing message to mpool: %w", err)) + return ctx.Send(SectorPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)}) } - return sector.upd().to(api.PreCommitted).state(func(info *SectorInfo) { - mcid := smsg.Cid() - info.PreCommitMessage = &mcid - }) + return ctx.Send(SectorPreCommitted{message: smsg.Cid()}) } -func (m *Miner) handlePreCommitted(ctx context.Context, sector SectorInfo) *sectorUpdate { +func (m *Miner) handlePreCommitted(ctx evtsm.Context, sector SectorInfo) error { // would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts log.Info("Sector precommitted: ", sector.SectorID) - mw, err := m.api.StateWaitMsg(ctx, *sector.PreCommitMessage) + mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.PreCommitMessage) if err != nil { - return sector.upd().to(api.PreCommitFailed).error(err) + return ctx.Send(SectorPreCommitFailed{err}) } if mw.Receipt.ExitCode != 0 { log.Error("sector precommit failed: ", mw.Receipt.ExitCode) err := xerrors.Errorf("sector precommit failed: %d", mw.Receipt.ExitCode) - return sector.upd().to(api.PreCommitFailed).error(err) + return ctx.Send(SectorPreCommitFailed{err}) } log.Info("precommit message landed on chain: ", sector.SectorID) randHeight := mw.TipSet.Height() + build.InteractivePoRepDelay - 1 // -1 because of how the messages are applied log.Infof("precommit for sector %d made it on chain, will start proof computation at height %d", sector.SectorID, randHeight) - updateNonce := sector.Nonce - - err = m.events.ChainAt(func(ctx context.Context, ts *types.TipSet, curH uint64) error { - rand, err := m.api.ChainGetRandomness(ctx, ts.Key(), int64(randHeight)) + err = m.events.ChainAt(func(ectx context.Context, ts *types.TipSet, curH uint64) error { + rand, err := m.api.ChainGetRandomness(ectx, ts.Key(), int64(randHeight)) if err != nil { err = xerrors.Errorf("failed to get randomness for computing seal proof: %w", err) - m.sectorUpdated <- *sector.upd().fatal(err) + ctx.Send(SectorFatalError{error: err}) return err } - m.sectorUpdated <- *sector.upd().to(api.Committing).setNonce(updateNonce).state(func(info *SectorInfo) { - info.Seed = SealSeed{ - BlockHeight: randHeight, - TicketBytes: rand, - } - }) - - updateNonce++ + ctx.Send(SectorSeedReady{seed: SealSeed{ + BlockHeight: randHeight, + TicketBytes: rand, + }}) return nil }, func(ctx context.Context, ts *types.TipSet) error { @@ -170,12 +142,12 @@ func (m *Miner) handlePreCommitted(ctx context.Context, sector SectorInfo) *sect return nil } -func (m *Miner) handleCommitting(ctx context.Context, sector SectorInfo) *sectorUpdate { +func (m *Miner) handleCommitting(ctx evtsm.Context, sector SectorInfo) error { log.Info("scheduling seal proof computation...") proof, err := m.sb.SealCommit(sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco()) if err != nil { - return sector.upd().to(api.SealCommitFailed).error(xerrors.Errorf("computing seal proof failed: %w", err)) + return ctx.Send(SectorSealCommitFailed{xerrors.Errorf("computing seal proof failed: %w", err)}) } // TODO: Consider splitting states and persist proof for faster recovery @@ -188,7 +160,7 @@ func (m *Miner) handleCommitting(ctx context.Context, sector SectorInfo) *sector enc, aerr := actors.SerializeParams(params) if aerr != nil { - return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)) + return ctx.Send(SectorCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)}) } msg := &types.Message{ @@ -201,40 +173,37 @@ func (m *Miner) handleCommitting(ctx context.Context, sector SectorInfo) *sector GasPrice: types.NewInt(1), } - smsg, err := m.api.MpoolPushMessage(ctx, msg) + smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg) if err != nil { - return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("pushing message to mpool: %w", err)) + return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)}) } - // TODO: Separate state before this wait, so we persist message cid? - return sector.upd().to(api.CommitWait).state(func(info *SectorInfo) { - mcid := smsg.Cid() - info.CommitMessage = &mcid - info.Proof = proof + return ctx.Send(SectorCommitted{ + proof: proof, + message: smsg.Cid(), }) } -func (m *Miner) handleCommitWait(ctx context.Context, sector SectorInfo) *sectorUpdate { +func (m *Miner) handleCommitWait(ctx evtsm.Context, sector SectorInfo) error { if sector.CommitMessage == nil { log.Errorf("sector %d entered commit wait state without a message cid", sector.SectorID) - return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("entered commit wait with no commit cid")) + return ctx.Send(SectorCommitFailed{xerrors.Errorf("entered commit wait with no commit cid")}) } - mw, err := m.api.StateWaitMsg(ctx, *sector.CommitMessage) + mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.CommitMessage) if err != nil { - return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("failed to wait for porep inclusion: %w", err)) + return ctx.Send(SectorCommitFailed{xerrors.Errorf("failed to wait for porep inclusion: %w", err)}) } if mw.Receipt.ExitCode != 0 { log.Errorf("UNHANDLED: submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.Ticket.TicketBytes, sector.Seed.TicketBytes, sector.Seed.BlockHeight, sector.Proof) - return sector.upd().fatal(xerrors.Errorf("UNHANDLED: submitting sector proof failed (exit: %d)", mw.Receipt.ExitCode)) + return xerrors.Errorf("UNHANDLED: submitting sector proof failed (exit: %d)", mw.Receipt.ExitCode) } - return sector.upd().to(api.Proving).state(func(info *SectorInfo) { - }) + return ctx.Send(SectorProving{}) } -func (m *Miner) handleFaulty(ctx context.Context, sector SectorInfo) *sectorUpdate { +func (m *Miner) handleFaulty(ctx evtsm.Context, sector SectorInfo) error { // TODO: check if the fault has already been reported, and that this sector is even valid // TODO: coalesce faulty sector reporting @@ -245,7 +214,7 @@ func (m *Miner) handleFaulty(ctx context.Context, sector SectorInfo) *sectorUpda _ = fp enc, aerr := actors.SerializeParams(nil) if aerr != nil { - return sector.upd().fatal(xerrors.Errorf("failed to serialize declare fault params: %w", aerr)) + return xerrors.Errorf("failed to serialize declare fault params: %w", aerr) } msg := &types.Message{ @@ -258,32 +227,28 @@ func (m *Miner) handleFaulty(ctx context.Context, sector SectorInfo) *sectorUpda GasPrice: types.NewInt(1), } - smsg, err := m.api.MpoolPushMessage(ctx, msg) + smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg) if err != nil { - return sector.upd().to(api.FailedUnrecoverable).error(xerrors.Errorf("failed to push declare faults message to network: %w", err)) + return xerrors.Errorf("failed to push declare faults message to network: %w", err) } - return sector.upd().to(api.FaultReported).state(func(info *SectorInfo) { - c := smsg.Cid() - info.FaultReportMsg = &c - }) + return ctx.Send(SectorFaultReported{reportMsg: smsg.Cid()}) } -func (m *Miner) handleFaultReported(ctx context.Context, sector SectorInfo) *sectorUpdate { +func (m *Miner) handleFaultReported(ctx evtsm.Context, sector SectorInfo) error { if sector.FaultReportMsg == nil { - return sector.upd().to(api.FailedUnrecoverable).error(xerrors.Errorf("entered fault reported state without a FaultReportMsg cid")) + return xerrors.Errorf("entered fault reported state without a FaultReportMsg cid") } - mw, err := m.api.StateWaitMsg(ctx, *sector.FaultReportMsg) + mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.FaultReportMsg) if err != nil { - return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("failed to wait for fault declaration: %w", err)) + return xerrors.Errorf("failed to wait for fault declaration: %w", err) } if mw.Receipt.ExitCode != 0 { log.Errorf("UNHANDLED: declaring sector fault failed (exit=%d, msg=%s) (id: %d)", mw.Receipt.ExitCode, *sector.FaultReportMsg, sector.SectorID) - return sector.upd().fatal(xerrors.Errorf("UNHANDLED: submitting fault declaration failed (exit %d)", mw.Receipt.ExitCode)) + return xerrors.Errorf("UNHANDLED: submitting fault declaration failed (exit %d)", mw.Receipt.ExitCode) } - return sector.upd().to(api.FaultedFinal).state(func(info *SectorInfo) {}) - + return ctx.Send(SectorFaultedFinal{}) } diff --git a/storage/sector_types.go b/storage/sector_types.go index 3c328b9c0..ed2cbe6d5 100644 --- a/storage/sector_types.go +++ b/storage/sector_types.go @@ -76,10 +76,6 @@ type SectorInfo struct { LastErr string } -func (t *SectorInfo) upd() *sectorUpdate { - return §orUpdate{id: t.SectorID, nonce: t.Nonce} -} - func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo { out := make([]sectorbuilder.PublicPieceInfo, len(t.Pieces)) for i, piece := range t.Pieces { diff --git a/storage/sectors.go b/storage/sectors.go index 0a322164d..24e3dfac8 100644 --- a/storage/sectors.go +++ b/storage/sectors.go @@ -2,275 +2,13 @@ package storage import ( "context" - "fmt" - "io" - "math" - xerrors "golang.org/x/xerrors" + "io" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/lib/padreader" "github.com/filecoin-project/lotus/lib/sectorbuilder" ) -const NonceIncrement = math.MaxUint64 - -type sectorUpdate struct { - newState api.SectorState - id uint64 - err error - nonce uint64 - mut func(*SectorInfo) -} - -func (u *sectorUpdate) fatal(err error) *sectorUpdate { - u.newState = api.FailedUnrecoverable - u.err = err - return u -} - -func (u *sectorUpdate) error(err error) *sectorUpdate { - u.err = err - return u -} - -func (u *sectorUpdate) state(m func(*SectorInfo)) *sectorUpdate { - u.mut = m - return u -} - -func (u *sectorUpdate) to(newState api.SectorState) *sectorUpdate { - u.newState = newState - return u -} - -func (u *sectorUpdate) setNonce(nc uint64) *sectorUpdate { - u.nonce = nc - return u -} - -func (m *Miner) UpdateSectorState(ctx context.Context, sector uint64, snonce uint64, state api.SectorState) error { - select { - case m.sectorUpdated <- sectorUpdate{ - newState: state, - nonce: snonce, - id: sector, - }: - return nil - case <-ctx.Done(): - return ctx.Err() - } -} - -func (m *Miner) sectorStateLoop(ctx context.Context) error { - trackedSectors, err := m.ListSectors() - if err != nil { - log.Errorf("loading sector list: %+v", err) - } - - go func() { - for _, si := range trackedSectors { - select { - case m.sectorUpdated <- sectorUpdate{ - newState: si.State, - nonce: si.Nonce, - id: si.SectorID, - err: nil, - mut: nil, - }: - case <-ctx.Done(): - log.Warn("didn't restart processing for all sectors: ", ctx.Err()) - return - } - } - }() - - { - // verify on-chain state - trackedByID := map[uint64]*SectorInfo{} - for _, si := range trackedSectors { - i := si - trackedByID[si.SectorID] = &i - } - - curTs, err := m.api.ChainHead(ctx) - if err != nil { - return xerrors.Errorf("getting chain head: %w", err) - } - - ps, err := m.api.StateMinerProvingSet(ctx, m.maddr, curTs) - if err != nil { - return xerrors.Errorf("getting miner proving set: %w", err) - } - for _, ocs := range ps { - if _, ok := trackedByID[ocs.SectorID]; ok { - continue // TODO: check state - } - - // TODO: attempt recovery - log.Warnf("untracked sector %d found on chain", ocs.SectorID) - } - } - - go func() { - defer log.Warn("quitting deal provider loop") - defer close(m.stopped) - - for { - select { - case sector := <-m.sectorIncoming: - m.onSectorIncoming(sector) - case update := <-m.sectorUpdated: - m.onSectorUpdated(ctx, update) - case <-m.stop: - return - } - } - }() - - return nil -} - -func (m *Miner) onSectorIncoming(sector *SectorInfo) { - has, err := m.sectors.Has(sector.SectorID) - if err != nil { - return - } - if has { - log.Warnf("SealPiece called more than once for sector %d", sector.SectorID) - return - } - - if err := m.sectors.Begin(sector.SectorID, sector); err != nil { - log.Errorf("sector tracking failed: %s", err) - return - } - - go func() { - select { - case m.sectorUpdated <- sectorUpdate{ - newState: api.Packing, - id: sector.SectorID, - }: - case <-m.stop: - log.Warn("failed to send incoming sector update, miner shutting down") - } - }() -} - -func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) { - log.Infof("Sector %d updated state to %s", update.id, api.SectorStates[update.newState]) - var sector SectorInfo - err := m.sectors.Get(update.id).Mutate(func(s *SectorInfo) error { - if update.nonce < s.Nonce { - return xerrors.Errorf("update nonce too low, ignoring (%d < %d)", update.nonce, s.Nonce) - } - - if update.nonce != NonceIncrement { - s.Nonce = update.nonce - } else { - s.Nonce++ // forced update - } - s.State = update.newState - if update.err != nil { - if s.LastErr != "" { - s.LastErr += "---------\n\n" - } - s.LastErr += fmt.Sprintf("entering state %s: %+v", api.SectorStates[update.newState], update.err) - } - - if update.mut != nil { - update.mut(s) - } - sector = *s - return nil - }) - if update.err != nil { - log.Errorf("sector %d failed: %+v", update.id, update.err) - } - if err != nil { - log.Errorf("sector %d update error: %+v", update.id, err) - return - } - - /* - - * Empty - | | - | v - *<- Packing <- incoming - | | - | v - *<- Unsealed <--> SealFailed - | | - | v - * PreCommitting <--> PreCommitFailed - | | ^ - | v | - *<- PreCommitted ------/ - | ||| - | vvv v--> SealCommitFailed - *<- Committing - | | ^--> CommitFailed - | v ^ - *<- CommitWait ---/ - | | - | v - *<- Proving - | - v - FailedUnrecoverable - - UndefinedSectorState <- ¯\_(ツ)_/¯ - | ^ - *---------------------/ - - */ - - switch update.newState { - // Happy path - case api.Packing: - m.handleSectorUpdate(ctx, sector, m.handlePacking) - case api.Unsealed: - m.handleSectorUpdate(ctx, sector, m.handleUnsealed) - case api.PreCommitting: - m.handleSectorUpdate(ctx, sector, m.handlePreCommitting) - case api.PreCommitted: - m.handleSectorUpdate(ctx, sector, m.handlePreCommitted) - case api.Committing: - m.handleSectorUpdate(ctx, sector, m.handleCommitting) - case api.CommitWait: - m.handleSectorUpdate(ctx, sector, m.handleCommitWait) - case api.Proving: - // TODO: track sector health / expiration - log.Infof("Proving sector %d", update.id) - - // Handled failure modes - case api.SealFailed: - log.Warnf("sector %d entered unimplemented state 'SealFailed'", update.id) - case api.PreCommitFailed: - log.Warnf("sector %d entered unimplemented state 'PreCommitFailed'", update.id) - case api.SealCommitFailed: - log.Warnf("sector %d entered unimplemented state 'SealCommitFailed'", update.id) - case api.CommitFailed: - log.Warnf("sector %d entered unimplemented state 'CommitFailed'", update.id) - - // Faults - case api.Faulty: - m.handleSectorUpdate(ctx, sector, m.handleFaulty) - case api.FaultReported: - m.handleSectorUpdate(ctx, sector, m.handleFaultReported) - - // Fatal errors - case api.UndefinedSectorState: - log.Error("sector update with undefined state!") - case api.FailedUnrecoverable: - log.Errorf("sector %d failed unrecoverably", update.id) - default: - log.Errorf("unexpected sector update state: %d", update.newState) - } -} - func (m *Miner) AllocatePiece(size uint64) (sectorID uint64, offset uint64, err error) { if padreader.PaddedSize(size) != size { return 0, 0, xerrors.Errorf("cannot allocate unpadded piece") @@ -297,10 +35,9 @@ func (m *Miner) SealPiece(ctx context.Context, size uint64, r io.Reader, sectorI } func (m *Miner) newSector(ctx context.Context, sid uint64, dealID uint64, ppi sectorbuilder.PublicPieceInfo) error { - si := &SectorInfo{ - SectorID: sid, - - Pieces: []Piece{ + return m.sectors.Send(sid, SectorStart{ + id: sid, + pieces: []Piece{ { DealID: dealID, @@ -308,11 +45,5 @@ func (m *Miner) newSector(ctx context.Context, sid uint64, dealID uint64, ppi se CommP: ppi.CommP[:], }, }, - } - select { - case m.sectorIncoming <- si: - return nil - case <-ctx.Done(): - return xerrors.Errorf("failed to submit sector for sealing, queue full: %w", ctx.Err()) - } + }) }