split FPoStScheduler from Miner
Rationale: Nodes want to handle scheduling of PoSts. Splitting the FPoStScheduler from the Miner allows nodes to reuse the Miner w/out having to reuse the PoSt scheduler.
This commit is contained in:
parent
c9bd682c33
commit
ea2f53745e
@ -104,15 +104,21 @@ func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
sm, err := storage.NewMiner(api, maddr, h, ds, sb, tktFn)
|
worker, err := api.StateMinerWorker(helpers.LifecycleCtx(mctx, lc), maddr, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
fps := storage.NewFPoStScheduler(api, sb, maddr, worker)
|
||||||
|
|
||||||
|
sm, err := storage.NewMiner(api, maddr, worker, h, ds, sb, tktFn)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
lc.Append(fx.Hook{
|
lc.Append(fx.Hook{
|
||||||
OnStart: func(context.Context) error {
|
OnStart: func(ctx context.Context) error {
|
||||||
|
go fps.Run(ctx)
|
||||||
return sm.Run(ctx)
|
return sm.Run(ctx)
|
||||||
},
|
},
|
||||||
OnStop: sm.Stop,
|
OnStop: sm.Stop,
|
||||||
|
@ -14,7 +14,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *fpostScheduler) failPost(eps uint64) {
|
func (s *FPoStScheduler) failPost(eps uint64) {
|
||||||
s.failLk.Lock()
|
s.failLk.Lock()
|
||||||
if eps > s.failed {
|
if eps > s.failed {
|
||||||
s.failed = eps
|
s.failed = eps
|
||||||
@ -22,7 +22,7 @@ func (s *fpostScheduler) failPost(eps uint64) {
|
|||||||
s.failLk.Unlock()
|
s.failLk.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *fpostScheduler) doPost(ctx context.Context, eps uint64, ts *types.TipSet) {
|
func (s *FPoStScheduler) doPost(ctx context.Context, eps uint64, ts *types.TipSet) {
|
||||||
ctx, abort := context.WithCancel(ctx)
|
ctx, abort := context.WithCancel(ctx)
|
||||||
|
|
||||||
s.abort = abort
|
s.abort = abort
|
||||||
@ -31,7 +31,7 @@ func (s *fpostScheduler) doPost(ctx context.Context, eps uint64, ts *types.TipSe
|
|||||||
go func() {
|
go func() {
|
||||||
defer abort()
|
defer abort()
|
||||||
|
|
||||||
ctx, span := trace.StartSpan(ctx, "fpostScheduler.doPost")
|
ctx, span := trace.StartSpan(ctx, "FPoStScheduler.doPost")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
proof, err := s.runPost(ctx, eps, ts)
|
proof, err := s.runPost(ctx, eps, ts)
|
||||||
@ -50,7 +50,7 @@ func (s *fpostScheduler) doPost(ctx context.Context, eps uint64, ts *types.TipSe
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *fpostScheduler) checkFaults(ctx context.Context, ssi sectorbuilder.SortedPublicSectorInfo) ([]uint64, error) {
|
func (s *FPoStScheduler) checkFaults(ctx context.Context, ssi sectorbuilder.SortedPublicSectorInfo) ([]uint64, error) {
|
||||||
faults := s.sb.Scrub(ssi)
|
faults := s.sb.Scrub(ssi)
|
||||||
var faultIDs []uint64
|
var faultIDs []uint64
|
||||||
|
|
||||||
@ -101,7 +101,7 @@ func (s *fpostScheduler) checkFaults(ctx context.Context, ssi sectorbuilder.Sort
|
|||||||
return faultIDs, nil
|
return faultIDs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *fpostScheduler) runPost(ctx context.Context, eps uint64, ts *types.TipSet) (*actors.SubmitFallbackPoStParams, error) {
|
func (s *FPoStScheduler) runPost(ctx context.Context, eps uint64, ts *types.TipSet) (*actors.SubmitFallbackPoStParams, error) {
|
||||||
ctx, span := trace.StartSpan(ctx, "storage.runPost")
|
ctx, span := trace.StartSpan(ctx, "storage.runPost")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
@ -161,7 +161,7 @@ func (s *fpostScheduler) runPost(ctx context.Context, eps uint64, ts *types.TipS
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *fpostScheduler) sortedSectorInfo(ctx context.Context, ts *types.TipSet) (sectorbuilder.SortedPublicSectorInfo, error) {
|
func (s *FPoStScheduler) sortedSectorInfo(ctx context.Context, ts *types.TipSet) (sectorbuilder.SortedPublicSectorInfo, error) {
|
||||||
sset, err := s.api.StateMinerProvingSet(ctx, s.actor, ts)
|
sset, err := s.api.StateMinerProvingSet(ctx, s.actor, ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return sectorbuilder.SortedPublicSectorInfo{}, xerrors.Errorf("failed to get proving set for miner (tsH: %d): %w", ts.Height(), err)
|
return sectorbuilder.SortedPublicSectorInfo{}, xerrors.Errorf("failed to get proving set for miner (tsH: %d): %w", ts.Height(), err)
|
||||||
@ -184,7 +184,7 @@ func (s *fpostScheduler) sortedSectorInfo(ctx context.Context, ts *types.TipSet)
|
|||||||
return sectorbuilder.NewSortedPublicSectorInfo(sbsi), nil
|
return sectorbuilder.NewSortedPublicSectorInfo(sbsi), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *fpostScheduler) submitPost(ctx context.Context, proof *actors.SubmitFallbackPoStParams) error {
|
func (s *FPoStScheduler) submitPost(ctx context.Context, proof *actors.SubmitFallbackPoStParams) error {
|
||||||
ctx, span := trace.StartSpan(ctx, "storage.commitPost")
|
ctx, span := trace.StartSpan(ctx, "storage.commitPost")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
@ -19,7 +19,7 @@ const Inactive = 0
|
|||||||
|
|
||||||
const StartConfidence = 4 // TODO: config
|
const StartConfidence = 4 // TODO: config
|
||||||
|
|
||||||
type fpostScheduler struct {
|
type FPoStScheduler struct {
|
||||||
api storageMinerApi
|
api storageMinerApi
|
||||||
sb sectorbuilder.Interface
|
sb sectorbuilder.Interface
|
||||||
|
|
||||||
@ -36,7 +36,11 @@ type fpostScheduler struct {
|
|||||||
failLk sync.Mutex
|
failLk sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *fpostScheduler) run(ctx context.Context) {
|
func NewFPoStScheduler(api storageMinerApi, sb sectorbuilder.Interface, actor address.Address, worker address.Address) *FPoStScheduler {
|
||||||
|
return &FPoStScheduler{api: api, sb: sb, actor: actor, worker: worker}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *FPoStScheduler) Run(ctx context.Context) {
|
||||||
notifs, err := s.api.ChainNotify(ctx)
|
notifs, err := s.api.ChainNotify(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
@ -61,11 +65,11 @@ func (s *fpostScheduler) run(ctx context.Context) {
|
|||||||
select {
|
select {
|
||||||
case changes, ok := <-notifs:
|
case changes, ok := <-notifs:
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Warn("fpostScheduler notifs channel closed")
|
log.Warn("FPoStScheduler notifs channel closed")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, span := trace.StartSpan(ctx, "fpostScheduler.headChange")
|
ctx, span := trace.StartSpan(ctx, "FPoStScheduler.headChange")
|
||||||
|
|
||||||
var lowest, highest *types.TipSet = s.cur, nil
|
var lowest, highest *types.TipSet = s.cur, nil
|
||||||
|
|
||||||
@ -95,7 +99,7 @@ func (s *fpostScheduler) run(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *fpostScheduler) revert(ctx context.Context, newLowest *types.TipSet) error {
|
func (s *FPoStScheduler) revert(ctx context.Context, newLowest *types.TipSet) error {
|
||||||
if s.cur == newLowest {
|
if s.cur == newLowest {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -113,9 +117,9 @@ func (s *fpostScheduler) revert(ctx context.Context, newLowest *types.TipSet) er
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *fpostScheduler) update(ctx context.Context, new *types.TipSet) error {
|
func (s *FPoStScheduler) update(ctx context.Context, new *types.TipSet) error {
|
||||||
if new == nil {
|
if new == nil {
|
||||||
return xerrors.Errorf("no new tipset in fpostScheduler.update")
|
return xerrors.Errorf("no new tipset in FPoStScheduler.update")
|
||||||
}
|
}
|
||||||
newEPS, start, err := s.shouldFallbackPost(ctx, new)
|
newEPS, start, err := s.shouldFallbackPost(ctx, new)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -142,7 +146,7 @@ func (s *fpostScheduler) update(ctx context.Context, new *types.TipSet) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *fpostScheduler) abortActivePoSt() {
|
func (s *FPoStScheduler) abortActivePoSt() {
|
||||||
if s.activeEPS == Inactive {
|
if s.activeEPS == Inactive {
|
||||||
return // noop
|
return // noop
|
||||||
}
|
}
|
||||||
@ -157,7 +161,7 @@ func (s *fpostScheduler) abortActivePoSt() {
|
|||||||
s.abort = nil
|
s.abort = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *fpostScheduler) shouldFallbackPost(ctx context.Context, ts *types.TipSet) (uint64, bool, error) {
|
func (s *FPoStScheduler) shouldFallbackPost(ctx context.Context, ts *types.TipSet) (uint64, bool, error) {
|
||||||
eps, err := s.api.StateMinerElectionPeriodStart(ctx, s.actor, ts)
|
eps, err := s.api.StateMinerElectionPeriodStart(ctx, s.actor, ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, false, xerrors.Errorf("getting ElectionPeriodStart: %w", err)
|
return 0, false, xerrors.Errorf("getting ElectionPeriodStart: %w", err)
|
||||||
|
@ -65,7 +65,7 @@ type storageMinerApi interface {
|
|||||||
WalletHas(context.Context, address.Address) (bool, error)
|
WalletHas(context.Context, address.Address) (bool, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, sb sectorbuilder.Interface, tktFn sealing.TicketFn) (*Miner, error) {
|
func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sb sectorbuilder.Interface, tktFn sealing.TicketFn) (*Miner, error) {
|
||||||
m := &Miner{
|
m := &Miner{
|
||||||
api: api,
|
api: api,
|
||||||
h: h,
|
h: h,
|
||||||
@ -73,7 +73,8 @@ func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datasto
|
|||||||
ds: ds,
|
ds: ds,
|
||||||
tktFn: tktFn,
|
tktFn: tktFn,
|
||||||
|
|
||||||
maddr: addr,
|
maddr: maddr,
|
||||||
|
worker: worker,
|
||||||
}
|
}
|
||||||
|
|
||||||
return m, nil
|
return m, nil
|
||||||
@ -84,16 +85,6 @@ func (m *Miner) Run(ctx context.Context) error {
|
|||||||
return xerrors.Errorf("miner preflight checks failed: %w", err)
|
return xerrors.Errorf("miner preflight checks failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
fps := &fpostScheduler{
|
|
||||||
api: m.api,
|
|
||||||
sb: m.sb,
|
|
||||||
|
|
||||||
actor: m.maddr,
|
|
||||||
worker: m.worker,
|
|
||||||
}
|
|
||||||
|
|
||||||
go fps.run(ctx)
|
|
||||||
|
|
||||||
evts := events.NewEvents(ctx, m.api)
|
evts := events.NewEvents(ctx, m.api)
|
||||||
m.sealing = sealing.New(m.api, evts, m.maddr, m.worker, m.ds, m.sb, m.tktFn)
|
m.sealing = sealing.New(m.api, evts, m.maddr, m.worker, m.ds, m.sb, m.tktFn)
|
||||||
|
|
||||||
@ -108,14 +99,7 @@ func (m *Miner) Stop(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) runPreflightChecks(ctx context.Context) error {
|
func (m *Miner) runPreflightChecks(ctx context.Context) error {
|
||||||
worker, err := m.api.StateMinerWorker(ctx, m.maddr, nil)
|
has, err := m.api.WalletHas(ctx, m.worker)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
m.worker = worker
|
|
||||||
|
|
||||||
has, err := m.api.WalletHas(ctx, worker)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("failed to check wallet for worker key: %w", err)
|
return xerrors.Errorf("failed to check wallet for worker key: %w", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user