lpwindow: wip chain sub, more cleanups

This commit is contained in:
Łukasz Magiera 2023-10-25 16:00:49 +02:00
parent 4fad50ad9a
commit 4efb34e290
7 changed files with 196 additions and 77 deletions

View File

@ -0,0 +1,181 @@
package lpwindow
import (
"context"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/builtin/v9/miner"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storage/wdpost"
)
type changeHandler struct {
api WDPoStAPI
actor address.Address
proveHdlr *proveHandler
}
func newChangeHandler(api WDPoStAPI, actor address.Address) *changeHandler {
p := newProver(api)
return &changeHandler{api: api, actor: actor, proveHdlr: p}
}
func (ch *changeHandler) start() {
go ch.proveHdlr.run()
}
func (ch *changeHandler) update(ctx context.Context, revert *types.TipSet, advance *types.TipSet) error {
// Get the current deadline period
di, err := ch.api.StateMinerProvingDeadline(ctx, ch.actor, advance.Key())
if err != nil {
return err
}
if !di.PeriodStarted() {
return nil // not proving anything yet
}
hc := &headChange{
ctx: ctx,
revert: revert,
advance: advance,
di: di,
}
select {
case ch.proveHdlr.hcs <- hc:
case <-ch.proveHdlr.shutdownCtx.Done():
case <-ctx.Done():
}
select {
case ch.submitHdlr.hcs <- hc:
case <-ch.submitHdlr.shutdownCtx.Done():
case <-ctx.Done():
}
return nil
}
type proveHandler struct {
api WdPoStCommands
posts *postsCache
postResults chan *postResult
hcs chan *headChange
current *currentPost
shutdownCtx context.Context
shutdown context.CancelFunc
}
type headChange struct {
ctx context.Context
revert *types.TipSet
advance *types.TipSet
di *dline.Info
}
type currentPost struct {
di *dline.Info
abort context.CancelFunc
}
func newProver(
api WDPoStAPI,
) *proveHandler {
ctx, cancel := context.WithCancel(context.Background())
return &proveHandler{
api: api,
postResults: make(chan *postResult),
hcs: make(chan *headChange),
shutdownCtx: ctx,
shutdown: cancel,
}
}
func (p *proveHandler) run() {
// Abort proving on shutdown
defer func() {
if p.current != nil {
p.current.abort()
}
}()
for p.shutdownCtx.Err() == nil {
select {
case <-p.shutdownCtx.Done():
return
case hc := <-p.hcs:
// Head changed
p.processHeadChange(hc.ctx, hc.advance, hc.di)
case res := <-p.postResults:
// Proof generation complete
p.processPostResult(res)
}
}
}
func (p *proveHandler) processHeadChange(ctx context.Context, newTS *types.TipSet, di *dline.Info) {
// If the post window has expired, abort the current proof
if p.current != nil && newTS.Height() >= p.current.di.Close {
// Cancel the context on the current proof
p.current.abort()
// Clear out the reference to the proof so that we can immediately
// start generating a new proof, without having to worry about state
// getting clobbered when the abort completes
p.current = nil
}
// Only generate one proof at a time
if p.current != nil {
return
}
// If the proof for the current post window has been generated, check the
// next post window
_, complete := p.posts.get(di)
for complete {
di = wdpost.NextDeadline(di)
_, complete = p.posts.get(di)
}
// Check if the chain is above the Challenge height for the post window
if newTS.Height() < di.Challenge+wdpost.ChallengeConfidence {
return
}
p.current = &currentPost{di: di}
curr := p.current
p.current.abort = p.api.startGeneratePoST(ctx, newTS, di, func(posts []miner.SubmitWindowedPoStParams, err error) {
p.postResults <- &postResult{ts: newTS, currPost: curr, posts: posts, err: err}
})
}
func (p *proveHandler) processPostResult(res *postResult) {
di := res.currPost.di
if res.err != nil {
// Proving failed so inform the API
p.api.recordPoStFailure(res.err, res.ts, di)
log.Warnf("Aborted window post Proving (Deadline: %+v)", di)
p.api.onAbort(res.ts, di)
// Check if the current post has already been aborted
if p.current == res.currPost {
// If the current post was not already aborted, setting it to nil
// marks it as complete so that a new post can be started
p.current = nil
}
return
}
// Completed processing this proving window
p.current = nil
// Add the proofs to the cache
p.posts.add(di, res.posts)
}

View File

