Merge pull request #3924 from filecoin-project/feat/dont-recompute-PoST-redux
Dont recompute post on revert <<redux>>
This commit is contained in:
commit
7a3a2f8db9
@ -20,6 +20,10 @@ import (
|
||||
|
||||
// Unchanged between v0 and v1 actors
|
||||
var WPoStProvingPeriod = miner0.WPoStProvingPeriod
|
||||
var WPoStPeriodDeadlines = miner0.WPoStPeriodDeadlines
|
||||
var WPoStChallengeWindow = miner0.WPoStChallengeWindow
|
||||
var WPoStChallengeLookback = miner0.WPoStChallengeLookback
|
||||
var FaultDeclarationCutoff = miner0.FaultDeclarationCutoff
|
||||
|
||||
const MinSectorExpiration = miner0.MinSectorExpiration
|
||||
|
||||
|
537
storage/wdpost_changehandler.go
Normal file
537
storage/wdpost_changehandler.go
Normal file
@ -0,0 +1,537 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/dline"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
)
|
||||
|
||||
const SubmitConfidence = 4
|
||||
|
||||
type CompleteGeneratePoSTCb func(posts []miner.SubmitWindowedPoStParams, err error)
|
||||
type CompleteSubmitPoSTCb func(err error)
|
||||
|
||||
type changeHandlerAPI interface {
|
||||
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)
|
||||
startGeneratePoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, onComplete CompleteGeneratePoSTCb) context.CancelFunc
|
||||
startSubmitPoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, posts []miner.SubmitWindowedPoStParams, onComplete CompleteSubmitPoSTCb) context.CancelFunc
|
||||
onAbort(ts *types.TipSet, deadline *dline.Info)
|
||||
failPost(err error, ts *types.TipSet, deadline *dline.Info)
|
||||
}
|
||||
|
||||
type changeHandler struct {
|
||||
api changeHandlerAPI
|
||||
actor address.Address
|
||||
proveHdlr *proveHandler
|
||||
submitHdlr *submitHandler
|
||||
}
|
||||
|
||||
func newChangeHandler(api changeHandlerAPI, actor address.Address) *changeHandler {
|
||||
posts := newPostsCache()
|
||||
p := newProver(api, posts)
|
||||
s := newSubmitter(api, posts)
|
||||
return &changeHandler{api: api, actor: actor, proveHdlr: p, submitHdlr: s}
|
||||
}
|
||||
|
||||
func (ch *changeHandler) start() {
|
||||
go ch.proveHdlr.run()
|
||||
go ch.submitHdlr.run()
|
||||
}
|
||||
|
||||
func (ch *changeHandler) update(ctx context.Context, revert *types.TipSet, advance *types.TipSet) error {
|
||||
// Get the current deadline period
|
||||
di, err := ch.api.StateMinerProvingDeadline(ctx, ch.actor, advance.Key())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !di.PeriodStarted() {
|
||||
return nil // not proving anything yet
|
||||
}
|
||||
|
||||
hc := &headChange{
|
||||
ctx: ctx,
|
||||
revert: revert,
|
||||
advance: advance,
|
||||
di: di,
|
||||
}
|
||||
|
||||
select {
|
||||
case ch.proveHdlr.hcs <- hc:
|
||||
case <-ch.proveHdlr.shutdownCtx.Done():
|
||||
case <-ctx.Done():
|
||||
}
|
||||
|
||||
select {
|
||||
case ch.submitHdlr.hcs <- hc:
|
||||
case <-ch.submitHdlr.shutdownCtx.Done():
|
||||
case <-ctx.Done():
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ch *changeHandler) shutdown() {
|
||||
ch.proveHdlr.shutdown()
|
||||
ch.submitHdlr.shutdown()
|
||||
}
|
||||
|
||||
func (ch *changeHandler) currentTSDI() (*types.TipSet, *dline.Info) {
|
||||
return ch.submitHdlr.currentTSDI()
|
||||
}
|
||||
|
||||
// postsCache keeps a cache of PoSTs for each proving window
|
||||
type postsCache struct {
|
||||
added chan *postInfo
|
||||
lk sync.RWMutex
|
||||
cache map[abi.ChainEpoch][]miner.SubmitWindowedPoStParams
|
||||
}
|
||||
|
||||
func newPostsCache() *postsCache {
|
||||
return &postsCache{
|
||||
added: make(chan *postInfo, 16),
|
||||
cache: make(map[abi.ChainEpoch][]miner.SubmitWindowedPoStParams),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *postsCache) add(di *dline.Info, posts []miner.SubmitWindowedPoStParams) {
|
||||
c.lk.Lock()
|
||||
defer c.lk.Unlock()
|
||||
|
||||
// TODO: clear cache entries older than chain finality
|
||||
c.cache[di.Open] = posts
|
||||
|
||||
c.added <- &postInfo{
|
||||
di: di,
|
||||
posts: posts,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *postsCache) get(di *dline.Info) ([]miner.SubmitWindowedPoStParams, bool) {
|
||||
c.lk.RLock()
|
||||
defer c.lk.RUnlock()
|
||||
|
||||
posts, ok := c.cache[di.Open]
|
||||
return posts, ok
|
||||
}
|
||||
|
||||
type headChange struct {
|
||||
ctx context.Context
|
||||
revert *types.TipSet
|
||||
advance *types.TipSet
|
||||
di *dline.Info
|
||||
}
|
||||
|
||||
type currentPost struct {
|
||||
di *dline.Info
|
||||
abort context.CancelFunc
|
||||
}
|
||||
|
||||
type postResult struct {
|
||||
ts *types.TipSet
|
||||
currPost *currentPost
|
||||
posts []miner.SubmitWindowedPoStParams
|
||||
err error
|
||||
}
|
||||
|
||||
// proveHandler generates proofs
|
||||
type proveHandler struct {
|
||||
api changeHandlerAPI
|
||||
posts *postsCache
|
||||
|
||||
postResults chan *postResult
|
||||
hcs chan *headChange
|
||||
|
||||
current *currentPost
|
||||
|
||||
shutdownCtx context.Context
|
||||
shutdown context.CancelFunc
|
||||
|
||||
// Used for testing
|
||||
processedHeadChanges chan *headChange
|
||||
processedPostResults chan *postResult
|
||||
}
|
||||
|
||||
func newProver(
|
||||
api changeHandlerAPI,
|
||||
posts *postsCache,
|
||||
) *proveHandler {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &proveHandler{
|
||||
api: api,
|
||||
posts: posts,
|
||||
postResults: make(chan *postResult),
|
||||
hcs: make(chan *headChange),
|
||||
shutdownCtx: ctx,
|
||||
shutdown: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *proveHandler) run() {
|
||||
// Abort proving on shutdown
|
||||
defer func() {
|
||||
if p.current != nil {
|
||||
p.current.abort()
|
||||
}
|
||||
}()
|
||||
|
||||
for p.shutdownCtx.Err() == nil {
|
||||
select {
|
||||
case <-p.shutdownCtx.Done():
|
||||
return
|
||||
|
||||
case hc := <-p.hcs:
|
||||
// Head changed
|
||||
p.processHeadChange(hc.ctx, hc.advance, hc.di)
|
||||
if p.processedHeadChanges != nil {
|
||||
p.processedHeadChanges <- hc
|
||||
}
|
||||
|
||||
case res := <-p.postResults:
|
||||
// Proof generation complete
|
||||
p.processPostResult(res)
|
||||
if p.processedPostResults != nil {
|
||||
p.processedPostResults <- res
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *proveHandler) processHeadChange(ctx context.Context, newTS *types.TipSet, di *dline.Info) {
|
||||
// If the post window has expired, abort the current proof
|
||||
if p.current != nil && newTS.Height() >= p.current.di.Close {
|
||||
// Cancel the context on the current proof
|
||||
p.current.abort()
|
||||
|
||||
// Clear out the reference to the proof so that we can immediately
|
||||
// start generating a new proof, without having to worry about state
|
||||
// getting clobbered when the abort completes
|
||||
p.current = nil
|
||||
}
|
||||
|
||||
// Only generate one proof at a time
|
||||
if p.current != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// If the proof for the current post window has been generated, check the
|
||||
// next post window
|
||||
_, complete := p.posts.get(di)
|
||||
for complete {
|
||||
di = nextDeadline(di)
|
||||
_, complete = p.posts.get(di)
|
||||
}
|
||||
|
||||
// Check if the chain is above the Challenge height for the post window
|
||||
if newTS.Height() < di.Challenge {
|
||||
return
|
||||
}
|
||||
|
||||
p.current = ¤tPost{di: di}
|
||||
curr := p.current
|
||||
p.current.abort = p.api.startGeneratePoST(ctx, newTS, di, func(posts []miner.SubmitWindowedPoStParams, err error) {
|
||||
p.postResults <- &postResult{ts: newTS, currPost: curr, posts: posts, err: err}
|
||||
})
|
||||
}
|
||||
|
||||
func (p *proveHandler) processPostResult(res *postResult) {
|
||||
di := res.currPost.di
|
||||
if res.err != nil {
|
||||
// Proving failed so inform the API
|
||||
p.api.failPost(res.err, res.ts, di)
|
||||
log.Warnf("Aborted window post Proving (Deadline: %+v)", di)
|
||||
p.api.onAbort(res.ts, di)
|
||||
|
||||
// Check if the current post has already been aborted
|
||||
if p.current == res.currPost {
|
||||
// If the current post was not already aborted, setting it to nil
|
||||
// marks it as complete so that a new post can be started
|
||||
p.current = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Completed processing this proving window
|
||||
p.current = nil
|
||||
|
||||
// Add the proofs to the cache
|
||||
p.posts.add(di, res.posts)
|
||||
}
|
||||
|
||||
type submitResult struct {
|
||||
pw *postWindow
|
||||
err error
|
||||
}
|
||||
|
||||
type SubmitState string
|
||||
|
||||
const (
|
||||
SubmitStateStart SubmitState = "SubmitStateStart"
|
||||
SubmitStateSubmitting SubmitState = "SubmitStateSubmitting"
|
||||
SubmitStateComplete SubmitState = "SubmitStateComplete"
|
||||
)
|
||||
|
||||
type postWindow struct {
|
||||
ts *types.TipSet
|
||||
di *dline.Info
|
||||
submitState SubmitState
|
||||
abort context.CancelFunc
|
||||
}
|
||||
|
||||
type postInfo struct {
|
||||
di *dline.Info
|
||||
posts []miner.SubmitWindowedPoStParams
|
||||
}
|
||||
|
||||
// submitHandler submits proofs on-chain
|
||||
type submitHandler struct {
|
||||
api changeHandlerAPI
|
||||
posts *postsCache
|
||||
|
||||
submitResults chan *submitResult
|
||||
hcs chan *headChange
|
||||
|
||||
postWindows map[abi.ChainEpoch]*postWindow
|
||||
getPostWindowReqs chan *getPWReq
|
||||
|
||||
shutdownCtx context.Context
|
||||
shutdown context.CancelFunc
|
||||
|
||||
currentCtx context.Context
|
||||
currentTS *types.TipSet
|
||||
currentDI *dline.Info
|
||||
getTSDIReq chan chan *tsdi
|
||||
|
||||
// Used for testing
|
||||
processedHeadChanges chan *headChange
|
||||
processedSubmitResults chan *submitResult
|
||||
processedPostReady chan *postInfo
|
||||
}
|
||||
|
||||
func newSubmitter(
|
||||
api changeHandlerAPI,
|
||||
posts *postsCache,
|
||||
) *submitHandler {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &submitHandler{
|
||||
api: api,
|
||||
posts: posts,
|
||||
submitResults: make(chan *submitResult),
|
||||
hcs: make(chan *headChange),
|
||||
postWindows: make(map[abi.ChainEpoch]*postWindow),
|
||||
getPostWindowReqs: make(chan *getPWReq),
|
||||
getTSDIReq: make(chan chan *tsdi),
|
||||
shutdownCtx: ctx,
|
||||
shutdown: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *submitHandler) run() {
|
||||
// On shutdown, abort in-progress submits
|
||||
defer func() {
|
||||
for _, pw := range s.postWindows {
|
||||
if pw.abort != nil {
|
||||
pw.abort()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for s.shutdownCtx.Err() == nil {
|
||||
select {
|
||||
case <-s.shutdownCtx.Done():
|
||||
return
|
||||
|
||||
case hc := <-s.hcs:
|
||||
// Head change
|
||||
s.processHeadChange(hc.ctx, hc.revert, hc.advance, hc.di)
|
||||
if s.processedHeadChanges != nil {
|
||||
s.processedHeadChanges <- hc
|
||||
}
|
||||
|
||||
case pi := <-s.posts.added:
|
||||
// Proof generated
|
||||
s.processPostReady(pi)
|
||||
if s.processedPostReady != nil {
|
||||
s.processedPostReady <- pi
|
||||
}
|
||||
|
||||
case res := <-s.submitResults:
|
||||
// Submit complete
|
||||
s.processSubmitResult(res)
|
||||
if s.processedSubmitResults != nil {
|
||||
s.processedSubmitResults <- res
|
||||
}
|
||||
|
||||
case pwreq := <-s.getPostWindowReqs:
|
||||
// used by getPostWindow() to sync with run loop
|
||||
pwreq.out <- s.postWindows[pwreq.di.Open]
|
||||
|
||||
case out := <-s.getTSDIReq:
|
||||
// used by currentTSDI() to sync with run loop
|
||||
out <- &tsdi{ts: s.currentTS, di: s.currentDI}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processHeadChange is called when the chain head changes
|
||||
func (s *submitHandler) processHeadChange(ctx context.Context, revert *types.TipSet, advance *types.TipSet, di *dline.Info) {
|
||||
s.currentCtx = ctx
|
||||
s.currentTS = advance
|
||||
s.currentDI = di
|
||||
|
||||
// Start tracking the current post window if we're not already
|
||||
// TODO: clear post windows older than chain finality
|
||||
if _, ok := s.postWindows[di.Open]; !ok {
|
||||
s.postWindows[di.Open] = &postWindow{
|
||||
di: di,
|
||||
ts: advance,
|
||||
submitState: SubmitStateStart,
|
||||
}
|
||||
}
|
||||
|
||||
// Apply the change to all post windows
|
||||
for _, pw := range s.postWindows {
|
||||
s.processHeadChangeForPW(ctx, revert, advance, pw)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *submitHandler) processHeadChangeForPW(ctx context.Context, revert *types.TipSet, advance *types.TipSet, pw *postWindow) {
|
||||
revertedToPrevDL := revert != nil && revert.Height() < pw.di.Open
|
||||
expired := advance.Height() >= pw.di.Close
|
||||
|
||||
// If the chain was reverted back to the previous deadline, or if the post
|
||||
// window has expired, abort submit
|
||||
if pw.submitState == SubmitStateSubmitting && (revertedToPrevDL || expired) {
|
||||
// Replace the aborted postWindow with a new one so that we can
|
||||
// submit again at any time without the state getting clobbered
|
||||
// when the abort completes
|
||||
abort := pw.abort
|
||||
if abort != nil {
|
||||
pw = &postWindow{
|
||||
di: pw.di,
|
||||
ts: advance,
|
||||
submitState: SubmitStateStart,
|
||||
}
|
||||
s.postWindows[pw.di.Open] = pw
|
||||
|
||||
// Abort the current submit
|
||||
abort()
|
||||
}
|
||||
} else if pw.submitState == SubmitStateComplete && revertedToPrevDL {
|
||||
// If submit for this deadline has completed, but the chain was
|
||||
// reverted back to the previous deadline, reset the submit state to the
|
||||
// starting state, so that it can be resubmitted
|
||||
pw.submitState = SubmitStateStart
|
||||
}
|
||||
|
||||
// Submit the proof to chain if the proof has been generated and the chain
|
||||
// height is above confidence
|
||||
s.submitIfReady(ctx, advance, pw)
|
||||
}
|
||||
|
||||
// processPostReady is called when a proof generation completes
|
||||
func (s *submitHandler) processPostReady(pi *postInfo) {
|
||||
pw, ok := s.postWindows[pi.di.Open]
|
||||
if ok {
|
||||
s.submitIfReady(s.currentCtx, s.currentTS, pw)
|
||||
}
|
||||
}
|
||||
|
||||
// submitIfReady submits a proof if the chain is high enough and the proof
|
||||
// has been generated for this deadline
|
||||
func (s *submitHandler) submitIfReady(ctx context.Context, advance *types.TipSet, pw *postWindow) {
|
||||
// If the window has expired, there's nothing more to do.
|
||||
if advance.Height() >= pw.di.Close {
|
||||
return
|
||||
}
|
||||
|
||||
// Check if we're already submitting, or already completed submit
|
||||
if pw.submitState != SubmitStateStart {
|
||||
return
|
||||
}
|
||||
|
||||
// Check if we've reached the confidence height to submit
|
||||
if advance.Height() < pw.di.Open+SubmitConfidence {
|
||||
return
|
||||
}
|
||||
|
||||
// Check if the proofs have been generated for this deadline
|
||||
posts, ok := s.posts.get(pw.di)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// If there was nothing to prove, move straight to the complete state
|
||||
if len(posts) == 0 {
|
||||
pw.submitState = SubmitStateComplete
|
||||
return
|
||||
}
|
||||
|
||||
// Start submitting post
|
||||
pw.submitState = SubmitStateSubmitting
|
||||
pw.abort = s.api.startSubmitPoST(ctx, advance, pw.di, posts, func(err error) {
|
||||
s.submitResults <- &submitResult{pw: pw, err: err}
|
||||
})
|
||||
}
|
||||
|
||||
// processSubmitResult is called with the response to a submit
|
||||
func (s *submitHandler) processSubmitResult(res *submitResult) {
|
||||
if res.err != nil {
|
||||
// Submit failed so inform the API and go back to the start state
|
||||
s.api.failPost(res.err, res.pw.ts, res.pw.di)
|
||||
log.Warnf("Aborted window post Submitting (Deadline: %+v)", res.pw.di)
|
||||
s.api.onAbort(res.pw.ts, res.pw.di)
|
||||
|
||||
res.pw.submitState = SubmitStateStart
|
||||
return
|
||||
}
|
||||
|
||||
// Submit succeeded so move to complete state
|
||||
res.pw.submitState = SubmitStateComplete
|
||||
}
|
||||
|
||||
type tsdi struct {
|
||||
ts *types.TipSet
|
||||
di *dline.Info
|
||||
}
|
||||
|
||||
func (s *submitHandler) currentTSDI() (*types.TipSet, *dline.Info) {
|
||||
out := make(chan *tsdi)
|
||||
s.getTSDIReq <- out
|
||||
res := <-out
|
||||
return res.ts, res.di
|
||||
}
|
||||
|
||||
type getPWReq struct {
|
||||
di *dline.Info
|
||||
out chan *postWindow
|
||||
}
|
||||
|
||||
func (s *submitHandler) getPostWindow(di *dline.Info) *postWindow {
|
||||
out := make(chan *postWindow)
|
||||
s.getPostWindowReqs <- &getPWReq{di: di, out: out}
|
||||
return <-out
|
||||
}
|
||||
|
||||
// nextDeadline gets deadline info for the subsequent deadline
|
||||
func nextDeadline(currentDeadline *dline.Info) *dline.Info {
|
||||
periodStart := currentDeadline.PeriodStart
|
||||
newDeadline := currentDeadline.Index + 1
|
||||
if newDeadline == miner.WPoStPeriodDeadlines {
|
||||
newDeadline = 0
|
||||
periodStart = periodStart + miner.WPoStProvingPeriod
|
||||
}
|
||||
|
||||
return NewDeadlineInfo(periodStart, newDeadline, currentDeadline.CurrentEpoch)
|
||||
}
|
||||
|
||||
func NewDeadlineInfo(periodStart abi.ChainEpoch, deadlineIdx uint64, currEpoch abi.ChainEpoch) *dline.Info {
|
||||
return dline.NewInfo(periodStart, deadlineIdx, currEpoch, miner.WPoStPeriodDeadlines, miner.WPoStProvingPeriod, miner.WPoStChallengeWindow, miner.WPoStChallengeLookback, miner.FaultDeclarationCutoff)
|
||||
}
|
1173
storage/wdpost_changehandler_test.go
Normal file
1173
storage/wdpost_changehandler_test.go
Normal file
File diff suppressed because it is too large
Load Diff
38
storage/wdpost_nextdl_test.go
Normal file
38
storage/wdpost_nextdl_test.go
Normal file
@ -0,0 +1,38 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
)
|
||||
|
||||
func TestNextDeadline(t *testing.T) {
|
||||
periodStart := abi.ChainEpoch(0)
|
||||
deadlineIdx := 0
|
||||
currentEpoch := abi.ChainEpoch(10)
|
||||
|
||||
di := NewDeadlineInfo(periodStart, uint64(deadlineIdx), currentEpoch)
|
||||
require.EqualValues(t, 0, di.Index)
|
||||
require.EqualValues(t, 0, di.PeriodStart)
|
||||
require.EqualValues(t, -20, di.Challenge)
|
||||
require.EqualValues(t, 0, di.Open)
|
||||
require.EqualValues(t, 60, di.Close)
|
||||
|
||||
for i := 1; i < 1+int(miner.WPoStPeriodDeadlines)*2; i++ {
|
||||
di = nextDeadline(di)
|
||||
deadlineIdx = i % int(miner.WPoStPeriodDeadlines)
|
||||
expPeriodStart := int(miner.WPoStProvingPeriod) * (i / int(miner.WPoStPeriodDeadlines))
|
||||
expOpen := expPeriodStart + deadlineIdx*int(miner.WPoStChallengeWindow)
|
||||
expClose := expOpen + int(miner.WPoStChallengeWindow)
|
||||
expChallenge := expOpen - int(miner.WPoStChallengeLookback)
|
||||
//fmt.Printf("%d: %d@%d %d-%d (%d)\n", i, expPeriodStart, deadlineIdx, expOpen, expClose, expChallenge)
|
||||
require.EqualValues(t, deadlineIdx, di.Index)
|
||||
require.EqualValues(t, expPeriodStart, di.PeriodStart)
|
||||
require.EqualValues(t, expOpen, di.Open)
|
||||
require.EqualValues(t, expClose, di.Close)
|
||||
require.EqualValues(t, expChallenge, di.Challenge)
|
||||
}
|
||||
}
|
@ -29,15 +29,21 @@ import (
|
||||
"github.com/filecoin-project/lotus/journal"
|
||||
)
|
||||
|
||||
func (s *WindowPoStScheduler) failPost(err error, deadline *dline.Info) {
|
||||
func (s *WindowPoStScheduler) failPost(err error, ts *types.TipSet, deadline *dline.Info) {
|
||||
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
|
||||
c := evtCommon{Error: err}
|
||||
if ts != nil {
|
||||
c.Deadline = deadline
|
||||
c.Height = ts.Height()
|
||||
c.TipSet = ts.Cids()
|
||||
}
|
||||
return WdPoStSchedulerEvt{
|
||||
evtCommon: s.getEvtCommon(err),
|
||||
evtCommon: c,
|
||||
State: SchedulerStateFaulted,
|
||||
}
|
||||
})
|
||||
|
||||
log.Errorf("TODO")
|
||||
log.Errorf("Got err %w - TODO handle errors", err)
|
||||
/*s.failLk.Lock()
|
||||
if eps > s.failed {
|
||||
s.failed = eps
|
||||
@ -45,11 +51,28 @@ func (s *WindowPoStScheduler) failPost(err error, deadline *dline.Info) {
|
||||
s.failLk.Unlock()*/
|
||||
}
|
||||
|
||||
func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *dline.Info, ts *types.TipSet) {
|
||||
ctx, abort := context.WithCancel(ctx)
|
||||
// recordProofsEvent records a successful proofs_processed event in the
|
||||
// journal, even if it was a noop (no partitions).
|
||||
func (s *WindowPoStScheduler) recordProofsEvent(partitions []miner.PoStPartition, mcid cid.Cid) {
|
||||
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStProofs], func() interface{} {
|
||||
return &WdPoStProofsProcessedEvt{
|
||||
evtCommon: s.getEvtCommon(nil),
|
||||
Partitions: partitions,
|
||||
MessageCID: mcid,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
s.abort = abort
|
||||
s.activeDeadline = deadline
|
||||
// startGeneratePoST kicks off the process of generating a PoST
|
||||
func (s *WindowPoStScheduler) startGeneratePoST(
|
||||
ctx context.Context,
|
||||
ts *types.TipSet,
|
||||
deadline *dline.Info,
|
||||
completeGeneratePoST CompleteGeneratePoSTCb,
|
||||
) context.CancelFunc {
|
||||
ctx, abort := context.WithCancel(ctx)
|
||||
go func() {
|
||||
defer abort()
|
||||
|
||||
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
|
||||
return WdPoStSchedulerEvt{
|
||||
@ -58,54 +81,104 @@ func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *dline.Info,
|
||||
}
|
||||
})
|
||||
|
||||
go func() {
|
||||
defer abort()
|
||||
posts, err := s.runGeneratePoST(ctx, ts, deadline)
|
||||
completeGeneratePoST(posts, err)
|
||||
}()
|
||||
|
||||
ctx, span := trace.StartSpan(ctx, "WindowPoStScheduler.doPost")
|
||||
return abort
|
||||
}
|
||||
|
||||
// runGeneratePoST generates the PoST
|
||||
func (s *WindowPoStScheduler) runGeneratePoST(
|
||||
ctx context.Context,
|
||||
ts *types.TipSet,
|
||||
deadline *dline.Info,
|
||||
) ([]miner.SubmitWindowedPoStParams, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "WindowPoStScheduler.generatePoST")
|
||||
defer span.End()
|
||||
|
||||
// recordProofsEvent records a successful proofs_processed event in the
|
||||
// journal, even if it was a noop (no partitions).
|
||||
recordProofsEvent := func(partitions []miner.PoStPartition, mcid cid.Cid) {
|
||||
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStProofs], func() interface{} {
|
||||
return &WdPoStProofsProcessedEvt{
|
||||
evtCommon: s.getEvtCommon(nil),
|
||||
Partitions: partitions,
|
||||
MessageCID: mcid,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
posts, err := s.runPost(ctx, *deadline, ts)
|
||||
if err != nil {
|
||||
log.Errorf("run window post failed: %+v", err)
|
||||
s.failPost(err, deadline)
|
||||
return
|
||||
log.Errorf("runPost failed: %+v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(posts) == 0 {
|
||||
recordProofsEvent(nil, cid.Undef)
|
||||
return
|
||||
s.recordProofsEvent(nil, cid.Undef)
|
||||
}
|
||||
|
||||
for i := range posts {
|
||||
post := &posts[i]
|
||||
sm, err := s.submitPost(ctx, post)
|
||||
if err != nil {
|
||||
log.Errorf("submit window post failed: %+v", err)
|
||||
s.failPost(err, deadline)
|
||||
} else {
|
||||
recordProofsEvent(post.Partitions, sm.Cid())
|
||||
}
|
||||
}
|
||||
return posts, nil
|
||||
}
|
||||
|
||||
// startSubmitPoST kicks of the process of submitting PoST
|
||||
func (s *WindowPoStScheduler) startSubmitPoST(
|
||||
ctx context.Context,
|
||||
ts *types.TipSet,
|
||||
deadline *dline.Info,
|
||||
posts []miner.SubmitWindowedPoStParams,
|
||||
completeSubmitPoST CompleteSubmitPoSTCb,
|
||||
) context.CancelFunc {
|
||||
|
||||
ctx, abort := context.WithCancel(ctx)
|
||||
go func() {
|
||||
defer abort()
|
||||
|
||||
err := s.runSubmitPoST(ctx, ts, deadline, posts)
|
||||
if err == nil {
|
||||
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
|
||||
return WdPoStSchedulerEvt{
|
||||
evtCommon: s.getEvtCommon(nil),
|
||||
State: SchedulerStateSucceeded,
|
||||
}
|
||||
})
|
||||
}
|
||||
completeSubmitPoST(err)
|
||||
}()
|
||||
|
||||
return abort
|
||||
}
|
||||
|
||||
// runSubmitPoST submits PoST
|
||||
func (s *WindowPoStScheduler) runSubmitPoST(
|
||||
ctx context.Context,
|
||||
ts *types.TipSet,
|
||||
deadline *dline.Info,
|
||||
posts []miner.SubmitWindowedPoStParams,
|
||||
) error {
|
||||
if len(posts) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
ctx, span := trace.StartSpan(ctx, "WindowPoStScheduler.submitPoST")
|
||||
defer span.End()
|
||||
|
||||
// Get randomness from tickets
|
||||
commEpoch := deadline.Open
|
||||
commRand, err := s.api.ChainGetRandomnessFromTickets(ctx, ts.Key(), crypto.DomainSeparationTag_PoStChainCommit, commEpoch, nil)
|
||||
if err != nil {
|
||||
err = xerrors.Errorf("failed to get chain randomness from tickets for windowPost (ts=%d; deadline=%d): %w", ts.Height(), commEpoch, err)
|
||||
log.Errorf("submitPost failed: %+v", err)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
var submitErr error
|
||||
for i := range posts {
|
||||
// Add randomness to PoST
|
||||
post := &posts[i]
|
||||
post.ChainCommitEpoch = commEpoch
|
||||
post.ChainCommitRand = commRand
|
||||
|
||||
// Submit PoST
|
||||
sm, submitErr := s.submitPost(ctx, post)
|
||||
if submitErr != nil {
|
||||
log.Errorf("submit window post failed: %+v", submitErr)
|
||||
} else {
|
||||
s.recordProofsEvent(post.Partitions, sm.Cid())
|
||||
}
|
||||
}
|
||||
|
||||
return submitErr
|
||||
}
|
||||
|
||||
func (s *WindowPoStScheduler) checkSectors(ctx context.Context, check bitfield.BitField) (bitfield.BitField, error) {
|
||||
@ -396,7 +469,7 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
|
||||
|
||||
rand, err := s.api.ChainGetRandomnessFromBeacon(ctx, ts.Key(), crypto.DomainSeparationTag_WindowedPoStChallengeSeed, di.Challenge, buf.Bytes())
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to get chain randomness for window post (ts=%d; deadline=%d): %w", ts.Height(), di, err)
|
||||
return nil, xerrors.Errorf("failed to get chain randomness from beacon for window post (ts=%d; deadline=%d): %w", ts.Height(), di, err)
|
||||
}
|
||||
|
||||
// Get the partitions for the given deadline
|
||||
@ -540,19 +613,6 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
|
||||
posts = append(posts, params)
|
||||
}
|
||||
|
||||
// Compute randomness after generating proofs so as to reduce the impact
|
||||
// of chain reorgs (which change randomness)
|
||||
commEpoch := di.Open
|
||||
commRand, err := s.api.ChainGetRandomnessFromTickets(ctx, ts.Key(), crypto.DomainSeparationTag_PoStChainCommit, commEpoch, nil)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to get chain randomness for window post (ts=%d; deadline=%d): %w", ts.Height(), commEpoch, err)
|
||||
}
|
||||
|
||||
for i := range posts {
|
||||
posts[i].ChainCommitEpoch = commEpoch
|
||||
posts[i].ChainCommitRand = commRand
|
||||
}
|
||||
|
||||
return posts, nil
|
||||
}
|
||||
|
||||
@ -593,6 +653,7 @@ func (s *WindowPoStScheduler) batchPartitions(partitions []api.Partition) ([][]a
|
||||
}
|
||||
batches = append(batches, partitions[i:end])
|
||||
}
|
||||
|
||||
return batches, nil
|
||||
}
|
||||
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-bitfield"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/big"
|
||||
"github.com/filecoin-project/go-state-types/crypto"
|
||||
@ -177,7 +178,10 @@ func TestWDPostDoPost(t *testing.T) {
|
||||
FaultDeclarationCutoff: miner0.FaultDeclarationCutoff,
|
||||
}
|
||||
ts := mockTipSet(t)
|
||||
scheduler.doPost(ctx, di, ts)
|
||||
|
||||
scheduler.startGeneratePoST(ctx, ts, di, func(posts []miner.SubmitWindowedPoStParams, err error) {
|
||||
scheduler.startSubmitPoST(ctx, ts, di, posts, func(err error) {})
|
||||
})
|
||||
|
||||
// Read the window PoST messages
|
||||
for i := 0; i < expectedMsgCount; i++ {
|
||||
|
@ -22,8 +22,6 @@ import (
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
const StartConfidence = 4 // TODO: config
|
||||
|
||||
type WindowPoStScheduler struct {
|
||||
api storageMinerApi
|
||||
feeCfg config.MinerFeeConfig
|
||||
@ -31,16 +29,11 @@ type WindowPoStScheduler struct {
|
||||
faultTracker sectorstorage.FaultTracker
|
||||
proofType abi.RegisteredPoStProof
|
||||
partitionSectors uint64
|
||||
ch *changeHandler
|
||||
|
||||
actor address.Address
|
||||
worker address.Address
|
||||
|
||||
cur *types.TipSet
|
||||
|
||||
// if a post is in progress, this indicates for which ElectionPeriodStart
|
||||
activeDeadline *dline.Info
|
||||
abort context.CancelFunc
|
||||
|
||||
evtTypes [4]journal.EventType
|
||||
|
||||
// failed abi.ChainEpoch // eps
|
||||
@ -77,16 +70,17 @@ func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, sb
|
||||
}, nil
|
||||
}
|
||||
|
||||
func deadlineEquals(a, b *dline.Info) bool {
|
||||
if a == nil || b == nil {
|
||||
return b == a
|
||||
}
|
||||
|
||||
return a.PeriodStart == b.PeriodStart && a.Index == b.Index && a.Challenge == b.Challenge
|
||||
type changeHandlerAPIImpl struct {
|
||||
storageMinerApi
|
||||
*WindowPoStScheduler
|
||||
}
|
||||
|
||||
func (s *WindowPoStScheduler) Run(ctx context.Context) {
|
||||
defer s.abortActivePoSt()
|
||||
// Initialize change handler
|
||||
chImpl := &changeHandlerAPIImpl{storageMinerApi: s.api, WindowPoStScheduler: s}
|
||||
s.ch = newChangeHandler(chImpl, s.actor)
|
||||
defer s.ch.shutdown()
|
||||
s.ch.start()
|
||||
|
||||
var notifs <-chan []*api.HeadChange
|
||||
var err error
|
||||
@ -125,9 +119,7 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := s.update(ctx, chg.Val); err != nil {
|
||||
log.Errorf("%+v", err)
|
||||
}
|
||||
s.update(ctx, nil, chg.Val)
|
||||
|
||||
gotCur = true
|
||||
continue
|
||||
@ -135,7 +127,7 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) {
|
||||
|
||||
ctx, span := trace.StartSpan(ctx, "WindowPoStScheduler.headChange")
|
||||
|
||||
var lowest, highest *types.TipSet = s.cur, nil
|
||||
var lowest, highest *types.TipSet = nil, nil
|
||||
|
||||
for _, change := range changes {
|
||||
if change.Val == nil {
|
||||
@ -149,12 +141,7 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.revert(ctx, lowest); err != nil {
|
||||
log.Error("handling head reverts in window post sched: %+v", err)
|
||||
}
|
||||
if err := s.update(ctx, highest); err != nil {
|
||||
log.Error("handling head updates in window post sched: %+v", err)
|
||||
}
|
||||
s.update(ctx, lowest, highest)
|
||||
|
||||
span.End()
|
||||
case <-ctx.Done():
|
||||
@ -163,95 +150,40 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *WindowPoStScheduler) revert(ctx context.Context, newLowest *types.TipSet) error {
|
||||
if s.cur == newLowest {
|
||||
return nil
|
||||
func (s *WindowPoStScheduler) update(ctx context.Context, revert, apply *types.TipSet) {
|
||||
if apply == nil {
|
||||
log.Error("no new tipset in window post WindowPoStScheduler.update")
|
||||
return
|
||||
}
|
||||
s.cur = newLowest
|
||||
|
||||
newDeadline, err := s.api.StateMinerProvingDeadline(ctx, s.actor, newLowest.Key())
|
||||
err := s.ch.update(ctx, revert, apply)
|
||||
if err != nil {
|
||||
return err
|
||||
log.Errorf("handling head updates in window post sched: %+v", err)
|
||||
}
|
||||
|
||||
if !deadlineEquals(s.activeDeadline, newDeadline) {
|
||||
s.abortActivePoSt()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *WindowPoStScheduler) update(ctx context.Context, new *types.TipSet) error {
|
||||
if new == nil {
|
||||
return xerrors.Errorf("no new tipset in window post sched update")
|
||||
}
|
||||
|
||||
di, err := s.api.StateMinerProvingDeadline(ctx, s.actor, new.Key())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if deadlineEquals(s.activeDeadline, di) {
|
||||
return nil // already working on this deadline
|
||||
}
|
||||
|
||||
if !di.PeriodStarted() {
|
||||
return nil // not proving anything yet
|
||||
}
|
||||
|
||||
s.abortActivePoSt()
|
||||
|
||||
// TODO: wait for di.Challenge here, will give us ~10min more to compute windowpost
|
||||
// (Need to get correct deadline above, which is tricky)
|
||||
|
||||
if di.Open+StartConfidence >= new.Height() {
|
||||
log.Info("not starting window post yet, waiting for startconfidence", di.Open, di.Open+StartConfidence, new.Height())
|
||||
return nil
|
||||
}
|
||||
|
||||
/*s.failLk.Lock()
|
||||
if s.failed > 0 {
|
||||
s.failed = 0
|
||||
s.activeEPS = 0
|
||||
}
|
||||
s.failLk.Unlock()*/
|
||||
log.Infof("at %d, do window post for P %d, dd %d", new.Height(), di.PeriodStart, di.Index)
|
||||
|
||||
s.doPost(ctx, di, new)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *WindowPoStScheduler) abortActivePoSt() {
|
||||
if s.activeDeadline == nil {
|
||||
return // noop
|
||||
}
|
||||
|
||||
if s.abort != nil {
|
||||
s.abort()
|
||||
|
||||
// onAbort is called when generating proofs or submitting proofs is aborted
|
||||
func (s *WindowPoStScheduler) onAbort(ts *types.TipSet, deadline *dline.Info) {
|
||||
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
|
||||
c := evtCommon{}
|
||||
if ts != nil {
|
||||
c.Deadline = deadline
|
||||
c.Height = ts.Height()
|
||||
c.TipSet = ts.Cids()
|
||||
}
|
||||
return WdPoStSchedulerEvt{
|
||||
evtCommon: s.getEvtCommon(nil),
|
||||
evtCommon: c,
|
||||
State: SchedulerStateAborted,
|
||||
}
|
||||
})
|
||||
|
||||
log.Warnf("Aborting window post (Deadline: %+v)", s.activeDeadline)
|
||||
}
|
||||
|
||||
s.activeDeadline = nil
|
||||
s.abort = nil
|
||||
}
|
||||
|
||||
// getEvtCommon populates and returns common attributes from state, for a
|
||||
// WdPoSt journal event.
|
||||
func (s *WindowPoStScheduler) getEvtCommon(err error) evtCommon {
|
||||
c := evtCommon{Error: err}
|
||||
if s.cur != nil {
|
||||
c.Deadline = s.activeDeadline
|
||||
c.Height = s.cur.Height()
|
||||
c.TipSet = s.cur.Cids()
|
||||
currentTS, currentDeadline := s.ch.currentTSDI()
|
||||
if currentTS != nil {
|
||||
c.Deadline = currentDeadline
|
||||
c.Height = currentTS.Height()
|
||||
c.TipSet = currentTS.Cids()
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user