lotus/storage/post.go

298 lines
7.4 KiB
Go
Raw Normal View History

2019-09-26 23:07:40 +00:00
package storage
import (
"context"
"time"
2019-11-05 14:52:33 +00:00
"go.opencensus.io/trace"
2019-09-26 23:07:40 +00:00
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
2019-11-08 20:11:56 +00:00
"github.com/filecoin-project/lotus/lib/sectorbuilder"
2019-09-26 23:07:40 +00:00
)
2019-11-19 21:27:25 +00:00
const postMsgTimeout = 20
2019-09-26 23:07:40 +00:00
func (m *Miner) beginPosting(ctx context.Context) {
ts, err := m.api.ChainHead(context.TODO())
if err != nil {
log.Error(err)
return
}
2019-11-24 16:35:50 +00:00
sppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, ts)
2019-09-26 23:07:40 +00:00
if err != nil {
2019-11-24 16:35:50 +00:00
log.Errorf("failed to get proving period end for miner (ts h: %d): %s", ts.Height(), err)
2019-09-26 23:07:40 +00:00
return
}
2019-11-24 16:35:50 +00:00
if sppe == 0 {
2019-11-22 16:15:33 +00:00
log.Warn("Not proving yet")
2019-09-26 23:07:40 +00:00
return
}
2019-11-01 13:58:48 +00:00
m.postLk.Lock()
2019-09-26 23:07:40 +00:00
if m.schedPost > 0 {
log.Warnf("PoSts already running %d", m.schedPost)
2019-11-01 13:58:48 +00:00
m.postLk.Unlock()
2019-09-26 23:07:40 +00:00
return
}
// height needs to be +1, because otherwise we'd be trying to schedule PoSt
// at current block height
2019-11-24 16:35:50 +00:00
ppe, _ := actors.ProvingPeriodEnd(sppe, ts.Height()+1)
2019-09-30 22:56:23 +00:00
m.schedPost = ppe
2019-09-26 23:07:40 +00:00
2019-11-01 13:58:48 +00:00
m.postLk.Unlock()
2019-09-26 23:07:40 +00:00
2019-11-24 16:35:50 +00:00
if build.PoStChallangeTime > ppe {
ppe = build.PoStChallangeTime
}
log.Infof("Scheduling post at height %d (begin ts: %d, statePPE: %d)", ppe-build.PoStChallangeTime, ts.Height(), sppe)
2019-11-05 14:03:59 +00:00
err = m.events.ChainAt(m.computePost(m.schedPost), func(ctx context.Context, ts *types.TipSet) error { // Revert
2019-09-26 23:07:40 +00:00
// TODO: Cancel post
log.Errorf("TODO: Cancel PoSt, re-run")
return nil
}, PoStConfidence, ppe-build.PoStChallangeTime)
2019-09-26 23:07:40 +00:00
if err != nil {
// TODO: This is BAD, figure something out
log.Errorf("scheduling PoSt failed: %s", err)
return
}
}
func (m *Miner) scheduleNextPost(ppe uint64) {
ts, err := m.api.ChainHead(context.TODO())
if err != nil {
log.Error(err)
// TODO: retry
return
}
2019-09-27 19:47:47 +00:00
headPPE, provingPeriod := actors.ProvingPeriodEnd(ppe, ts.Height())
2019-09-26 23:07:40 +00:00
if headPPE > ppe {
log.Warnw("PoSt computation running behind chain", "headPPE", headPPE, "ppe", ppe)
2019-09-26 23:07:40 +00:00
ppe = headPPE
}
2019-11-01 13:58:48 +00:00
m.postLk.Lock()
2019-09-26 23:07:40 +00:00
if m.schedPost >= ppe {
// this probably can't happen
log.Errorw("PoSt already scheduled", "schedPost", m.schedPost, "ppe", ppe)
2019-11-01 13:58:48 +00:00
m.postLk.Unlock()
2019-09-26 23:07:40 +00:00
return
}
m.schedPost = ppe
2019-11-01 13:58:48 +00:00
m.postLk.Unlock()
2019-09-26 23:07:40 +00:00
log.Infow("scheduling PoSt", "post-height", ppe-build.PoStChallangeTime,
"height", ts.Height(), "ppe", ppe, "proving-period", provingPeriod)
2019-11-05 14:03:59 +00:00
err = m.events.ChainAt(m.computePost(ppe), func(ctx context.Context, ts *types.TipSet) error { // Revert
2019-09-26 23:07:40 +00:00
// TODO: Cancel post
log.Errorf("TODO: Cancel PoSt, re-run")
return nil
}, PoStConfidence, ppe-build.PoStChallangeTime)
2019-09-26 23:07:40 +00:00
if err != nil {
// TODO: This is BAD, figure something out
log.Errorf("scheduling PoSt failed: %+v", err)
2019-09-26 23:07:40 +00:00
return
}
}
2019-11-05 14:52:33 +00:00
type post struct {
m *Miner
2019-09-26 23:07:40 +00:00
2019-11-05 14:52:33 +00:00
ppe uint64
ts *types.TipSet
2019-09-26 23:07:40 +00:00
2019-11-05 14:52:33 +00:00
// prep
2019-11-08 18:15:13 +00:00
sset []*api.ChainSectorInfo
2019-11-05 14:52:33 +00:00
r []byte
2019-09-26 23:07:40 +00:00
2019-11-05 14:52:33 +00:00
// run
proof []byte
2019-11-05 14:52:33 +00:00
// commit
2019-11-24 16:35:50 +00:00
smsg *types.SignedMessage
2019-11-05 14:52:33 +00:00
}
2019-09-26 23:07:40 +00:00
2019-11-05 14:52:33 +00:00
func (p *post) doPost(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "storage.computePost")
defer span.End()
2019-09-26 23:07:40 +00:00
2019-11-05 14:52:33 +00:00
if err := p.preparePost(ctx); err != nil {
2019-11-24 16:35:50 +00:00
return xerrors.Errorf("prepare: %w", err)
2019-11-05 14:52:33 +00:00
}
2019-09-26 23:07:40 +00:00
2019-11-05 14:52:33 +00:00
if err := p.runPost(ctx); err != nil {
2019-11-24 16:35:50 +00:00
return xerrors.Errorf("run: %w", err)
2019-11-05 14:52:33 +00:00
}
2019-09-26 23:07:40 +00:00
2019-11-05 14:52:33 +00:00
if err := p.commitPost(ctx); err != nil {
2019-11-24 16:35:50 +00:00
return xerrors.Errorf("commit: %w", err)
2019-11-05 14:52:33 +00:00
}
2019-09-26 23:07:40 +00:00
2019-11-05 14:52:33 +00:00
if err := p.waitCommit(ctx); err != nil {
2019-11-24 16:35:50 +00:00
return xerrors.Errorf("wait: %w", err)
2019-11-05 14:52:33 +00:00
}
2019-09-26 23:07:40 +00:00
2019-11-05 14:52:33 +00:00
return nil
}
func (p *post) preparePost(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "storage.preparePost")
defer span.End()
log.Info("preparePost")
2019-11-05 14:52:33 +00:00
sset, err := p.m.api.StateMinerProvingSet(ctx, p.m.maddr, p.ts)
if err != nil {
2019-11-24 16:35:50 +00:00
return xerrors.Errorf("failed to get proving set for miner (tsH: %d): %w", p.ts.Height(), err)
2019-11-05 14:52:33 +00:00
}
if len(sset) == 0 {
log.Warn("empty proving set! (ts.H: %d)", p.ts.Height())
}
2019-11-05 14:52:33 +00:00
p.sset = sset
2019-09-26 23:07:40 +00:00
// Compute how many blocks back we have to look from the given tipset for the PoSt challenge
challengeLookback := int((int64(p.ts.Height()) - int64(p.ppe)) + int64(build.PoStChallangeTime) + int64(build.PoStRandomnessLookback))
r, err := p.m.api.ChainGetRandomness(ctx, p.ts.Key(), nil, challengeLookback)
2019-11-05 14:52:33 +00:00
if err != nil {
return xerrors.Errorf("failed to get chain randomness for post (ts=%d; ppe=%d): %w", p.ts.Height(), p.ppe, err)
}
p.r = r
2019-09-26 23:07:40 +00:00
2019-11-05 14:52:33 +00:00
return nil
}
2019-11-07 18:22:59 +00:00
func (p *post) sortedSectorInfo() sectorbuilder.SortedSectorInfo {
sbsi := make([]sectorbuilder.SectorInfo, len(p.sset))
for k, sector := range p.sset {
var commR [sectorbuilder.CommLen]byte
copy(commR[:], sector.CommR)
sbsi[k] = sectorbuilder.SectorInfo{
SectorID: sector.SectorID,
CommR: commR,
}
}
return sectorbuilder.NewSortedSectorInfo(sbsi)
}
2019-11-05 14:52:33 +00:00
func (p *post) runPost(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "storage.runPost")
defer span.End()
log.Infow("running PoSt", "delayed-by",
int64(p.ts.Height())-(int64(p.ppe)-int64(build.PoStChallangeTime)),
"chain-random", p.r, "ppe", p.ppe, "height", p.ts.Height(), "sectors", len(p.sset))
2019-11-05 14:52:33 +00:00
tsStart := time.Now()
var faults []uint64 // TODO
2019-11-07 18:22:59 +00:00
var seed [32]byte
copy(seed[:], p.r)
proof, err := p.m.sb.GeneratePoSt(p.sortedSectorInfo(), seed, faults)
2019-11-05 14:52:33 +00:00
if err != nil {
return xerrors.Errorf("running post failed: %w", err)
}
elapsed := time.Since(tsStart)
p.proof = proof
log.Infow("submitting PoSt", "pLen", len(proof), "elapsed", elapsed)
return nil
}
2019-11-24 16:35:50 +00:00
func (p *post) commitPost(ctx context.Context) (err error) {
2019-11-05 14:52:33 +00:00
ctx, span := trace.StartSpan(ctx, "storage.commitPost")
defer span.End()
params := &actors.SubmitPoStParams{
Proof: p.proof,
DoneSet: types.BitFieldFromSet(nil),
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return xerrors.Errorf("could not serialize submit post parameters: %w", aerr)
}
2019-11-24 16:35:50 +00:00
msg := &types.Message{
2019-11-05 14:52:33 +00:00
To: p.m.maddr,
From: p.m.worker,
Method: actors.MAMethods.SubmitPoSt,
Params: enc,
Value: types.NewInt(1000), // currently hard-coded late fee in actor, returned if not late
GasLimit: types.NewInt(1000000 /* i dont know help */),
GasPrice: types.NewInt(1),
}
log.Info("mpush")
2019-11-24 16:35:50 +00:00
p.smsg, err = p.m.api.MpoolPushMessage(ctx, msg)
2019-11-05 14:52:33 +00:00
if err != nil {
return xerrors.Errorf("pushing message to mpool: %w", err)
}
return nil
}
func (p *post) waitCommit(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "storage.waitPost")
defer span.End()
log.Infof("Waiting for post %s to appear on chain", p.smsg.Cid())
2019-11-05 14:52:33 +00:00
2019-11-19 21:27:25 +00:00
err := p.m.events.CalledMsg(ctx, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (more bool, err error) {
if rec.ExitCode != 0 {
log.Warnf("SubmitPoSt EXIT: %d", rec.ExitCode)
}
log.Infof("Post made it on chain! (height=%d)", ts.Height())
return false, nil
}, func(ctx context.Context, ts *types.TipSet) error {
log.Warn("post message reverted")
return nil
2019-11-24 16:35:50 +00:00
}, 3, postMsgTimeout, p.smsg)
2019-11-05 14:52:33 +00:00
if err != nil {
2019-11-24 16:35:50 +00:00
return xerrors.Errorf("waiting for post to appear on chain: %w", err)
2019-11-05 14:52:33 +00:00
}
2019-11-19 21:27:25 +00:00
2019-11-05 14:52:33 +00:00
return nil
}
func (m *Miner) computePost(ppe uint64) func(ctx context.Context, ts *types.TipSet, curH uint64) error {
called := 0
return func(ctx context.Context, ts *types.TipSet, curH uint64) error {
called++
if called > 1 {
log.Errorw("BUG: computePost callback called again", "ppe", ppe,
"height", ts.Height(), "curH", curH, "called", called-1)
return nil
}
err := (&post{
m: m,
ppe: ppe,
ts: ts,
}).doPost(ctx)
2019-11-24 16:35:50 +00:00
m.scheduleNextPost(ppe + build.ProvingPeriodDuration)
2019-09-26 23:07:40 +00:00
if err != nil {
return xerrors.Errorf("doPost: %w", err)
2019-09-26 23:07:40 +00:00
}
return nil
}
}