@ -2,6 +2,7 @@ package lpwindow
import ( import (
"context" "context"
"github.com/filecoin-project/go-address"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"sort" "sort"
"time" "time"
@ -28,12 +29,12 @@ type WdPostTaskDetails struct {
type WDPoStAPI interface { type WDPoStAPI interface {
ChainHead(context.Context) (*types.TipSet, error) ChainHead(context.Context) (*types.TipSet, error)
ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error) ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)
} }
type WdPostTask struct { type WdPostTask struct {
api WDPoStAPI api WDPoStAPI
tasks chan *WdPostTaskDetails
db *harmonydb.DB db *harmonydb.DB
max int max int
} }
@ -250,25 +251,12 @@ func (t *WdPostTask) Adder(taskFunc harmonytask.AddTaskFunc) {
func NewWdPostTask(db *harmonydb.DB, api WDPoStAPI, max int) *WdPostTask { func NewWdPostTask(db *harmonydb.DB, api WDPoStAPI, max int) *WdPostTask {
return &WdPostTask{ return &WdPostTask{
tasks: make(chan *WdPostTaskDetails, 2),
db: db, db: db,
api: api, api: api,
max: max, max: max,
} }
} }
func (t *WdPostTask) AddTask(ctx context.Context, ts *types.TipSet, deadline *dline.Info) error {
t.tasks <- &WdPostTaskDetails{
Ts: ts,
Deadline: deadline,
}
//log.Errorf("WdPostTask.AddTask() called with ts: %v, deadline: %v, taskList: %v", ts, deadline, t.tasks)
return nil
}
func (t *WdPostTask) addTaskToDB(ts *types.TipSet, deadline *dline.Info, taskId harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) { func (t *WdPostTask) addTaskToDB(ts *types.TipSet, deadline *dline.Info, taskId harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
tsKey := ts.Key() tsKey := ts.Key()
@ -314,46 +302,4 @@ func (t *WdPostTask) addTaskToDB(ts *types.TipSet, deadline *dline.Info, taskId
return true, nil return true, nil
} }
func (t *WdPostTask) AddTaskOld(ctx context.Context, ts *types.TipSet, deadline *dline.Info, taskId harmonytask.TaskID) error {
tsKey := ts.Key()
_, err := t.db.Exec(ctx,
`INSERT INTO wdpost_tasks (
task_id,
tskey,
current_epoch,
period_start,
index,
open,
close,
challenge,
fault_cutoff,
wpost_period_deadlines,
wpost_proving_period,
wpost_challenge_window,
wpost_challenge_lookback,
fault_declaration_cutoff
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10 , $11, $12, $13, $14)`,
taskId,
tsKey.Bytes(),
deadline.CurrentEpoch,
deadline.PeriodStart,
deadline.Index,
deadline.Open,
deadline.Close,
deadline.Challenge,
deadline.FaultCutoff,
deadline.WPoStPeriodDeadlines,
deadline.WPoStProvingPeriod,
deadline.WPoStChallengeWindow,
deadline.WPoStChallengeLookback,
deadline.FaultDeclarationCutoff,
)
if err != nil {
return err
}
return nil
}
var _ harmonytask.TaskInterface = &WdPostTask{} var _ harmonytask.TaskInterface = &WdPostTask{}

View File

@ -1,7 +1,6 @@
package lpwindow package lpwindow
import ( import (
"context"
"testing" "testing"
"github.com/filecoin-project/go-state-types/dline" "github.com/filecoin-project/go-state-types/dline"
@ -27,7 +26,6 @@ func TestAddTask(t *testing.T) {
_ = taskEngine _ = taskEngine
ts := types.TipSet{} ts := types.TipSet{}
deadline := dline.Info{} deadline := dline.Info{}
err = wdPostTask.AddTask(context.Background(), &ts, &deadline)
require.NoError(t, err) require.NoError(t, err)
} }

View File

@ -230,7 +230,7 @@ func (p *proveHandler) processHeadChange(ctx context.Context, newTS *types.TipSe
// next post window // next post window
_, complete := p.posts.get(di) _, complete := p.posts.get(di)
for complete { for complete {
di = nextDeadline(di) di = NextDeadline(di)
_, complete = p.posts.get(di) _, complete = p.posts.get(di)
} }
@ -525,8 +525,8 @@ func (s *submitHandler) getPostWindow(di *dline.Info) *postWindow {
return <-out return <-out
} }
// nextDeadline gets deadline info for the subsequent deadline // NextDeadline gets deadline info for the subsequent deadline
func nextDeadline(currentDeadline *dline.Info) *dline.Info { func NextDeadline(currentDeadline *dline.Info) *dline.Info {
periodStart := currentDeadline.PeriodStart periodStart := currentDeadline.PeriodStart
newDeadline := currentDeadline.Index + 1 newDeadline := currentDeadline.Index + 1
if newDeadline == miner.WPoStPeriodDeadlines { if newDeadline == miner.WPoStPeriodDeadlines {

View File

@ -441,7 +441,7 @@ func TestChangeHandlerStartProvingNextDeadline(t *testing.T) {
// Trigger head change that advances the chain to the Challenge epoch for // Trigger head change that advances the chain to the Challenge epoch for
// the next deadline // the next deadline
go func() { go func() {
di = nextDeadline(di) di = NextDeadline(di)
currentEpoch = di.Challenge + ChallengeConfidence currentEpoch = di.Challenge + ChallengeConfidence
triggerHeadAdvance(t, s, currentEpoch) triggerHeadAdvance(t, s, currentEpoch)
}() }()
@ -474,7 +474,7 @@ func TestChangeHandlerProvingRounds(t *testing.T) {
<-s.ch.proveHdlr.processedHeadChanges <-s.ch.proveHdlr.processedHeadChanges
completeProofEpoch := di.Open + completeProofIndex completeProofEpoch := di.Open + completeProofIndex
next := nextDeadline(di) next := NextDeadline(di)
//fmt.Println("epoch", currentEpoch, s.mock.getPostStatus(di), "next", s.mock.getPostStatus(next)) //fmt.Println("epoch", currentEpoch, s.mock.getPostStatus(di), "next", s.mock.getPostStatus(next))
if currentEpoch >= next.Challenge { if currentEpoch >= next.Challenge {
require.Equal(t, postStatusComplete, s.mock.getPostStatus(di)) require.Equal(t, postStatusComplete, s.mock.getPostStatus(di))
@ -962,7 +962,7 @@ func TestChangeHandlerSubmitRevertTwoEpochs(t *testing.T) {
require.Equal(t, postStatusComplete, s.mock.getPostStatus(diE1)) require.Equal(t, postStatusComplete, s.mock.getPostStatus(diE1))
// Move to the challenge epoch for the next deadline // Move to the challenge epoch for the next deadline
diE2 := nextDeadline(diE1) diE2 := NextDeadline(diE1)
currentEpoch = diE2.Challenge + ChallengeConfidence currentEpoch = diE2.Challenge + ChallengeConfidence
go triggerHeadAdvance(t, s, currentEpoch) go triggerHeadAdvance(t, s, currentEpoch)
@ -1067,7 +1067,7 @@ func TestChangeHandlerSubmitRevertAdvanceLess(t *testing.T) {
require.Equal(t, postStatusComplete, s.mock.getPostStatus(diE1)) require.Equal(t, postStatusComplete, s.mock.getPostStatus(diE1))
// Move to the challenge epoch for the next deadline // Move to the challenge epoch for the next deadline
diE2 := nextDeadline(diE1) diE2 := NextDeadline(diE1)
currentEpoch = diE2.Challenge + ChallengeConfidence currentEpoch = diE2.Challenge + ChallengeConfidence
go triggerHeadAdvance(t, s, currentEpoch) go triggerHeadAdvance(t, s, currentEpoch)

View File

@ -24,7 +24,7 @@ func TestNextDeadline(t *testing.T) {
for i := 1; i < 1+int(minertypes.WPoStPeriodDeadlines)*2; i++ { for i := 1; i < 1+int(minertypes.WPoStPeriodDeadlines)*2; i++ {
//stm: @WDPOST_NEXT_DEADLINE_001 //stm: @WDPOST_NEXT_DEADLINE_001
di = nextDeadline(di) di = NextDeadline(di)
deadlineIdx = i % int(minertypes.WPoStPeriodDeadlines) deadlineIdx = i % int(minertypes.WPoStPeriodDeadlines)
expPeriodStart := int(minertypes.WPoStProvingPeriod) * (i / int(minertypes.WPoStPeriodDeadlines)) expPeriodStart := int(minertypes.WPoStProvingPeriod) * (i / int(minertypes.WPoStPeriodDeadlines))
expOpen := expPeriodStart + deadlineIdx*int(minertypes.WPoStChallengeWindow) expOpen := expPeriodStart + deadlineIdx*int(minertypes.WPoStChallengeWindow)

View File

@ -145,7 +145,6 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) {
*WindowPoStScheduler *WindowPoStScheduler
}{s.api, s} }{s.api, s}
// Initialize change handler.
s.ch = newChangeHandler(callbacks, s.actor) s.ch = newChangeHandler(callbacks, s.actor)
defer s.ch.shutdown() defer s.ch.shutdown()
s.ch.start() s.ch.start()
@ -233,11 +232,6 @@ func (s *WindowPoStScheduler) update(ctx context.Context, revert, apply *types.T
if err != nil { if err != nil {
log.Errorf("handling head updates in window post sched: %+v", err) log.Errorf("handling head updates in window post sched: %+v", err)
} }
//err = s.ch2.update(ctx, revert, apply)
//if err != nil {
// log.Errorf("handling head updates in window post sched: %+v", err)
//}
} }
// onAbort is called when generating proofs or submitting proofs is aborted // onAbort is called when generating proofs or submitting proofs is aborted