storageminer: tracing in computePost

This commit is contained in:
Łukasz Magiera 2019-11-05 15:52:33 +01:00
parent 4c4c2095a6
commit daabe470a4

View File

@ -2,6 +2,8 @@ package storage
import (
"context"
"github.com/ipfs/go-cid"
"go.opencensus.io/trace"
"time"
"golang.org/x/xerrors"
@ -96,6 +98,140 @@ func (m *Miner) scheduleNextPost(ppe uint64) {
}
}
type post struct {
m *Miner
ppe uint64
ts *types.TipSet
// prep
sset []*api.SectorInfo
r []byte
// run
proof []byte
// commit
smsg cid.Cid
}
func (p *post) doPost(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "storage.computePost")
defer span.End()
if err := p.preparePost(ctx); err != nil {
return err
}
if err := p.runPost(ctx); err != nil {
return err
}
if err := p.commitPost(ctx); err != nil {
return err
}
if err := p.waitCommit(ctx); err != nil {
return err
}
return nil
}
func (p *post) preparePost(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "storage.preparePost")
defer span.End()
sset, err := p.m.api.StateMinerProvingSet(ctx, p.m.maddr, p.ts)
if err != nil {
return xerrors.Errorf("failed to get proving set for miner: %w", err)
}
p.sset = sset
r, err := p.m.api.ChainGetRandomness(ctx, p.ts, nil, int(int64(p.ts.Height())-int64(p.ppe)+int64(build.PoStChallangeTime)+int64(build.PoStRandomnessLookback))) // TODO: review: check math
if err != nil {
return xerrors.Errorf("failed to get chain randomness for post (ts=%d; ppe=%d): %w", p.ts.Height(), p.ppe, err)
}
p.r = r
return nil
}
func (p *post) runPost(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "storage.runPost")
defer span.End()
log.Infow("running PoSt", "delayed-by",
int64(p.ts.Height())-(int64(p.ppe)-int64(build.PoStChallangeTime)),
"chain-random", p.r, "ppe", p.ppe, "height", p.ts.Height())
tsStart := time.Now()
var faults []uint64 // TODO
proof, err := p.m.secst.RunPoSt(ctx, p.sset, p.r, faults)
if err != nil {
return xerrors.Errorf("running post failed: %w", err)
}
elapsed := time.Since(tsStart)
p.proof = proof
log.Infow("submitting PoSt", "pLen", len(proof), "elapsed", elapsed)
return nil
}
func (p *post) commitPost(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "storage.commitPost")
defer span.End()
params := &actors.SubmitPoStParams{
Proof: p.proof,
DoneSet: types.BitFieldFromSet(nil),
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return xerrors.Errorf("could not serialize submit post parameters: %w", aerr)
}
msg := &types.Message{
To: p.m.maddr,
From: p.m.worker,
Method: actors.MAMethods.SubmitPoSt,
Params: enc,
Value: types.NewInt(1000), // currently hard-coded late fee in actor, returned if not late
GasLimit: types.NewInt(1000000 /* i dont know help */),
GasPrice: types.NewInt(1),
}
log.Info("mpush")
smsg, err := p.m.api.MpoolPushMessage(ctx, msg)
if err != nil {
return xerrors.Errorf("pushing message to mpool: %w", err)
}
p.smsg = smsg.Cid()
return nil
}
func (p *post) waitCommit(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "storage.waitPost")
defer span.End()
log.Infof("Waiting for post %s to appear on chain", p.smsg)
// make sure it succeeds...
rec, err := p.m.api.StateWaitMsg(ctx, p.smsg)
if err != nil {
return err
}
if rec.Receipt.ExitCode != 0 {
log.Warnf("SubmitPoSt EXIT: %d", rec.Receipt.ExitCode)
// TODO: Do something
}
return nil
}
func (m *Miner) computePost(ppe uint64) func(ctx context.Context, ts *types.TipSet, curH uint64) error {
called := 0
return func(ctx context.Context, ts *types.TipSet, curH uint64) error {
@ -106,68 +242,14 @@ func (m *Miner) computePost(ppe uint64) func(ctx context.Context, ts *types.TipS
return nil
}
sset, err := m.api.StateMinerProvingSet(ctx, m.maddr, ts)
if err != nil {
return xerrors.Errorf("failed to get proving set for miner: %w", err)
}
r, err := m.api.ChainGetRandomness(ctx, ts, nil, int(int64(ts.Height())-int64(ppe)+int64(build.PoStChallangeTime)+int64(build.PoStRandomnessLookback))) // TODO: review: check math
if err != nil {
return xerrors.Errorf("failed to get chain randomness for post (ts=%d; ppe=%d): %w", ts.Height(), ppe, err)
}
log.Infow("running PoSt", "delayed-by",
int64(ts.Height())-(int64(ppe)-int64(build.PoStChallangeTime)),
"chain-random", r, "ppe", ppe, "height", ts.Height())
tsStart := time.Now()
var faults []uint64
proof, err := m.secst.RunPoSt(ctx, sset, r, faults)
if err != nil {
return xerrors.Errorf("running post failed: %w", err)
}
elapsed := time.Since(tsStart)
log.Infow("submitting PoSt", "pLen", len(proof), "elapsed", elapsed)
params := &actors.SubmitPoStParams{
Proof: proof,
DoneSet: types.BitFieldFromSet(nil),
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return xerrors.Errorf("could not serialize submit post parameters: %w", err)
}
msg := &types.Message{
To: m.maddr,
From: m.worker,
Method: actors.MAMethods.SubmitPoSt,
Params: enc,
Value: types.NewInt(1000), // currently hard-coded late fee in actor, returned if not late
GasLimit: types.NewInt(1000000 /* i dont know help */),
GasPrice: types.NewInt(1),
}
log.Info("mpush")
smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil {
return xerrors.Errorf("pushing message to mpool: %w", err)
}
log.Infof("Waiting for post %s to appear on chain", smsg.Cid())
// make sure it succeeds...
rec, err := m.api.StateWaitMsg(ctx, smsg.Cid())
err := (&post{
m: m,
ppe: ppe,
ts: ts,
}).doPost(ctx)
if err != nil {
return err
}
if rec.Receipt.ExitCode != 0 {
log.Warnf("SubmitPoSt EXIT: %d", rec.Receipt.ExitCode)
// TODO: Do something
}
m.scheduleNextPost(ppe + build.ProvingPeriodDuration)
return nil