diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index a34841c71..dfeec2398 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -104,15 +104,23 @@ func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h return nil, err } - sm, err := storage.NewMiner(api, maddr, h, ds, sb, tktFn) + ctx := helpers.LifecycleCtx(mctx, lc) + + worker, err := api.StateMinerWorker(ctx, maddr, nil) if err != nil { return nil, err } - ctx := helpers.LifecycleCtx(mctx, lc) + fps := storage.NewFPoStScheduler(api, sb, maddr, worker) + + sm, err := storage.NewMiner(api, maddr, worker, h, ds, sb, tktFn) + if err != nil { + return nil, err + } lc.Append(fx.Hook{ OnStart: func(context.Context) error { + go fps.Run(ctx) return sm.Run(ctx) }, OnStop: sm.Stop, diff --git a/storage/fpost_run.go b/storage/fpost_run.go index 1e23bb175..ca5321513 100644 --- a/storage/fpost_run.go +++ b/storage/fpost_run.go @@ -14,7 +14,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) -func (s *fpostScheduler) failPost(eps uint64) { +func (s *FPoStScheduler) failPost(eps uint64) { s.failLk.Lock() if eps > s.failed { s.failed = eps @@ -22,7 +22,7 @@ func (s *fpostScheduler) failPost(eps uint64) { s.failLk.Unlock() } -func (s *fpostScheduler) doPost(ctx context.Context, eps uint64, ts *types.TipSet) { +func (s *FPoStScheduler) doPost(ctx context.Context, eps uint64, ts *types.TipSet) { ctx, abort := context.WithCancel(ctx) s.abort = abort @@ -31,7 +31,7 @@ func (s *fpostScheduler) doPost(ctx context.Context, eps uint64, ts *types.TipSe go func() { defer abort() - ctx, span := trace.StartSpan(ctx, "fpostScheduler.doPost") + ctx, span := trace.StartSpan(ctx, "FPoStScheduler.doPost") defer span.End() proof, err := s.runPost(ctx, eps, ts) @@ -50,7 +50,7 @@ func (s *fpostScheduler) doPost(ctx context.Context, eps uint64, ts *types.TipSe }() } -func (s *fpostScheduler) checkFaults(ctx context.Context, ssi sectorbuilder.SortedPublicSectorInfo) ([]uint64, error) { +func (s *FPoStScheduler) checkFaults(ctx context.Context, ssi sectorbuilder.SortedPublicSectorInfo) ([]uint64, error) { faults := s.sb.Scrub(ssi) var faultIDs []uint64 @@ -101,7 +101,7 @@ func (s *fpostScheduler) checkFaults(ctx context.Context, ssi sectorbuilder.Sort return faultIDs, nil } -func (s *fpostScheduler) runPost(ctx context.Context, eps uint64, ts *types.TipSet) (*actors.SubmitFallbackPoStParams, error) { +func (s *FPoStScheduler) runPost(ctx context.Context, eps uint64, ts *types.TipSet) (*actors.SubmitFallbackPoStParams, error) { ctx, span := trace.StartSpan(ctx, "storage.runPost") defer span.End() @@ -161,7 +161,7 @@ func (s *fpostScheduler) runPost(ctx context.Context, eps uint64, ts *types.TipS }, nil } -func (s *fpostScheduler) sortedSectorInfo(ctx context.Context, ts *types.TipSet) (sectorbuilder.SortedPublicSectorInfo, error) { +func (s *FPoStScheduler) sortedSectorInfo(ctx context.Context, ts *types.TipSet) (sectorbuilder.SortedPublicSectorInfo, error) { sset, err := s.api.StateMinerProvingSet(ctx, s.actor, ts) if err != nil { return sectorbuilder.SortedPublicSectorInfo{}, xerrors.Errorf("failed to get proving set for miner (tsH: %d): %w", ts.Height(), err) @@ -184,7 +184,7 @@ func (s *fpostScheduler) sortedSectorInfo(ctx context.Context, ts *types.TipSet) return sectorbuilder.NewSortedPublicSectorInfo(sbsi), nil } -func (s *fpostScheduler) submitPost(ctx context.Context, proof *actors.SubmitFallbackPoStParams) error { +func (s *FPoStScheduler) submitPost(ctx context.Context, proof *actors.SubmitFallbackPoStParams) error { ctx, span := trace.StartSpan(ctx, "storage.commitPost") defer span.End() diff --git a/storage/fpost_sched.go b/storage/fpost_sched.go index b6a59a40e..9fa427677 100644 --- a/storage/fpost_sched.go +++ b/storage/fpost_sched.go @@ -19,7 +19,7 @@ const Inactive = 0 const StartConfidence = 4 // TODO: config -type fpostScheduler struct { +type FPoStScheduler struct { api storageMinerApi sb sectorbuilder.Interface @@ -36,7 +36,11 @@ type fpostScheduler struct { failLk sync.Mutex } -func (s *fpostScheduler) run(ctx context.Context) { +func NewFPoStScheduler(api storageMinerApi, sb sectorbuilder.Interface, actor address.Address, worker address.Address) *FPoStScheduler { + return &FPoStScheduler{api: api, sb: sb, actor: actor, worker: worker} +} + +func (s *FPoStScheduler) Run(ctx context.Context) { notifs, err := s.api.ChainNotify(ctx) if err != nil { return @@ -61,11 +65,11 @@ func (s *fpostScheduler) run(ctx context.Context) { select { case changes, ok := <-notifs: if !ok { - log.Warn("fpostScheduler notifs channel closed") + log.Warn("FPoStScheduler notifs channel closed") return } - ctx, span := trace.StartSpan(ctx, "fpostScheduler.headChange") + ctx, span := trace.StartSpan(ctx, "FPoStScheduler.headChange") var lowest, highest *types.TipSet = s.cur, nil @@ -95,7 +99,7 @@ func (s *fpostScheduler) run(ctx context.Context) { } } -func (s *fpostScheduler) revert(ctx context.Context, newLowest *types.TipSet) error { +func (s *FPoStScheduler) revert(ctx context.Context, newLowest *types.TipSet) error { if s.cur == newLowest { return nil } @@ -113,9 +117,9 @@ func (s *fpostScheduler) revert(ctx context.Context, newLowest *types.TipSet) er return nil } -func (s *fpostScheduler) update(ctx context.Context, new *types.TipSet) error { +func (s *FPoStScheduler) update(ctx context.Context, new *types.TipSet) error { if new == nil { - return xerrors.Errorf("no new tipset in fpostScheduler.update") + return xerrors.Errorf("no new tipset in FPoStScheduler.update") } newEPS, start, err := s.shouldFallbackPost(ctx, new) if err != nil { @@ -142,7 +146,7 @@ func (s *fpostScheduler) update(ctx context.Context, new *types.TipSet) error { return nil } -func (s *fpostScheduler) abortActivePoSt() { +func (s *FPoStScheduler) abortActivePoSt() { if s.activeEPS == Inactive { return // noop } @@ -157,7 +161,7 @@ func (s *fpostScheduler) abortActivePoSt() { s.abort = nil } -func (s *fpostScheduler) shouldFallbackPost(ctx context.Context, ts *types.TipSet) (uint64, bool, error) { +func (s *FPoStScheduler) shouldFallbackPost(ctx context.Context, ts *types.TipSet) (uint64, bool, error) { eps, err := s.api.StateMinerElectionPeriodStart(ctx, s.actor, ts) if err != nil { return 0, false, xerrors.Errorf("getting ElectionPeriodStart: %w", err) diff --git a/storage/miner.go b/storage/miner.go index 06eb2c47c..9fc26d1b5 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -65,7 +65,7 @@ type storageMinerApi interface { WalletHas(context.Context, address.Address) (bool, error) } -func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, sb sectorbuilder.Interface, tktFn sealing.TicketFn) (*Miner, error) { +func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sb sectorbuilder.Interface, tktFn sealing.TicketFn) (*Miner, error) { m := &Miner{ api: api, h: h, @@ -73,7 +73,8 @@ func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datasto ds: ds, tktFn: tktFn, - maddr: addr, + maddr: maddr, + worker: worker, } return m, nil @@ -84,16 +85,6 @@ func (m *Miner) Run(ctx context.Context) error { return xerrors.Errorf("miner preflight checks failed: %w", err) } - fps := &fpostScheduler{ - api: m.api, - sb: m.sb, - - actor: m.maddr, - worker: m.worker, - } - - go fps.run(ctx) - evts := events.NewEvents(ctx, m.api) m.sealing = sealing.New(m.api, evts, m.maddr, m.worker, m.ds, m.sb, m.tktFn) @@ -108,14 +99,7 @@ func (m *Miner) Stop(ctx context.Context) error { } func (m *Miner) runPreflightChecks(ctx context.Context) error { - worker, err := m.api.StateMinerWorker(ctx, m.maddr, nil) - if err != nil { - return err - } - - m.worker = worker - - has, err := m.api.WalletHas(ctx, worker) + has, err := m.api.WalletHas(ctx, m.worker) if err != nil { return xerrors.Errorf("failed to check wallet for worker key: %w", err) }