package storage import ( "context" "sync" "go.opencensus.io/trace" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" ) const Inactive = 0 const StartConfidence = 4 // TODO: config type FPoStScheduler struct { api storageMinerApi sb storage.Prover actor address.Address worker address.Address cur *types.TipSet // if a post is in progress, this indicates for which ElectionPeriodStart activeEPS abi.ChainEpoch abort context.CancelFunc failed abi.ChainEpoch // eps failLk sync.Mutex } func NewFPoStScheduler(api storageMinerApi, sb storage.Prover, actor address.Address, worker address.Address) *FPoStScheduler { return &FPoStScheduler{api: api, sb: sb, actor: actor, worker: worker} } func (s *FPoStScheduler) Run(ctx context.Context) { notifs, err := s.api.ChainNotify(ctx) if err != nil { return } current := <-notifs if len(current) != 1 { panic("expected first notif to have len = 1") } if current[0].Type != store.HCCurrent { panic("expected first notif to tell current ts") } if err := s.update(ctx, current[0].Val); err != nil { panic(err) } defer s.abortActivePoSt() // not fine to panic after this point for { select { case changes, ok := <-notifs: if !ok { log.Warn("FPoStScheduler notifs channel closed") return } ctx, span := trace.StartSpan(ctx, "FPoStScheduler.headChange") var lowest, highest *types.TipSet = s.cur, nil for _, change := range changes { if change.Val == nil { log.Errorf("change.Val was nil") } switch change.Type { case store.HCRevert: lowest = change.Val case store.HCApply: highest = change.Val } } if err := s.revert(ctx, lowest); err != nil { log.Error("handling head reverts in fallbackPost sched: %+v", err) } if err := s.update(ctx, highest); err != nil { log.Error("handling head updates in fallbackPost sched: %+v", err) } span.End() case <-ctx.Done(): return } } } func (s *FPoStScheduler) revert(ctx context.Context, newLowest *types.TipSet) error { if s.cur == newLowest { return nil } s.cur = newLowest newEPS, _, err := s.shouldFallbackPost(ctx, newLowest) if err != nil { return err } if newEPS != s.activeEPS { s.abortActivePoSt() } return nil } func (s *FPoStScheduler) update(ctx context.Context, new *types.TipSet) error { if new == nil { return xerrors.Errorf("no new tipset in FPoStScheduler.update") } newEPS, start, err := s.shouldFallbackPost(ctx, new) if err != nil { return err } s.failLk.Lock() if s.failed > 0 { s.failed = 0 s.activeEPS = 0 } s.failLk.Unlock() if newEPS == s.activeEPS { return nil } s.abortActivePoSt() if newEPS != Inactive && start { s.doPost(ctx, newEPS, new) } return nil } func (s *FPoStScheduler) abortActivePoSt() { if s.activeEPS == Inactive { return // noop } if s.abort != nil { s.abort() } log.Warnf("Aborting Fallback PoSt (EPS: %d)", s.activeEPS) s.activeEPS = Inactive s.abort = nil } func (s *FPoStScheduler) shouldFallbackPost(ctx context.Context, ts *types.TipSet) (abi.ChainEpoch, bool, error) { ps, err := s.api.StateMinerPostState(ctx, s.actor, ts.Key()) if err != nil { return 0, false, xerrors.Errorf("getting ElectionPeriodStart: %w", err) } if ts.Height() >= ps.ProvingPeriodStart+build.FallbackPoStDelay { return ps.ProvingPeriodStart, ts.Height() >= ps.ProvingPeriodStart+build.FallbackPoStDelay+StartConfidence, nil } return 0, false, nil }