storageminer: Update post scheduling
This commit is contained in:
parent
4305824be7
commit
1024812559
@ -313,8 +313,8 @@ func (sma StorageMinerActor) SubmitPoSt(act *types.Actor, vmctx types.VMContext,
|
||||
}
|
||||
|
||||
provingPeriodOffset := self.ProvingPeriodEnd % build.ProvingPeriodDuration
|
||||
provingPeriod := (vmctx.BlockHeight() - provingPeriodOffset - 1) / build.ProvingPeriodDuration + 1
|
||||
currentProvingPeriodEnd := provingPeriod * build.ProvingPeriodDuration + provingPeriodOffset
|
||||
provingPeriod := (vmctx.BlockHeight()-provingPeriodOffset-1)/build.ProvingPeriodDuration + 1
|
||||
currentProvingPeriodEnd := provingPeriod*build.ProvingPeriodDuration + provingPeriodOffset
|
||||
|
||||
feesRequired := types.NewInt(0)
|
||||
|
||||
|
@ -26,8 +26,8 @@ var initCmd = &cli.Command{
|
||||
Usage: "specify the address of an already created miner actor",
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "genesis-miner",
|
||||
Usage: "enable genesis mining (DON'T USE ON BOOTSTRAPPED NETWORK)",
|
||||
Name: "genesis-miner",
|
||||
Usage: "enable genesis mining (DON'T USE ON BOOTSTRAPPED NETWORK)",
|
||||
Hidden: true,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
@ -35,14 +35,14 @@ var initCmd = &cli.Command{
|
||||
Usage: "create separate worker key",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "worker",
|
||||
Name: "worker",
|
||||
Aliases: []string{"w"},
|
||||
Usage: "worker key to use (overrides --create-worker-key)",
|
||||
Usage: "worker key to use (overrides --create-worker-key)",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "owner",
|
||||
Name: "owner",
|
||||
Aliases: []string{"o"},
|
||||
Usage: "owner key to use",
|
||||
Usage: "owner key to use",
|
||||
},
|
||||
},
|
||||
Action: func(cctx *cli.Context) error {
|
||||
|
177
storage/miner.go
177
storage/miner.go
@ -10,7 +10,6 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/xerrors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/go-lotus/api"
|
||||
"github.com/filecoin-project/go-lotus/build"
|
||||
@ -44,7 +43,7 @@ type Miner struct {
|
||||
ds datastore.Batching
|
||||
|
||||
schedLk sync.Mutex
|
||||
postSched uint64
|
||||
schedPost uint64
|
||||
}
|
||||
|
||||
type storageMinerApi interface {
|
||||
@ -89,13 +88,8 @@ func (m *Miner) Run(ctx context.Context) error {
|
||||
|
||||
m.events = events.NewEvents(ctx, m.api)
|
||||
|
||||
ts, err := m.api.ChainHead(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go m.handlePostingSealedSectors(ctx)
|
||||
go m.schedulePoSt(ctx, ts, false)
|
||||
go m.beginPosting(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -173,14 +167,20 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal
|
||||
return
|
||||
}
|
||||
|
||||
m.schedulePoSt(ctx, nil, false)
|
||||
m.beginPosting(ctx)
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Miner) schedulePoSt(ctx context.Context, baseTs *types.TipSet, force bool) {
|
||||
ppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, baseTs)
|
||||
func (m *Miner) beginPosting(ctx context.Context) {
|
||||
ts, err := m.api.ChainHead(context.TODO())
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
ppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, ts)
|
||||
if err != nil {
|
||||
log.Errorf("failed to get proving period end for miner: %s", err)
|
||||
return
|
||||
@ -188,25 +188,26 @@ func (m *Miner) schedulePoSt(ctx context.Context, baseTs *types.TipSet, force bo
|
||||
|
||||
if ppe == 0 {
|
||||
log.Errorf("Proving period end == 0")
|
||||
// TODO: we probably want to call schedulePoSt after the first commitSector call
|
||||
return
|
||||
}
|
||||
|
||||
m.schedLk.Lock()
|
||||
|
||||
if m.postSched >= ppe {
|
||||
if !force || m.postSched > ppe {
|
||||
log.Warnf("schedulePoSt already called for proving period >= %d", m.postSched)
|
||||
m.schedLk.Unlock()
|
||||
return
|
||||
}
|
||||
if m.schedPost >= 0 {
|
||||
log.Warnf("PoSts already running %d", m.schedPost)
|
||||
m.schedLk.Unlock()
|
||||
return
|
||||
}
|
||||
m.postSched = ppe
|
||||
|
||||
provingPeriodOffset := ppe % build.ProvingPeriodDuration
|
||||
provingPeriod := (ts.Height()-provingPeriodOffset-1)/build.ProvingPeriodDuration + 1
|
||||
m.schedPost = provingPeriod*build.ProvingPeriodDuration + provingPeriodOffset
|
||||
|
||||
m.schedLk.Unlock()
|
||||
|
||||
log.Infof("Scheduling post at height %d", ppe-build.PoSTChallangeTime)
|
||||
err = m.events.ChainAt(m.startPost, func(ts *types.TipSet) error { // Revert
|
||||
err = m.events.ChainAt(m.computePost(m.schedPost), func(ts *types.TipSet) error { // Revert
|
||||
// TODO: Cancel post
|
||||
log.Errorf("TODO: Cancel PoSt, re-run")
|
||||
return nil
|
||||
}, PoStConfidence, ppe-build.PoSTChallangeTime)
|
||||
if err != nil {
|
||||
@ -216,82 +217,66 @@ func (m *Miner) schedulePoSt(ctx context.Context, baseTs *types.TipSet, force bo
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Miner) restartPost(ts *types.TipSet) {
|
||||
time.Sleep(400 * time.Millisecond)
|
||||
log.Warn("Restarting PoSt after failure")
|
||||
m.schedulePoSt(context.TODO(), ts, true)
|
||||
func (m *Miner) scheduleNextPost(ppe uint64) {
|
||||
ts, err := m.api.ChainHead(context.TODO())
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
// TODO: retry
|
||||
return
|
||||
}
|
||||
|
||||
provingPeriodOffset := ppe % build.ProvingPeriodDuration
|
||||
provingPeriod := (ts.Height()-provingPeriodOffset-1)/build.ProvingPeriodDuration + 1
|
||||
headPPE := provingPeriod*build.ProvingPeriodDuration + provingPeriodOffset
|
||||
if headPPE > ppe {
|
||||
log.Warn("PoSt computation running behind chain")
|
||||
ppe = headPPE
|
||||
}
|
||||
|
||||
m.schedLk.Lock()
|
||||
if m.schedPost >= ppe {
|
||||
// this probably can't happen
|
||||
log.Error("PoSt already scheduled: %d >= %d", m.schedPost, ppe)
|
||||
m.schedLk.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
m.schedPost = ppe
|
||||
m.schedLk.Unlock()
|
||||
|
||||
log.Infof("Scheduling post at height %d", ppe-build.PoSTChallangeTime)
|
||||
err = m.events.ChainAt(m.computePost(ppe), func(ts *types.TipSet) error { // Revert
|
||||
// TODO: Cancel post
|
||||
log.Errorf("TODO: Cancel PoSt, re-run")
|
||||
return nil
|
||||
}, PoStConfidence, ppe-build.PoSTChallangeTime)
|
||||
if err != nil {
|
||||
// TODO: This is BAD, figure something out
|
||||
log.Errorf("scheduling PoSt failed: %s", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Miner) startPost(ts *types.TipSet, curH uint64) error {
|
||||
log.Info("starting PoSt computation")
|
||||
func (m *Miner) computePost(ppe uint64) func(ts *types.TipSet, curH uint64) error {
|
||||
return func(ts *types.TipSet, curH uint64) error {
|
||||
|
||||
head, err := m.api.ChainHead(context.TODO())
|
||||
if err != nil {
|
||||
m.restartPost(ts)
|
||||
return err
|
||||
}
|
||||
ctx := context.TODO()
|
||||
|
||||
postWaitCh, _, err := m.maybeDoPost(context.TODO(), head)
|
||||
if err != nil {
|
||||
m.restartPost(ts)
|
||||
return err
|
||||
}
|
||||
|
||||
if postWaitCh == nil {
|
||||
log.Errorf("PoSt didn't start")
|
||||
m.restartPost(ts)
|
||||
return nil
|
||||
}
|
||||
|
||||
go func() {
|
||||
err := <-postWaitCh
|
||||
sset, err := m.api.StateMinerProvingSet(ctx, m.maddr, ts)
|
||||
if err != nil {
|
||||
log.Errorf("got error back from postWaitCh: %s", err)
|
||||
m.restartPost(ts)
|
||||
return
|
||||
return xerrors.Errorf("failed to get proving set for miner: %w", err)
|
||||
}
|
||||
|
||||
log.Infof("post successfully submitted")
|
||||
r, err := m.api.ChainGetRandomness(ctx, ts, nil, int(int64(ts.Height())-int64(ppe)+int64(build.PoSTChallangeTime))) // TODO: review: check math
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to get chain randomness for post: %w", err)
|
||||
}
|
||||
|
||||
m.schedulePoSt(context.TODO(), ts, false)
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error, *types.BlockHeader, error) {
|
||||
ppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, ts)
|
||||
if err != nil {
|
||||
return nil, nil, xerrors.Errorf("failed to get proving period end for miner: %w", err)
|
||||
}
|
||||
|
||||
if ts.Height() > ppe {
|
||||
log.Warnf("skipping post, supplied tipset too high: ppe=%d, ts.H=%d", ppe, ts.Height())
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
sset, err := m.api.StateMinerProvingSet(ctx, m.maddr, ts)
|
||||
if err != nil {
|
||||
return nil, nil, xerrors.Errorf("failed to get proving set for miner: %w", err)
|
||||
}
|
||||
|
||||
r, err := m.api.ChainGetRandomness(ctx, ts, nil, int(int64(ts.Height())-int64(ppe)+int64(build.PoSTChallangeTime))) // TODO: review: check math
|
||||
if err != nil {
|
||||
return nil, nil, xerrors.Errorf("failed to get chain randomness for post: %w", err)
|
||||
}
|
||||
|
||||
sourceTs, err := m.api.ChainGetTipSetByHeight(ctx, ppe-build.PoSTChallangeTime, ts)
|
||||
if err != nil {
|
||||
return nil, nil, xerrors.Errorf("failed to get post start tipset: %w", err)
|
||||
}
|
||||
|
||||
ret := make(chan error, 1)
|
||||
go func() {
|
||||
log.Infof("running PoSt computation, rh=%d r=%s, ppe=%d, h=%d", ts.Height()-(ts.Height()-ppe+build.PoSTChallangeTime), base64.StdEncoding.EncodeToString(r), ppe, ts.Height())
|
||||
var faults []uint64
|
||||
proof, err := m.secst.RunPoSt(ctx, sset, r, faults)
|
||||
if err != nil {
|
||||
ret <- xerrors.Errorf("running post failed: %w", err)
|
||||
return
|
||||
return xerrors.Errorf("running post failed: %w", err)
|
||||
}
|
||||
|
||||
log.Infof("submitting PoSt pLen=%d", len(proof))
|
||||
@ -303,8 +288,7 @@ func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error
|
||||
|
||||
enc, aerr := actors.SerializeParams(params)
|
||||
if aerr != nil {
|
||||
ret <- xerrors.Errorf("could not serialize submit post parameters: %w", err)
|
||||
return
|
||||
return xerrors.Errorf("could not serialize submit post parameters: %w", err)
|
||||
}
|
||||
|
||||
msg := &types.Message{
|
||||
@ -319,21 +303,22 @@ func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error
|
||||
|
||||
smsg, err := m.api.MpoolPushMessage(ctx, msg)
|
||||
if err != nil {
|
||||
ret <- xerrors.Errorf("pushing message to mpool: %w", err)
|
||||
return
|
||||
return xerrors.Errorf("pushing message to mpool: %w", err)
|
||||
}
|
||||
|
||||
// make sure it succeeds...
|
||||
_, err = m.api.ChainWaitMsg(ctx, smsg.Cid())
|
||||
rec, err := m.api.ChainWaitMsg(ctx, smsg.Cid())
|
||||
if err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
if rec.Receipt.ExitCode != 0 {
|
||||
log.Warnf("SubmitPoSt EXIT: %d", rec.Receipt.ExitCode)
|
||||
// TODO: Do something
|
||||
}
|
||||
// TODO: check receipt
|
||||
|
||||
m.schedulePoSt(ctx, nil, true)
|
||||
}()
|
||||
|
||||
return ret, sourceTs.MinTicketBlock(), nil
|
||||
m.scheduleNextPost(ppe + build.ProvingPeriodDuration)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func sectorIdList(si []*api.SectorInfo) []uint64 {
|
||||
|
Loading…
Reference in New Issue
Block a user