From 920fd3ba9bdbdc1ad8057ad138d8583c27cc5311 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 28 Nov 2019 18:44:49 +0100 Subject: [PATCH] storageminer: New fallback post scheduler --- api/api_full.go | 2 +- api/struct.go | 2 +- build/params_shared.go | 4 +- chain/actors/actor_miner.go | 6 +- chain/actors/cbor_gen.go | 11 +- chain/stmgr/utils.go | 5 +- cmd/lotus-storage-miner/info.go | 2 +- gen/main.go | 2 +- lib/sectorbuilder/sectorbuilder.go | 9 +- node/impl/full/state.go | 4 +- storage/fpost_run.go | 138 +++++++++++++ storage/fpost_sched.go | 141 ++++++++++++++ storage/miner.go | 11 +- storage/post.go | 298 ----------------------------- storage/sector_states.go | 2 - 15 files changed, 308 insertions(+), 329 deletions(-) create mode 100644 storage/fpost_run.go create mode 100644 storage/fpost_sched.go delete mode 100644 storage/post.go diff --git a/api/api_full.go b/api/api_full.go index fc23cf43b..2e0b81d1a 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -99,7 +99,7 @@ type FullNode interface { StateMinerPower(context.Context, address.Address, *types.TipSet) (MinerPower, error) StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error) StateMinerPeerID(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) - StateMinerProvingPeriodEnd(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) + StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) StateMinerSectorSize(context.Context, address.Address, *types.TipSet) (uint64, error) StatePledgeCollateral(context.Context, *types.TipSet) (types.BigInt, error) StateWaitMsg(context.Context, cid.Cid) (*MsgWait, error) diff --git a/api/struct.go b/api/struct.go index 1356bff2f..e8f27680f 100644 --- a/api/struct.go +++ b/api/struct.go @@ -362,7 +362,7 @@ func (c *FullNodeStruct) StateMinerPeerID(ctx context.Context, m address.Address return c.Internal.StateMinerPeerID(ctx, m, ts) } -func (c *FullNodeStruct) StateMinerProvingPeriodEnd(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) { +func (c *FullNodeStruct) StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) { return c.Internal.StateMinerProvingPeriodEnd(ctx, actor, ts) } diff --git a/build/params_shared.go b/build/params_shared.go index c101d8db9..e482ca9eb 100644 --- a/build/params_shared.go +++ b/build/params_shared.go @@ -75,11 +75,11 @@ const SectorChallengeRatioDiv = 25 const MaxFallbackPostChallengeCount = 10 -// FallbackPoStBegin is the number of epochs the miner needs to wait after +// FallbackPoStDelay is the number of epochs the miner needs to wait after // ElectionPeriodStart before starting fallback post computation // // Epochs -const FallbackPoStBegin = 1000 +const FallbackPoStDelay = 1000 // SlashablePowerDelay is the number of epochs // Epochs diff --git a/chain/actors/actor_miner.go b/chain/actors/actor_miner.go index b1fe46fb7..1de5ae2f1 100644 --- a/chain/actors/actor_miner.go +++ b/chain/actors/actor_miner.go @@ -389,12 +389,12 @@ func (sma StorageMinerActor) ProveCommitSector(act *types.Actor, vmctx types.VMC return nil, err } -type SubmitPoStParams struct { +type SubmitFallbackPoStParams struct { Proof []byte Candidates []types.EPostTicket } -func (sma StorageMinerActor) SubmitFallbackPoSt(act *types.Actor, vmctx types.VMContext, params *SubmitPoStParams) ([]byte, ActorError) { +func (sma StorageMinerActor) SubmitFallbackPoSt(act *types.Actor, vmctx types.VMContext, params *SubmitFallbackPoStParams) ([]byte, ActorError) { oldstate, self, err := loadState(vmctx) if err != nil { return nil, err @@ -427,7 +427,7 @@ func (sma StorageMinerActor) SubmitFallbackPoSt(act *types.Actor, vmctx types.VM var seed [sectorbuilder.CommLen]byte { - randHeight := self.ElectionPeriodStart + build.FallbackPoStBegin + randHeight := self.ElectionPeriodStart + build.FallbackPoStDelay if vmctx.BlockHeight() <= randHeight { // TODO: spec, retcode return nil, aerrors.Newf(1, "submit fallback PoSt called too early (%d < %d)", vmctx.BlockHeight(), randHeight) diff --git a/chain/actors/cbor_gen.go b/chain/actors/cbor_gen.go index 8a8688208..1666adeb2 100644 --- a/chain/actors/cbor_gen.go +++ b/chain/actors/cbor_gen.go @@ -813,7 +813,7 @@ func (t *MinerInfo) UnmarshalCBOR(r io.Reader) error { return nil } -func (t *SubmitPoStParams) MarshalCBOR(w io.Writer) error { +func (t *SubmitFallbackPoStParams) MarshalCBOR(w io.Writer) error { if t == nil { _, err := w.Write(cbg.CborNull) return err @@ -823,13 +823,10 @@ func (t *SubmitPoStParams) MarshalCBOR(w io.Writer) error { } // t.t.Proof (types.EPostProof) (struct) - if err := t.Proof.MarshalCBOR(w); err != nil { - return err - } return nil } -func (t *SubmitPoStParams) UnmarshalCBOR(r io.Reader) error { +func (t *SubmitFallbackPoStParams) UnmarshalCBOR(r io.Reader) error { br := cbg.GetPeeker(r) maj, extra, err := cbg.CborReadHeader(br) @@ -848,10 +845,6 @@ func (t *SubmitPoStParams) UnmarshalCBOR(r io.Reader) error { { - if err := t.Proof.UnmarshalCBOR(br); err != nil { - return err - } - } return nil } diff --git a/chain/stmgr/utils.go b/chain/stmgr/utils.go index 1a1a46ad0..a8c5378c1 100644 --- a/chain/stmgr/utils.go +++ b/chain/stmgr/utils.go @@ -149,15 +149,14 @@ func GetMinerWorker(ctx context.Context, sm *StateManager, ts *types.TipSet, mad return address.NewFromBytes(recp.Return) } -func GetMinerProvingPeriodEnd(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) (uint64, error) { +func GetMinerElectionPeriodStart(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) (uint64, error) { var mas actors.StorageMinerActorState _, err := sm.LoadActorState(ctx, maddr, &mas, ts) if err != nil { return 0, xerrors.Errorf("failed to load miner actor state: %w", err) } - panic("idk what to do") - //return mas.ProvingPeriodEnd, nil + return mas.ElectionPeriodStart, nil } func GetMinerProvingSet(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) ([]*api.ChainSectorInfo, error) { diff --git a/cmd/lotus-storage-miner/info.go b/cmd/lotus-storage-miner/info.go index b1df59cb3..900a7c611 100644 --- a/cmd/lotus-storage-miner/info.go +++ b/cmd/lotus-storage-miner/info.go @@ -60,7 +60,7 @@ var infoCmd = &cli.Command{ } fmt.Printf("Worker use: %d / %d (+%d)\n", wstat.Total-wstat.Reserved-wstat.Free, wstat.Total, wstat.Reserved) - ppe, err := api.StateMinerProvingPeriodEnd(ctx, maddr, nil) + ppe, err := api.StateMinerElectionPeriodStart(ctx, maddr, nil) if err != nil { return err } diff --git a/gen/main.go b/gen/main.go index 48668d6da..4aad7622c 100644 --- a/gen/main.go +++ b/gen/main.go @@ -93,7 +93,7 @@ func main() { actors.SectorPreCommitInfo{}, actors.PreCommittedSector{}, actors.MinerInfo{}, - actors.SubmitPoStParams{}, + actors.SubmitFallbackPoStParams{}, actors.PaymentVerifyParams{}, actors.UpdatePeerIDParams{}, actors.MultiSigActorState{}, diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 156bf4389..f953dea14 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -328,10 +328,10 @@ func (sb *SectorBuilder) pubSectorToPriv(sectorInfo SortedPublicSectorInfo) (Sor return NewSortedPrivateSectorInfo(out), nil } -func (sb *SectorBuilder) GenerateFallbackPoSt(sectorInfo SortedPublicSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]byte, error) { +func (sb *SectorBuilder) GenerateFallbackPoSt(sectorInfo SortedPublicSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]EPostCandidate, []byte, error) { privsectors, err := sb.pubSectorToPriv(sectorInfo) if err != nil { - return nil, err + return nil, nil, err } challengeCount := fallbackPostChallengeCount(uint64(len(sectorInfo.Values()))) @@ -339,10 +339,11 @@ func (sb *SectorBuilder) GenerateFallbackPoSt(sectorInfo SortedPublicSectorInfo, proverID := addressToProverID(sb.Miner) candidates, err := sectorbuilder.GenerateCandidates(sb.ssize, proverID, challengeSeed, challengeCount, privsectors) if err != nil { - return nil, err + return nil, nil, err } - return sectorbuilder.GeneratePoSt(sb.ssize, proverID, privsectors, challengeSeed, candidates) + proof, err := sectorbuilder.GeneratePoSt(sb.ssize, proverID, privsectors, challengeSeed, candidates) + return candidates, proof, err } var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector diff --git a/node/impl/full/state.go b/node/impl/full/state.go index a51276d6c..04037c079 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -76,8 +76,8 @@ func (a *StateAPI) StateMinerPeerID(ctx context.Context, m address.Address, ts * return stmgr.GetMinerPeerID(ctx, a.StateManager, ts, m) } -func (a *StateAPI) StateMinerProvingPeriodEnd(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) { - return stmgr.GetMinerProvingPeriodEnd(ctx, a.StateManager, ts, actor) +func (a *StateAPI) StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) { + return stmgr.GetMinerElectionPeriodStart(ctx, a.StateManager, ts, actor) } func (a *StateAPI) StateMinerSectorSize(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) { diff --git a/storage/fpost_run.go b/storage/fpost_run.go new file mode 100644 index 000000000..ba0990ef3 --- /dev/null +++ b/storage/fpost_run.go @@ -0,0 +1,138 @@ +package storage + +import ( + "context" + ffi "github.com/filecoin-project/filecoin-ffi" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/sectorbuilder" + "go.opencensus.io/trace" + "golang.org/x/xerrors" + "time" +) + +func (s *fpostScheduler) doPost(ctx context.Context, eps uint64, ts *types.TipSet) { + ctx, abort := context.WithCancel(ctx) + + s.abort = abort + s.activeEPS = eps + + go func() { + defer abort() + + ctx, span := trace.StartSpan(ctx, "fpostScheduler.doPost") + defer span.End() + + proof, err := s.runPost(ctx, eps, ts) + if err != nil { + log.Errorf("runPost failed: %+v", err) + return + } + + if err := s.submitPost(ctx, proof); err != nil { + log.Errorf("submitPost failed: %+v", err) + return + } + }() +} + +func (s *fpostScheduler) runPost(ctx context.Context, eps uint64, ts *types.TipSet) (*actors.SubmitFallbackPoStParams, error) { + ctx, span := trace.StartSpan(ctx, "storage.runPost") + defer span.End() + + challengeRound := int64(eps + build.FallbackPoStDelay) + + rand, err := s.api.ChainGetRandomness(ctx, ts.Key(), challengeRound) + if err != nil { + return nil, xerrors.Errorf("failed to get chain randomness for fpost (ts=%d; eps=%d): %w", ts.Height(), eps, err) + } + + ssi, err := s.sortedSectorInfo(ctx, ts) + if err != nil { + return nil, xerrors.Errorf("getting sorted sector info: %w", err) + } + + log.Infow("running fPoSt", "chain-random", rand, "eps", eps, "height", ts.Height()) + + tsStart := time.Now() + var faults []uint64 // TODO + + var seed [32]byte + copy(seed[:], rand) + + scandidates, proof, err := s.sb.GenerateFallbackPoSt(ssi, seed, faults) + if err != nil { + return nil, xerrors.Errorf("running post failed: %w", err) + } + + elapsed := time.Since(tsStart) + log.Infow("submitting PoSt", "pLen", len(proof), "elapsed", elapsed) + + candidates := make([]types.EPostTicket, len(scandidates)) + for i, sc := range scandidates { + candidates[i] = types.EPostTicket{ + Partial: sc.PartialTicket[:], + SectorID: sc.SectorID, + ChallengeIndex: sc.SectorChallengeIndex, + } + } + + return &actors.SubmitFallbackPoStParams{ + Proof: proof, + Candidates: candidates, + }, nil +} + +func (s *fpostScheduler) sortedSectorInfo(ctx context.Context, ts *types.TipSet) (sectorbuilder.SortedPublicSectorInfo, error) { + sset, err := s.api.StateMinerProvingSet(ctx, s.actor, ts) + if err != nil { + return sectorbuilder.SortedPublicSectorInfo{}, xerrors.Errorf("failed to get proving set for miner (tsH: %d): %w", ts.Height(), err) + } + if len(sset) == 0 { + log.Warn("empty proving set! (ts.H: %d)", ts.Height()) + } + + sbsi := make([]ffi.PublicSectorInfo, len(sset)) + for k, sector := range sset { + var commR [sectorbuilder.CommLen]byte + copy(commR[:], sector.CommR) + + sbsi[k] = ffi.PublicSectorInfo{ + SectorID: sector.SectorID, + CommR: commR, + } + } + + return sectorbuilder.NewSortedPublicSectorInfo(sbsi), nil +} + +func (s *fpostScheduler) submitPost(ctx context.Context, proof *actors.SubmitFallbackPoStParams) error { + ctx, span := trace.StartSpan(ctx, "storage.commitPost") + defer span.End() + + enc, aerr := actors.SerializeParams(proof) + if aerr != nil { + return xerrors.Errorf("could not serialize submit post parameters: %w", aerr) + } + + msg := &types.Message{ + To: s.actor, + From: s.worker, + Method: actors.MAMethods.SubmitFallbackPoSt, + Params: enc, + Value: types.NewInt(1000), // currently hard-coded late fee in actor, returned if not late + GasLimit: types.NewInt(10000000), // i dont know help + GasPrice: types.NewInt(1), + } + + // TODO: consider maybe caring about the output + sm, err := s.api.MpoolPushMessage(ctx, msg) + if err != nil { + return xerrors.Errorf("pushing message to mpool: %w", err) + } + + log.Infof("Submitted fallback post: %s", sm.Cid()) + + return nil +} diff --git a/storage/fpost_sched.go b/storage/fpost_sched.go new file mode 100644 index 000000000..565a3604f --- /dev/null +++ b/storage/fpost_sched.go @@ -0,0 +1,141 @@ +package storage + +import ( + "context" + + "go.opencensus.io/trace" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/address" + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/sectorbuilder" +) + +const Inactive = 0 + +const StartConfidence = 4 // TODO: config + +type fpostScheduler struct { + api storageMinerApi + sb *sectorbuilder.SectorBuilder + + actor address.Address + worker address.Address + + cur *types.TipSet + + // if a post is in progress, this indicates for which ElectionPeriodStart + activeEPS uint64 + abort context.CancelFunc +} + +func (s *fpostScheduler) run(ctx context.Context) { + notifs, err := s.api.ChainNotify(ctx) + if err != nil { + return + } + + current := <-notifs + if len(current) != 1 { + panic("expected first notif to have len = 1") + } + if current[0].Type == store.HCCurrent { + panic("expected first notif to tell current ts") + } + + if err := s.update(ctx, current[0].Val); err != nil { + panic(err) + } + + // not fine to panic after this point + for { + select { + case changes := <-notifs: + ctx, span := trace.StartSpan(ctx, "fpostScheduler.headChange") + + var lowest, highest *types.TipSet = s.cur, nil + + for _, change := range changes { + switch change.Type { + case store.HCRevert: + lowest = change.Val + case store.HCApply: + highest = change.Val + } + } + + if err := s.revert(ctx, lowest); err != nil { + log.Error("handling head reverts in fallbackPost sched: %+v", err) + } + if err := s.update(ctx, highest); err != nil { + log.Error("handling head updates in fallbackPost sched: %+v", err) + } + + span.End() + } + } +} + +func (s *fpostScheduler) revert(ctx context.Context, newLowest *types.TipSet) error { + if s.cur == newLowest { + return nil + } + s.cur = newLowest + + newEPS, _, err := s.shouldFallbackPost(ctx, newLowest) + if err != nil { + return err + } + + if newEPS != s.activeEPS { + s.abortActivePoSt() + } + + return nil +} + +func (s *fpostScheduler) update(ctx context.Context, new *types.TipSet) error { + newEPS, start, err := s.shouldFallbackPost(ctx, new) + if err != nil { + return err + } + + if newEPS != s.activeEPS { + s.abortActivePoSt() + } + + if newEPS != Inactive && start { + s.doPost(ctx, newEPS, new) + } + + return nil +} + +func (s *fpostScheduler) abortActivePoSt() { + if s.activeEPS == Inactive { + return // noop + } + + if s.abort != nil { + s.abort() + } + + log.Warnf("Aborting Fallback PoSt (EPS: %d)", s.activeEPS) + + s.activeEPS = 0 + s.abort = nil +} + +func (s *fpostScheduler) shouldFallbackPost(ctx context.Context, ts *types.TipSet) (uint64, bool, error) { + eps, err := s.api.StateMinerElectionPeriodStart(ctx, s.actor, ts) + if err != nil { + return 0, false, xerrors.Errorf("getting ElectionPeriodStart: %w", err) + } + + if ts.Height() >= eps+build.FallbackPoStDelay { + return eps, ts.Height() >= eps+build.FallbackPoStDelay+StartConfidence, nil + } + return 0, false, nil +} diff --git a/storage/miner.go b/storage/miner.go index 206bbbe4d..15f65e60c 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -53,7 +53,7 @@ type storageMinerApi interface { // Call a read only method on actors (no interaction with the chain required) StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error) StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error) - StateMinerProvingPeriodEnd(context.Context, address.Address, *types.TipSet) (uint64, error) + StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) StateMinerSectors(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error) StateMinerProvingSet(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error) StateMinerSectorSize(context.Context, address.Address, *types.TipSet) (uint64, error) @@ -99,7 +99,14 @@ func (m *Miner) Run(ctx context.Context) error { m.events = events.NewEvents(ctx, m.api) - go m.beginPosting(ctx) + fps := &fpostScheduler{ + api: m.api, + sb: m.sb, + actor: m.maddr, + worker: m.worker, + } + + go fps.run(ctx) go m.sectorStateLoop(ctx) return nil } diff --git a/storage/post.go b/storage/post.go deleted file mode 100644 index 541d23b25..000000000 --- a/storage/post.go +++ /dev/null @@ -1,298 +0,0 @@ -package storage - -import ( - "context" - "time" - - ffi "github.com/filecoin-project/filecoin-ffi" - - "go.opencensus.io/trace" - "golang.org/x/xerrors" - - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/actors" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/sectorbuilder" -) - -const postMsgTimeout = 20 - -func (m *Miner) beginPosting(ctx context.Context) { - ts, err := m.api.ChainHead(context.TODO()) - if err != nil { - log.Error(err) - return - } - - sppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, ts) - if err != nil { - log.Errorf("failed to get proving period end for miner (ts h: %d): %s", ts.Height(), err) - return - } - - if sppe == 0 { - log.Warn("Not proving yet") - return - } - - m.postLk.Lock() - if m.schedPost > 0 { - log.Warnf("PoSts already running %d", m.schedPost) - m.postLk.Unlock() - return - } - - // height needs to be +1, because otherwise we'd be trying to schedule PoSt - // at current block height - ppe, _ := actors.ProvingPeriodEnd(sppe, ts.Height()+1) - m.schedPost = ppe - - m.postLk.Unlock() - - if build.PoStChallangeTime > ppe { - ppe = build.PoStChallangeTime - } - - log.Infof("Scheduling post at height %d (begin ts: %d, statePPE: %d)", ppe-build.PoStChallangeTime, ts.Height(), sppe) - err = m.events.ChainAt(m.computePost(m.schedPost), func(ctx context.Context, ts *types.TipSet) error { // Revert - // TODO: Cancel post - log.Errorf("TODO: Cancel PoSt, re-run") - return nil - }, PoStConfidence, ppe-build.PoStChallangeTime) - if err != nil { - // TODO: This is BAD, figure something out - log.Errorf("scheduling PoSt failed: %s", err) - return - } -} - -func (m *Miner) scheduleNextPost(ppe uint64) { - ts, err := m.api.ChainHead(context.TODO()) - if err != nil { - log.Error(err) - // TODO: retry - return - } - - headPPE, provingPeriod := actors.ProvingPeriodEnd(ppe, ts.Height()) - if headPPE > ppe { - log.Warnw("PoSt computation running behind chain", "headPPE", headPPE, "ppe", ppe) - ppe = headPPE - } - - m.postLk.Lock() - if m.schedPost >= ppe { - // this probably can't happen - log.Errorw("PoSt already scheduled", "schedPost", m.schedPost, "ppe", ppe) - m.postLk.Unlock() - return - } - - m.schedPost = ppe - m.postLk.Unlock() - - log.Infow("scheduling PoSt", "post-height", ppe-build.PoStChallangeTime, - "height", ts.Height(), "ppe", ppe, "proving-period", provingPeriod) - err = m.events.ChainAt(m.computePost(ppe), func(ctx context.Context, ts *types.TipSet) error { // Revert - // TODO: Cancel post - log.Errorf("TODO: Cancel PoSt, re-run") - return nil - }, PoStConfidence, ppe-build.PoStChallangeTime) - if err != nil { - // TODO: This is BAD, figure something out - log.Errorf("scheduling PoSt failed: %+v", err) - return - } -} - -type post struct { - m *Miner - - ppe uint64 - ts *types.TipSet - - // prep - sset []*api.ChainSectorInfo - r []byte - - // run - proof []byte - - // commit - smsg *types.SignedMessage -} - -func (p *post) doPost(ctx context.Context) error { - ctx, span := trace.StartSpan(ctx, "storage.computePost") - defer span.End() - - if err := p.preparePost(ctx); err != nil { - return xerrors.Errorf("prepare: %w", err) - } - - if err := p.runPost(ctx); err != nil { - return xerrors.Errorf("run: %w", err) - } - - if err := p.commitPost(ctx); err != nil { - return xerrors.Errorf("commit: %w", err) - } - - if err := p.waitCommit(ctx); err != nil { - return xerrors.Errorf("wait: %w", err) - } - - return nil -} - -func (p *post) preparePost(ctx context.Context) error { - ctx, span := trace.StartSpan(ctx, "storage.preparePost") - defer span.End() - log.Info("preparePost") - - sset, err := p.m.api.StateMinerProvingSet(ctx, p.m.maddr, p.ts) - if err != nil { - return xerrors.Errorf("failed to get proving set for miner (tsH: %d): %w", p.ts.Height(), err) - } - if len(sset) == 0 { - log.Warn("empty proving set! (ts.H: %d)", p.ts.Height()) - } - - p.sset = sset - - challengeRound := int64(p.ppe) - int64(build.PoStChallangeTime+build.PoStRandomnessLookback) - r, err := p.m.api.ChainGetRandomness(ctx, p.ts.Key(), challengeRound) - if err != nil { - return xerrors.Errorf("failed to get chain randomness for post (ts=%d; ppe=%d): %w", p.ts.Height(), p.ppe, err) - } - p.r = r - - return nil -} - -func (p *post) sortedSectorInfo() sectorbuilder.SortedPublicSectorInfo { - panic("NYI") - sbsi := make([]ffi.PublicSectorInfo, len(p.sset)) - for k, sector := range p.sset { - var commR [sectorbuilder.CommLen]byte - copy(commR[:], sector.CommR) - - sbsi[k] = ffi.PublicSectorInfo{ - SectorID: sector.SectorID, - CommR: commR, - } - } - - return sectorbuilder.NewSortedPublicSectorInfo(sbsi) -} - -func (p *post) runPost(ctx context.Context) error { - ctx, span := trace.StartSpan(ctx, "storage.runPost") - defer span.End() - - log.Infow("running PoSt", "delayed-by", - int64(p.ts.Height())-(int64(p.ppe)-int64(build.PoStChallangeTime)), - "chain-random", p.r, "ppe", p.ppe, "height", p.ts.Height(), "sectors", len(p.sset)) - - tsStart := time.Now() - var faults []uint64 // TODO - - var seed [32]byte - copy(seed[:], p.r) - - proof, err := p.m.sb.GenerateFallbackPoSt(p.sortedSectorInfo(), seed, faults) - if err != nil { - return xerrors.Errorf("running post failed: %w", err) - } - elapsed := time.Since(tsStart) - - p.proof = proof - log.Infow("submitting PoSt", "pLen", len(proof), "elapsed", elapsed) - - return nil -} - -func (p *post) commitPost(ctx context.Context) (err error) { - ctx, span := trace.StartSpan(ctx, "storage.commitPost") - defer span.End() - - params := &actors.SubmitPoStParams{ - Proof: p.proof, - } - - enc, aerr := actors.SerializeParams(params) - if aerr != nil { - return xerrors.Errorf("could not serialize submit post parameters: %w", aerr) - } - - msg := &types.Message{ - To: p.m.maddr, - From: p.m.worker, - Method: actors.MAMethods.SubmitPoSt, - Params: enc, - Value: types.NewInt(1000), // currently hard-coded late fee in actor, returned if not late - GasLimit: types.NewInt(1000000), // i dont know help - GasPrice: types.NewInt(1), - } - - log.Info("mpush") - - p.smsg, err = p.m.api.MpoolPushMessage(ctx, msg) - if err != nil { - return xerrors.Errorf("pushing message to mpool: %w", err) - } - - return nil -} - -func (p *post) waitCommit(ctx context.Context) error { - ctx, span := trace.StartSpan(ctx, "storage.waitPost") - defer span.End() - - log.Infof("Waiting for post %s to appear on chain", p.smsg.Cid()) - - err := p.m.events.CalledMsg(ctx, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (more bool, err error) { - if rec.ExitCode != 0 { - log.Warnf("SubmitPoSt EXIT: %d", rec.ExitCode) - } - - log.Infof("Post made it on chain! (height=%d)", ts.Height()) - - return false, nil - }, func(ctx context.Context, ts *types.TipSet) error { - log.Warn("post message reverted") - return nil - }, 3, postMsgTimeout, p.smsg) - if err != nil { - return xerrors.Errorf("waiting for post to appear on chain: %w", err) - } - - return nil -} - -func (m *Miner) computePost(ppe uint64) func(ctx context.Context, ts *types.TipSet, curH uint64) error { - called := 0 - return func(ctx context.Context, ts *types.TipSet, curH uint64) error { - called++ - if called > 1 { - log.Errorw("BUG: computePost callback called again", "ppe", ppe, - "height", ts.Height(), "curH", curH, "called", called-1) - return nil - } - - err := (&post{ - m: m, - ppe: ppe, - ts: ts, - }).doPost(ctx) - - m.scheduleNextPost(ppe + build.ProvingPeriodDuration) - - if err != nil { - return xerrors.Errorf("doPost: %w", err) - } - - return nil - } -} diff --git a/storage/sector_states.go b/storage/sector_states.go index ed5666285..455fa15b8 100644 --- a/storage/sector_states.go +++ b/storage/sector_states.go @@ -220,8 +220,6 @@ func (m *Miner) committing(ctx context.Context, sector SectorInfo) (func(*Sector return nil, xerrors.New("UNHANDLED: submitting sector proof failed") } - m.beginPosting(ctx) - return func(info *SectorInfo) { mcid := smsg.Cid() info.CommitMessage = &mcid