diff --git a/provider/lpwindow/chain_sub.go b/provider/lpwindow/chain_sub.go new file mode 100644 index 000000000..e477fa9f9 --- /dev/null +++ b/provider/lpwindow/chain_sub.go @@ -0,0 +1,181 @@ +package lpwindow + +import ( + "context" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/builtin/v9/miner" + "github.com/filecoin-project/go-state-types/dline" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/storage/wdpost" +) + +type changeHandler struct { + api WDPoStAPI + actor address.Address + proveHdlr *proveHandler +} + +func newChangeHandler(api WDPoStAPI, actor address.Address) *changeHandler { + p := newProver(api) + return &changeHandler{api: api, actor: actor, proveHdlr: p} +} + +func (ch *changeHandler) start() { + go ch.proveHdlr.run() +} + +func (ch *changeHandler) update(ctx context.Context, revert *types.TipSet, advance *types.TipSet) error { + // Get the current deadline period + di, err := ch.api.StateMinerProvingDeadline(ctx, ch.actor, advance.Key()) + if err != nil { + return err + } + + if !di.PeriodStarted() { + return nil // not proving anything yet + } + + hc := &headChange{ + ctx: ctx, + revert: revert, + advance: advance, + di: di, + } + + select { + case ch.proveHdlr.hcs <- hc: + case <-ch.proveHdlr.shutdownCtx.Done(): + case <-ctx.Done(): + } + + select { + case ch.submitHdlr.hcs <- hc: + case <-ch.submitHdlr.shutdownCtx.Done(): + case <-ctx.Done(): + } + + return nil +} + +type proveHandler struct { + api WdPoStCommands + posts *postsCache + + postResults chan *postResult + hcs chan *headChange + + current *currentPost + + shutdownCtx context.Context + shutdown context.CancelFunc +} + +type headChange struct { + ctx context.Context + revert *types.TipSet + advance *types.TipSet + di *dline.Info +} + +type currentPost struct { + di *dline.Info + abort context.CancelFunc +} + +func newProver( + api WDPoStAPI, +) *proveHandler { + ctx, cancel := context.WithCancel(context.Background()) + return &proveHandler{ + api: api, + postResults: make(chan *postResult), + hcs: make(chan *headChange), + shutdownCtx: ctx, + shutdown: cancel, + } +} + +func (p *proveHandler) run() { + // Abort proving on shutdown + defer func() { + if p.current != nil { + p.current.abort() + } + }() + + for p.shutdownCtx.Err() == nil { + select { + case <-p.shutdownCtx.Done(): + return + + case hc := <-p.hcs: + // Head changed + p.processHeadChange(hc.ctx, hc.advance, hc.di) + + case res := <-p.postResults: + // Proof generation complete + p.processPostResult(res) + } + } +} + +func (p *proveHandler) processHeadChange(ctx context.Context, newTS *types.TipSet, di *dline.Info) { + // If the post window has expired, abort the current proof + if p.current != nil && newTS.Height() >= p.current.di.Close { + // Cancel the context on the current proof + p.current.abort() + + // Clear out the reference to the proof so that we can immediately + // start generating a new proof, without having to worry about state + // getting clobbered when the abort completes + p.current = nil + } + + // Only generate one proof at a time + if p.current != nil { + return + } + + // If the proof for the current post window has been generated, check the + // next post window + _, complete := p.posts.get(di) + for complete { + di = wdpost.NextDeadline(di) + _, complete = p.posts.get(di) + } + + // Check if the chain is above the Challenge height for the post window + if newTS.Height() < di.Challenge+wdpost.ChallengeConfidence { + return + } + + p.current = ¤tPost{di: di} + curr := p.current + p.current.abort = p.api.startGeneratePoST(ctx, newTS, di, func(posts []miner.SubmitWindowedPoStParams, err error) { + p.postResults <- &postResult{ts: newTS, currPost: curr, posts: posts, err: err} + }) +} + +func (p *proveHandler) processPostResult(res *postResult) { + di := res.currPost.di + if res.err != nil { + // Proving failed so inform the API + p.api.recordPoStFailure(res.err, res.ts, di) + log.Warnf("Aborted window post Proving (Deadline: %+v)", di) + p.api.onAbort(res.ts, di) + + // Check if the current post has already been aborted + if p.current == res.currPost { + // If the current post was not already aborted, setting it to nil + // marks it as complete so that a new post can be started + p.current = nil + } + return + } + + // Completed processing this proving window + p.current = nil + + // Add the proofs to the cache + p.posts.add(di, res.posts) +} diff --git a/provider/lpwindow/wdpost_task.go b/provider/lpwindow/task.go similarity index 82% rename from provider/lpwindow/wdpost_task.go rename to provider/lpwindow/task.go index fd4ed7470..7d3a9cff0 100644 --- a/provider/lpwindow/wdpost_task.go +++ b/provider/lpwindow/task.go @@ -2,6 +2,7 @@ package lpwindow import ( "context" + "github.com/filecoin-project/go-address" logging "github.com/ipfs/go-log/v2" "sort" "time" @@ -28,14 +29,14 @@ type WdPostTaskDetails struct { type WDPoStAPI interface { ChainHead(context.Context) (*types.TipSet, error) ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error) + StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error) } type WdPostTask struct { api WDPoStAPI - tasks chan *WdPostTaskDetails - db *harmonydb.DB - max int + db *harmonydb.DB + max int } func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { @@ -250,25 +251,12 @@ func (t *WdPostTask) Adder(taskFunc harmonytask.AddTaskFunc) { func NewWdPostTask(db *harmonydb.DB, api WDPoStAPI, max int) *WdPostTask { return &WdPostTask{ - tasks: make(chan *WdPostTaskDetails, 2), - db: db, - api: api, - max: max, + db: db, + api: api, + max: max, } } -func (t *WdPostTask) AddTask(ctx context.Context, ts *types.TipSet, deadline *dline.Info) error { - - t.tasks <- &WdPostTaskDetails{ - Ts: ts, - Deadline: deadline, - } - - //log.Errorf("WdPostTask.AddTask() called with ts: %v, deadline: %v, taskList: %v", ts, deadline, t.tasks) - - return nil -} - func (t *WdPostTask) addTaskToDB(ts *types.TipSet, deadline *dline.Info, taskId harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) { tsKey := ts.Key() @@ -314,46 +302,4 @@ func (t *WdPostTask) addTaskToDB(ts *types.TipSet, deadline *dline.Info, taskId return true, nil } -func (t *WdPostTask) AddTaskOld(ctx context.Context, ts *types.TipSet, deadline *dline.Info, taskId harmonytask.TaskID) error { - - tsKey := ts.Key() - _, err := t.db.Exec(ctx, - `INSERT INTO wdpost_tasks ( - task_id, - tskey, - current_epoch, - period_start, - index, - open, - close, - challenge, - fault_cutoff, - wpost_period_deadlines, - wpost_proving_period, - wpost_challenge_window, - wpost_challenge_lookback, - fault_declaration_cutoff - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10 , $11, $12, $13, $14)`, - taskId, - tsKey.Bytes(), - deadline.CurrentEpoch, - deadline.PeriodStart, - deadline.Index, - deadline.Open, - deadline.Close, - deadline.Challenge, - deadline.FaultCutoff, - deadline.WPoStPeriodDeadlines, - deadline.WPoStProvingPeriod, - deadline.WPoStChallengeWindow, - deadline.WPoStChallengeLookback, - deadline.FaultDeclarationCutoff, - ) - if err != nil { - return err - } - - return nil -} - var _ harmonytask.TaskInterface = &WdPostTask{} diff --git a/provider/lpwindow/wdpost_task_test.go b/provider/lpwindow/task_test.go similarity index 92% rename from provider/lpwindow/wdpost_task_test.go rename to provider/lpwindow/task_test.go index 0728a2c68..c9a65dbc9 100644 --- a/provider/lpwindow/wdpost_task_test.go +++ b/provider/lpwindow/task_test.go @@ -1,7 +1,6 @@ package lpwindow import ( - "context" "testing" "github.com/filecoin-project/go-state-types/dline" @@ -27,7 +26,6 @@ func TestAddTask(t *testing.T) { _ = taskEngine ts := types.TipSet{} deadline := dline.Info{} - err = wdPostTask.AddTask(context.Background(), &ts, &deadline) require.NoError(t, err) } diff --git a/storage/wdpost/wdpost_changehandler.go b/storage/wdpost/wdpost_changehandler.go index 3f71f5d4b..1d4432459 100644 --- a/storage/wdpost/wdpost_changehandler.go +++ b/storage/wdpost/wdpost_changehandler.go @@ -230,7 +230,7 @@ func (p *proveHandler) processHeadChange(ctx context.Context, newTS *types.TipSe // next post window _, complete := p.posts.get(di) for complete { - di = nextDeadline(di) + di = NextDeadline(di) _, complete = p.posts.get(di) } @@ -525,8 +525,8 @@ func (s *submitHandler) getPostWindow(di *dline.Info) *postWindow { return <-out } -// nextDeadline gets deadline info for the subsequent deadline -func nextDeadline(currentDeadline *dline.Info) *dline.Info { +// NextDeadline gets deadline info for the subsequent deadline +func NextDeadline(currentDeadline *dline.Info) *dline.Info { periodStart := currentDeadline.PeriodStart newDeadline := currentDeadline.Index + 1 if newDeadline == miner.WPoStPeriodDeadlines { diff --git a/storage/wdpost/wdpost_changehandler_test.go b/storage/wdpost/wdpost_changehandler_test.go index dac6c4558..44d0dfe6d 100644 --- a/storage/wdpost/wdpost_changehandler_test.go +++ b/storage/wdpost/wdpost_changehandler_test.go @@ -441,7 +441,7 @@ func TestChangeHandlerStartProvingNextDeadline(t *testing.T) { // Trigger head change that advances the chain to the Challenge epoch for // the next deadline go func() { - di = nextDeadline(di) + di = NextDeadline(di) currentEpoch = di.Challenge + ChallengeConfidence triggerHeadAdvance(t, s, currentEpoch) }() @@ -474,7 +474,7 @@ func TestChangeHandlerProvingRounds(t *testing.T) { <-s.ch.proveHdlr.processedHeadChanges completeProofEpoch := di.Open + completeProofIndex - next := nextDeadline(di) + next := NextDeadline(di) //fmt.Println("epoch", currentEpoch, s.mock.getPostStatus(di), "next", s.mock.getPostStatus(next)) if currentEpoch >= next.Challenge { require.Equal(t, postStatusComplete, s.mock.getPostStatus(di)) @@ -962,7 +962,7 @@ func TestChangeHandlerSubmitRevertTwoEpochs(t *testing.T) { require.Equal(t, postStatusComplete, s.mock.getPostStatus(diE1)) // Move to the challenge epoch for the next deadline - diE2 := nextDeadline(diE1) + diE2 := NextDeadline(diE1) currentEpoch = diE2.Challenge + ChallengeConfidence go triggerHeadAdvance(t, s, currentEpoch) @@ -1067,7 +1067,7 @@ func TestChangeHandlerSubmitRevertAdvanceLess(t *testing.T) { require.Equal(t, postStatusComplete, s.mock.getPostStatus(diE1)) // Move to the challenge epoch for the next deadline - diE2 := nextDeadline(diE1) + diE2 := NextDeadline(diE1) currentEpoch = diE2.Challenge + ChallengeConfidence go triggerHeadAdvance(t, s, currentEpoch) diff --git a/storage/wdpost/wdpost_nextdl_test.go b/storage/wdpost/wdpost_nextdl_test.go index d591c1e88..ef140de30 100644 --- a/storage/wdpost/wdpost_nextdl_test.go +++ b/storage/wdpost/wdpost_nextdl_test.go @@ -24,7 +24,7 @@ func TestNextDeadline(t *testing.T) { for i := 1; i < 1+int(minertypes.WPoStPeriodDeadlines)*2; i++ { //stm: @WDPOST_NEXT_DEADLINE_001 - di = nextDeadline(di) + di = NextDeadline(di) deadlineIdx = i % int(minertypes.WPoStPeriodDeadlines) expPeriodStart := int(minertypes.WPoStProvingPeriod) * (i / int(minertypes.WPoStPeriodDeadlines)) expOpen := expPeriodStart + deadlineIdx*int(minertypes.WPoStChallengeWindow) diff --git a/storage/wdpost/wdpost_sched.go b/storage/wdpost/wdpost_sched.go index 6bf0aeb5c..ce2124c43 100644 --- a/storage/wdpost/wdpost_sched.go +++ b/storage/wdpost/wdpost_sched.go @@ -145,7 +145,6 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) { *WindowPoStScheduler }{s.api, s} - // Initialize change handler. s.ch = newChangeHandler(callbacks, s.actor) defer s.ch.shutdown() s.ch.start() @@ -233,11 +232,6 @@ func (s *WindowPoStScheduler) update(ctx context.Context, revert, apply *types.T if err != nil { log.Errorf("handling head updates in window post sched: %+v", err) } - - //err = s.ch2.update(ctx, revert, apply) - //if err != nil { - // log.Errorf("handling head updates in window post sched: %+v", err) - //} } // onAbort is called when generating proofs or submitting proofs is aborted