storageminer: New fallback post scheduler

Łukasz Magiera 2019-11-28 18:44:49 +01:00
parent aefd432422
commit 920fd3ba9b
15 changed files with 308 additions and 329 deletions

View File

@@ -99,7 +99,7 @@ type FullNode interface {
StateMinerPower(context.Context, address.Address, *types.TipSet) (MinerPower, error)
StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error)
StateMinerPeerID(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error)
StateMinerProvingPeriodEnd(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error)
StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error)
StateMinerSectorSize(context.Context, address.Address, *types.TipSet) (uint64, error)
StatePledgeCollateral(context.Context, *types.TipSet) (types.BigInt, error)
StateWaitMsg(context.Context, cid.Cid) (*MsgWait, error)

View File

@@ -362,7 +362,7 @@ func (c *FullNodeStruct) StateMinerPeerID(ctx context.Context, m address.Address
return c.Internal.StateMinerPeerID(ctx, m, ts)
}
func (c *FullNodeStruct) StateMinerProvingPeriodEnd(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) {
func (c *FullNodeStruct) StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) {
return c.Internal.StateMinerProvingPeriodEnd(ctx, actor, ts)
}

View File

@@ -75,11 +75,11 @@ const SectorChallengeRatioDiv = 25
const MaxFallbackPostChallengeCount = 10
// FallbackPoStBegin is the number of epochs the miner needs to wait after
// FallbackPoStDelay is the number of epochs the miner needs to wait after
// ElectionPeriodStart before starting fallback post computation
//
// Epochs
const FallbackPoStBegin = 1000
const FallbackPoStDelay = 1000
// SlashablePowerDelay is the number of epochs
// Epochs
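
For orientation, a minimal sketch of the timing rule these constants encode, matching the actor-side check shown further down (standalone illustrative code, not part of the diff):

package main

import "fmt"

const FallbackPoStDelay = 1000 // epochs, as defined in this hunk

// fallbackPoStOpen mirrors the SubmitFallbackPoSt guard below: submissions
// are rejected while height <= ElectionPeriodStart + FallbackPoStDelay.
func fallbackPoStOpen(height, electionPeriodStart uint64) bool {
	return height > electionPeriodStart+FallbackPoStDelay
}

func main() {
	eps := uint64(52000) // example ElectionPeriodStart
	fmt.Println(fallbackPoStOpen(eps+1000, eps)) // false: still at the boundary
	fmt.Println(fallbackPoStOpen(eps+1001, eps)) // true: fallback window is open
}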

View File

@@ -389,12 +389,12 @@ func (sma StorageMinerActor) ProveCommitSector(act *types.Actor, vmctx types.VMC
return nil, err
}
type SubmitPoStParams struct {
type SubmitFallbackPoStParams struct {
Proof []byte
Candidates []types.EPostTicket
}
func (sma StorageMinerActor) SubmitFallbackPoSt(act *types.Actor, vmctx types.VMContext, params *SubmitPoStParams) ([]byte, ActorError) {
func (sma StorageMinerActor) SubmitFallbackPoSt(act *types.Actor, vmctx types.VMContext, params *SubmitFallbackPoStParams) ([]byte, ActorError) {
oldstate, self, err := loadState(vmctx)
if err != nil {
return nil, err
@@ -427,7 +427,7 @@ func (sma StorageMinerActor) SubmitFallbackPoSt(act *types.Actor, vmctx types.VM
var seed [sectorbuilder.CommLen]byte
{
randHeight := self.ElectionPeriodStart + build.FallbackPoStBegin
randHeight := self.ElectionPeriodStart + build.FallbackPoStDelay
if vmctx.BlockHeight() <= randHeight {
// TODO: spec, retcode
return nil, aerrors.Newf(1, "submit fallback PoSt called too early (%d < %d)", vmctx.BlockHeight(), randHeight)

View File

@@ -813,7 +813,7 @@ func (t *MinerInfo) UnmarshalCBOR(r io.Reader) error {
return nil
}
func (t *SubmitPoStParams) MarshalCBOR(w io.Writer) error {
func (t *SubmitFallbackPoStParams) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
@@ -823,13 +823,10 @@ func (t *SubmitPoStParams) MarshalCBOR(w io.Writer) error {
}
// t.t.Proof (types.EPostProof) (struct)
if err := t.Proof.MarshalCBOR(w); err != nil {
return err
}
return nil
}
func (t *SubmitPoStParams) UnmarshalCBOR(r io.Reader) error {
func (t *SubmitFallbackPoStParams) UnmarshalCBOR(r io.Reader) error {
br := cbg.GetPeeker(r)
maj, extra, err := cbg.CborReadHeader(br)
@@ -848,10 +845,6 @@ func (t *SubmitPoStParams) UnmarshalCBOR(r io.Reader) error {
{
if err := t.Proof.UnmarshalCBOR(br); err != nil {
return err
}
}
return nil
}

View File

@@ -149,15 +149,14 @@ func GetMinerWorker(ctx context.Context, sm *StateManager, ts *types.TipSet, mad
return address.NewFromBytes(recp.Return)
}
func GetMinerProvingPeriodEnd(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) (uint64, error) {
func GetMinerElectionPeriodStart(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) (uint64, error) {
var mas actors.StorageMinerActorState
_, err := sm.LoadActorState(ctx, maddr, &mas, ts)
if err != nil {
return 0, xerrors.Errorf("failed to load miner actor state: %w", err)
}
panic("idk what to do")
//return mas.ProvingPeriodEnd, nil
return mas.ElectionPeriodStart, nil
}
func GetMinerProvingSet(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) ([]*api.ChainSectorInfo, error) {

View File

@@ -60,7 +60,7 @@ var infoCmd = &cli.Command{
}
fmt.Printf("Worker use: %d / %d (+%d)\n", wstat.Total-wstat.Reserved-wstat.Free, wstat.Total, wstat.Reserved)
ppe, err := api.StateMinerProvingPeriodEnd(ctx, maddr, nil)
ppe, err := api.StateMinerElectionPeriodStart(ctx, maddr, nil)
if err != nil {
return err
}

View File

@@ -93,7 +93,7 @@ func main() {
actors.SectorPreCommitInfo{},
actors.PreCommittedSector{},
actors.MinerInfo{},
actors.SubmitPoStParams{},
actors.SubmitFallbackPoStParams{},
actors.PaymentVerifyParams{},
actors.UpdatePeerIDParams{},
actors.MultiSigActorState{},
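
The renamed params type is registered with the cbor-gen generator here; that registration is what drives the regenerated MarshalCBOR/UnmarshalCBOR code shown in the earlier hunk. A hedged sketch of the call shape (WriteTupleEncodersToFile comes from github.com/whyrusleeping/cbor-gen; the output path and the abbreviated type list are assumptions for illustration):

package main

import (
	gen "github.com/whyrusleeping/cbor-gen"

	"github.com/filecoin-project/lotus/chain/actors"
)

func main() {
	// Illustrative subset: the real call in gen/main.go lists every actor type,
	// and the output path here is assumed, not taken from this diff.
	err := gen.WriteTupleEncodersToFile("./chain/actors/cbor_gen.go", "actors",
		actors.SubmitFallbackPoStParams{},
	)
	if err != nil {
		panic(err)
	}
}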

View File

@@ -328,10 +328,10 @@ func (sb *SectorBuilder) pubSectorToPriv(sectorInfo SortedPublicSectorInfo) (Sor
return NewSortedPrivateSectorInfo(out), nil
}
func (sb *SectorBuilder) GenerateFallbackPoSt(sectorInfo SortedPublicSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]byte, error) {
func (sb *SectorBuilder) GenerateFallbackPoSt(sectorInfo SortedPublicSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]EPostCandidate, []byte, error) {
privsectors, err := sb.pubSectorToPriv(sectorInfo)
if err != nil {
return nil, err
return nil, nil, err
}
challengeCount := fallbackPostChallengeCount(uint64(len(sectorInfo.Values())))
@@ -339,10 +339,11 @@ func (sb *SectorBuilder) GenerateFallbackPoSt(sectorInfo SortedPublicSectorInfo,
proverID := addressToProverID(sb.Miner)
candidates, err := sectorbuilder.GenerateCandidates(sb.ssize, proverID, challengeSeed, challengeCount, privsectors)
if err != nil {
return nil, err
return nil, nil, err
}
return sectorbuilder.GeneratePoSt(sb.ssize, proverID, privsectors, challengeSeed, candidates)
proof, err := sectorbuilder.GeneratePoSt(sb.ssize, proverID, privsectors, challengeSeed, candidates)
return candidates, proof, err
}
var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector
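
Callers now get the election candidates back alongside the proof, so both fields of SubmitFallbackPoStParams can be filled from a single call. A minimal sketch of the new call shape (essentially what the new storage/fpost_run.go below does; the helper name is illustrative only):

package storage

import (
	"github.com/filecoin-project/lotus/chain/actors"
	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/lib/sectorbuilder"
)

// provePost sketches the updated GenerateFallbackPoSt signature: the candidates
// become SubmitFallbackPoStParams.Candidates, the proof becomes .Proof.
func provePost(sb *sectorbuilder.SectorBuilder, ssi sectorbuilder.SortedPublicSectorInfo,
	seed [sectorbuilder.CommLen]byte, faults []uint64) (*actors.SubmitFallbackPoStParams, error) {

	candidates, proof, err := sb.GenerateFallbackPoSt(ssi, seed, faults)
	if err != nil {
		return nil, err
	}

	tickets := make([]types.EPostTicket, len(candidates))
	for i, c := range candidates {
		tickets[i] = types.EPostTicket{
			Partial:        c.PartialTicket[:],
			SectorID:       c.SectorID,
			ChallengeIndex: c.SectorChallengeIndex,
		}
	}

	return &actors.SubmitFallbackPoStParams{Proof: proof, Candidates: tickets}, nil
}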

View File

@@ -76,8 +76,8 @@ func (a *StateAPI) StateMinerPeerID(ctx context.Context, m address.Address, ts *
return stmgr.GetMinerPeerID(ctx, a.StateManager, ts, m)
}
func (a *StateAPI) StateMinerProvingPeriodEnd(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) {
return stmgr.GetMinerProvingPeriodEnd(ctx, a.StateManager, ts, actor)
func (a *StateAPI) StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) {
return stmgr.GetMinerElectionPeriodStart(ctx, a.StateManager, ts, actor)
}
func (a *StateAPI) StateMinerSectorSize(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) {

storage/fpost_run.go (new file, 138 lines)
View File

@@ -0,0 +1,138 @@
package storage
import (
"context"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
"go.opencensus.io/trace"
"golang.org/x/xerrors"
"time"
)
func (s *fpostScheduler) doPost(ctx context.Context, eps uint64, ts *types.TipSet) {
ctx, abort := context.WithCancel(ctx)
s.abort = abort
s.activeEPS = eps
go func() {
defer abort()
ctx, span := trace.StartSpan(ctx, "fpostScheduler.doPost")
defer span.End()
proof, err := s.runPost(ctx, eps, ts)
if err != nil {
log.Errorf("runPost failed: %+v", err)
return
}
if err := s.submitPost(ctx, proof); err != nil {
log.Errorf("submitPost failed: %+v", err)
return
}
}()
}
func (s *fpostScheduler) runPost(ctx context.Context, eps uint64, ts *types.TipSet) (*actors.SubmitFallbackPoStParams, error) {
ctx, span := trace.StartSpan(ctx, "storage.runPost")
defer span.End()
challengeRound := int64(eps + build.FallbackPoStDelay)
rand, err := s.api.ChainGetRandomness(ctx, ts.Key(), challengeRound)
if err != nil {
return nil, xerrors.Errorf("failed to get chain randomness for fpost (ts=%d; eps=%d): %w", ts.Height(), eps, err)
}
ssi, err := s.sortedSectorInfo(ctx, ts)
if err != nil {
return nil, xerrors.Errorf("getting sorted sector info: %w", err)
}
log.Infow("running fPoSt", "chain-random", rand, "eps", eps, "height", ts.Height())
tsStart := time.Now()
var faults []uint64 // TODO
var seed [32]byte
copy(seed[:], rand)
scandidates, proof, err := s.sb.GenerateFallbackPoSt(ssi, seed, faults)
if err != nil {
return nil, xerrors.Errorf("running post failed: %w", err)
}
elapsed := time.Since(tsStart)
log.Infow("submitting PoSt", "pLen", len(proof), "elapsed", elapsed)
candidates := make([]types.EPostTicket, len(scandidates))
for i, sc := range scandidates {
candidates[i] = types.EPostTicket{
Partial: sc.PartialTicket[:],
SectorID: sc.SectorID,
ChallengeIndex: sc.SectorChallengeIndex,
}
}
return &actors.SubmitFallbackPoStParams{
Proof: proof,
Candidates: candidates,
}, nil
}
func (s *fpostScheduler) sortedSectorInfo(ctx context.Context, ts *types.TipSet) (sectorbuilder.SortedPublicSectorInfo, error) {
sset, err := s.api.StateMinerProvingSet(ctx, s.actor, ts)
if err != nil {
return sectorbuilder.SortedPublicSectorInfo{}, xerrors.Errorf("failed to get proving set for miner (tsH: %d): %w", ts.Height(), err)
}
if len(sset) == 0 {
log.Warn("empty proving set! (ts.H: %d)", ts.Height())
}
sbsi := make([]ffi.PublicSectorInfo, len(sset))
for k, sector := range sset {
var commR [sectorbuilder.CommLen]byte
copy(commR[:], sector.CommR)
sbsi[k] = ffi.PublicSectorInfo{
SectorID: sector.SectorID,
CommR: commR,
}
}
return sectorbuilder.NewSortedPublicSectorInfo(sbsi), nil
}
func (s *fpostScheduler) submitPost(ctx context.Context, proof *actors.SubmitFallbackPoStParams) error {
ctx, span := trace.StartSpan(ctx, "storage.commitPost")
defer span.End()
enc, aerr := actors.SerializeParams(proof)
if aerr != nil {
return xerrors.Errorf("could not serialize submit post parameters: %w", aerr)
}
msg := &types.Message{
To: s.actor,
From: s.worker,
Method: actors.MAMethods.SubmitFallbackPoSt,
Params: enc,
Value: types.NewInt(1000), // currently hard-coded late fee in actor, returned if not late
GasLimit: types.NewInt(10000000), // i dont know help
GasPrice: types.NewInt(1),
}
// TODO: consider maybe caring about the output
sm, err := s.api.MpoolPushMessage(ctx, msg)
if err != nil {
return xerrors.Errorf("pushing message to mpool: %w", err)
}
log.Infof("Submitted fallback post: %s", sm.Cid())
return nil
}

storage/fpost_sched.go (new file, 141 lines)
View File

@@ -0,0 +1,141 @@
package storage
import (
"context"
"go.opencensus.io/trace"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
)
const Inactive = 0
const StartConfidence = 4 // TODO: config
type fpostScheduler struct {
api storageMinerApi
sb *sectorbuilder.SectorBuilder
actor address.Address
worker address.Address
cur *types.TipSet
// if a post is in progress, this indicates for which ElectionPeriodStart
activeEPS uint64
abort context.CancelFunc
}
func (s *fpostScheduler) run(ctx context.Context) {
notifs, err := s.api.ChainNotify(ctx)
if err != nil {
return
}
current := <-notifs
if len(current) != 1 {
panic("expected first notif to have len = 1")
}
if current[0].Type != store.HCCurrent {
panic("expected first notif to tell current ts")
}
if err := s.update(ctx, current[0].Val); err != nil {
panic(err)
}
// not fine to panic after this point
for {
select {
case changes := <-notifs:
ctx, span := trace.StartSpan(ctx, "fpostScheduler.headChange")
var lowest, highest *types.TipSet = s.cur, nil
for _, change := range changes {
switch change.Type {
case store.HCRevert:
lowest = change.Val
case store.HCApply:
highest = change.Val
}
}
if err := s.revert(ctx, lowest); err != nil {
log.Error("handling head reverts in fallbackPost sched: %+v", err)
}
if err := s.update(ctx, highest); err != nil {
log.Error("handling head updates in fallbackPost sched: %+v", err)
}
span.End()
}
}
}
func (s *fpostScheduler) revert(ctx context.Context, newLowest *types.TipSet) error {
if s.cur == newLowest {
return nil
}
s.cur = newLowest
newEPS, _, err := s.shouldFallbackPost(ctx, newLowest)
if err != nil {
return err
}
if newEPS != s.activeEPS {
s.abortActivePoSt()
}
return nil
}
func (s *fpostScheduler) update(ctx context.Context, new *types.TipSet) error {
newEPS, start, err := s.shouldFallbackPost(ctx, new)
if err != nil {
return err
}
if newEPS != s.activeEPS {
s.abortActivePoSt()
}
if newEPS != Inactive && start {
s.doPost(ctx, newEPS, new)
}
return nil
}
func (s *fpostScheduler) abortActivePoSt() {
if s.activeEPS == Inactive {
return // noop
}
if s.abort != nil {
s.abort()
}
log.Warnf("Aborting Fallback PoSt (EPS: %d)", s.activeEPS)
s.activeEPS = 0
s.abort = nil
}
func (s *fpostScheduler) shouldFallbackPost(ctx context.Context, ts *types.TipSet) (uint64, bool, error) {
eps, err := s.api.StateMinerElectionPeriodStart(ctx, s.actor, ts)
if err != nil {
return 0, false, xerrors.Errorf("getting ElectionPeriodStart: %w", err)
}
if ts.Height() >= eps+build.FallbackPoStDelay {
return eps, ts.Height() >= eps+build.FallbackPoStDelay+StartConfidence, nil
}
return 0, false, nil
}
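
To make the scheduling rule concrete: shouldFallbackPost reports which ElectionPeriodStart is being covered once the chain reaches eps + FallbackPoStDelay, but only signals "start" StartConfidence epochs later, presumably to buffer against short reorgs before the expensive proof computation begins. A standalone sketch of that decision table (constants copied from this diff, heights illustrative):

package main

import "fmt"

const (
	fallbackPoStDelay = 1000 // build.FallbackPoStDelay
	startConfidence   = 4    // StartConfidence above
)

// shouldFallbackPost mirrors the method above: which ElectionPeriodStart a
// fallback PoSt would cover, and whether computation should start yet.
func shouldFallbackPost(height, eps uint64) (uint64, bool) {
	if height >= eps+fallbackPoStDelay {
		return eps, height >= eps+fallbackPoStDelay+startConfidence
	}
	return 0, false
}

func main() {
	eps := uint64(52000)
	for _, h := range []uint64{eps + 999, eps + 1000, eps + 1003, eps + 1004} {
		coveredEPS, start := shouldFallbackPost(h, eps)
		fmt.Printf("height=%d -> eps=%d start=%v\n", h, coveredEPS, start)
	}
}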

View File

@@ -53,7 +53,7 @@ type storageMinerApi interface {
// Call a read only method on actors (no interaction with the chain required)
StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error)
StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error)
StateMinerProvingPeriodEnd(context.Context, address.Address, *types.TipSet) (uint64, error)
StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error)
StateMinerSectors(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error)
StateMinerProvingSet(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error)
StateMinerSectorSize(context.Context, address.Address, *types.TipSet) (uint64, error)
@@ -99,7 +99,14 @@ func (m *Miner) Run(ctx context.Context) error {
m.events = events.NewEvents(ctx, m.api)
go m.beginPosting(ctx)
fps := &fpostScheduler{
api: m.api,
sb: m.sb,
actor: m.maddr,
worker: m.worker,
}
go fps.run(ctx)
go m.sectorStateLoop(ctx)
return nil
}

View File

@@ -1,298 +0,0 @@
package storage
import (
"context"
"time"
ffi "github.com/filecoin-project/filecoin-ffi"
"go.opencensus.io/trace"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
)
const postMsgTimeout = 20
func (m *Miner) beginPosting(ctx context.Context) {
ts, err := m.api.ChainHead(context.TODO())
if err != nil {
log.Error(err)
return
}
sppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, ts)
if err != nil {
log.Errorf("failed to get proving period end for miner (ts h: %d): %s", ts.Height(), err)
return
}
if sppe == 0 {
log.Warn("Not proving yet")
return
}
m.postLk.Lock()
if m.schedPost > 0 {
log.Warnf("PoSts already running %d", m.schedPost)
m.postLk.Unlock()
return
}
// height needs to be +1, because otherwise we'd be trying to schedule PoSt
// at current block height
ppe, _ := actors.ProvingPeriodEnd(sppe, ts.Height()+1)
m.schedPost = ppe
m.postLk.Unlock()
if build.PoStChallangeTime > ppe {
ppe = build.PoStChallangeTime
}
log.Infof("Scheduling post at height %d (begin ts: %d, statePPE: %d)", ppe-build.PoStChallangeTime, ts.Height(), sppe)
err = m.events.ChainAt(m.computePost(m.schedPost), func(ctx context.Context, ts *types.TipSet) error { // Revert
// TODO: Cancel post
log.Errorf("TODO: Cancel PoSt, re-run")
return nil
}, PoStConfidence, ppe-build.PoStChallangeTime)
if err != nil {
// TODO: This is BAD, figure something out
log.Errorf("scheduling PoSt failed: %s", err)
return
}
}
func (m *Miner) scheduleNextPost(ppe uint64) {
ts, err := m.api.ChainHead(context.TODO())
if err != nil {
log.Error(err)
// TODO: retry
return
}
headPPE, provingPeriod := actors.ProvingPeriodEnd(ppe, ts.Height())
if headPPE > ppe {
log.Warnw("PoSt computation running behind chain", "headPPE", headPPE, "ppe", ppe)
ppe = headPPE
}
m.postLk.Lock()
if m.schedPost >= ppe {
// this probably can't happen
log.Errorw("PoSt already scheduled", "schedPost", m.schedPost, "ppe", ppe)
m.postLk.Unlock()
return
}
m.schedPost = ppe
m.postLk.Unlock()
log.Infow("scheduling PoSt", "post-height", ppe-build.PoStChallangeTime,
"height", ts.Height(), "ppe", ppe, "proving-period", provingPeriod)
err = m.events.ChainAt(m.computePost(ppe), func(ctx context.Context, ts *types.TipSet) error { // Revert
// TODO: Cancel post
log.Errorf("TODO: Cancel PoSt, re-run")
return nil
}, PoStConfidence, ppe-build.PoStChallangeTime)
if err != nil {
// TODO: This is BAD, figure something out
log.Errorf("scheduling PoSt failed: %+v", err)
return
}
}
type post struct {
m *Miner
ppe uint64
ts *types.TipSet
// prep
sset []*api.ChainSectorInfo
r []byte
// run
proof []byte
// commit
smsg *types.SignedMessage
}
func (p *post) doPost(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "storage.computePost")
defer span.End()
if err := p.preparePost(ctx); err != nil {
return xerrors.Errorf("prepare: %w", err)
}
if err := p.runPost(ctx); err != nil {
return xerrors.Errorf("run: %w", err)
}
if err := p.commitPost(ctx); err != nil {
return xerrors.Errorf("commit: %w", err)
}
if err := p.waitCommit(ctx); err != nil {
return xerrors.Errorf("wait: %w", err)
}
return nil
}
func (p *post) preparePost(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "storage.preparePost")
defer span.End()
log.Info("preparePost")
sset, err := p.m.api.StateMinerProvingSet(ctx, p.m.maddr, p.ts)
if err != nil {
return xerrors.Errorf("failed to get proving set for miner (tsH: %d): %w", p.ts.Height(), err)
}
if len(sset) == 0 {
log.Warn("empty proving set! (ts.H: %d)", p.ts.Height())
}
p.sset = sset
challengeRound := int64(p.ppe) - int64(build.PoStChallangeTime+build.PoStRandomnessLookback)
r, err := p.m.api.ChainGetRandomness(ctx, p.ts.Key(), challengeRound)
if err != nil {
return xerrors.Errorf("failed to get chain randomness for post (ts=%d; ppe=%d): %w", p.ts.Height(), p.ppe, err)
}
p.r = r
return nil
}
func (p *post) sortedSectorInfo() sectorbuilder.SortedPublicSectorInfo {
panic("NYI")
sbsi := make([]ffi.PublicSectorInfo, len(p.sset))
for k, sector := range p.sset {
var commR [sectorbuilder.CommLen]byte
copy(commR[:], sector.CommR)
sbsi[k] = ffi.PublicSectorInfo{
SectorID: sector.SectorID,
CommR: commR,
}
}
return sectorbuilder.NewSortedPublicSectorInfo(sbsi)
}
func (p *post) runPost(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "storage.runPost")
defer span.End()
log.Infow("running PoSt", "delayed-by",
int64(p.ts.Height())-(int64(p.ppe)-int64(build.PoStChallangeTime)),
"chain-random", p.r, "ppe", p.ppe, "height", p.ts.Height(), "sectors", len(p.sset))
tsStart := time.Now()
var faults []uint64 // TODO
var seed [32]byte
copy(seed[:], p.r)
proof, err := p.m.sb.GenerateFallbackPoSt(p.sortedSectorInfo(), seed, faults)
if err != nil {
return xerrors.Errorf("running post failed: %w", err)
}
elapsed := time.Since(tsStart)
p.proof = proof
log.Infow("submitting PoSt", "pLen", len(proof), "elapsed", elapsed)
return nil
}
func (p *post) commitPost(ctx context.Context) (err error) {
ctx, span := trace.StartSpan(ctx, "storage.commitPost")
defer span.End()
params := &actors.SubmitPoStParams{
Proof: p.proof,
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return xerrors.Errorf("could not serialize submit post parameters: %w", aerr)
}
msg := &types.Message{
To: p.m.maddr,
From: p.m.worker,
Method: actors.MAMethods.SubmitPoSt,
Params: enc,
Value: types.NewInt(1000), // currently hard-coded late fee in actor, returned if not late
GasLimit: types.NewInt(1000000), // i dont know help
GasPrice: types.NewInt(1),
}
log.Info("mpush")
p.smsg, err = p.m.api.MpoolPushMessage(ctx, msg)
if err != nil {
return xerrors.Errorf("pushing message to mpool: %w", err)
}
return nil
}
func (p *post) waitCommit(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "storage.waitPost")
defer span.End()
log.Infof("Waiting for post %s to appear on chain", p.smsg.Cid())
err := p.m.events.CalledMsg(ctx, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (more bool, err error) {
if rec.ExitCode != 0 {
log.Warnf("SubmitPoSt EXIT: %d", rec.ExitCode)
}
log.Infof("Post made it on chain! (height=%d)", ts.Height())
return false, nil
}, func(ctx context.Context, ts *types.TipSet) error {
log.Warn("post message reverted")
return nil
}, 3, postMsgTimeout, p.smsg)
if err != nil {
return xerrors.Errorf("waiting for post to appear on chain: %w", err)
}
return nil
}
func (m *Miner) computePost(ppe uint64) func(ctx context.Context, ts *types.TipSet, curH uint64) error {
called := 0
return func(ctx context.Context, ts *types.TipSet, curH uint64) error {
called++
if called > 1 {
log.Errorw("BUG: computePost callback called again", "ppe", ppe,
"height", ts.Height(), "curH", curH, "called", called-1)
return nil
}
err := (&post{
m: m,
ppe: ppe,
ts: ts,
}).doPost(ctx)
m.scheduleNextPost(ppe + build.ProvingPeriodDuration)
if err != nil {
return xerrors.Errorf("doPost: %w", err)
}
return nil
}
}

View File

@@ -220,8 +220,6 @@ func (m *Miner) committing(ctx context.Context, sector SectorInfo) (func(*Sector
return nil, xerrors.New("UNHANDLED: submitting sector proof failed")
}
m.beginPosting(ctx)
return func(info *SectorInfo) {
mcid := smsg.Cid()
info.CommitMessage = &mcid