From f01096bca32694dc48bf26cf3bb8dc01d0980405 Mon Sep 17 00:00:00 2001 From: Shrenuj Bansal Date: Thu, 5 Oct 2023 11:52:22 -0400 Subject: [PATCH] wip --- chain/types/tipset_key.go | 10 ++-- cmd/lotus-provider/run.go | 2 +- lib/harmony/harmonydb/sql/20230823.sql | 21 ++++--- storage/wdpost/wdpost_changehandler_2.go | 45 ++++++++------- storage/wdpost/wdpost_run.go | 3 + storage/wdpost/wdpost_sched.go | 3 +- storage/wdpost/wdpost_task.go | 73 +++++++++++++++++++++--- 7 files changed, 115 insertions(+), 42 deletions(-) diff --git a/chain/types/tipset_key.go b/chain/types/tipset_key.go index 15e655da7..3b8be1432 100644 --- a/chain/types/tipset_key.go +++ b/chain/types/tipset_key.go @@ -38,7 +38,7 @@ type TipSetKey struct { // self-describing, wrapped as a string. // These gymnastics make the a TipSetKey usable as a map key. // The empty key has value "". - value string + Value string } // NewTipSetKey builds a new key from a slice of CIDs. @@ -59,7 +59,7 @@ func TipSetKeyFromBytes(encoded []byte) (TipSetKey, error) { // Cids returns a slice of the CIDs comprising this key. func (k TipSetKey) Cids() []cid.Cid { - cids, err := decodeKey([]byte(k.value)) + cids, err := decodeKey([]byte(k.Value)) if err != nil { panic("invalid tipset key: " + err.Error()) } @@ -83,7 +83,7 @@ func (k TipSetKey) String() string { // Bytes() returns a binary representation of the key. func (k TipSetKey) Bytes() []byte { - return []byte(k.value) + return []byte(k.Value) } func (k TipSetKey) MarshalJSON() ([]byte, error) { @@ -95,7 +95,7 @@ func (k *TipSetKey) UnmarshalJSON(b []byte) error { if err := json.Unmarshal(b, &cids); err != nil { return err } - k.value = string(encodeKey(cids)) + k.Value = string(encodeKey(cids)) return nil } @@ -161,7 +161,7 @@ func (k *TipSetKey) UnmarshalCBOR(reader io.Reader) error { } func (k TipSetKey) IsEmpty() bool { - return len(k.value) == 0 + return len(k.Value) == 0 } func encodeKey(cids []cid.Cid) []byte { diff --git a/cmd/lotus-provider/run.go b/cmd/lotus-provider/run.go index d90eebaab..fdf65e145 100644 --- a/cmd/lotus-provider/run.go +++ b/cmd/lotus-provider/run.go @@ -211,7 +211,7 @@ var runCmd = &cli.Command{ return err } - wdPostTask := wdpost.NewWdPostTask(db) + wdPostTask := wdpost.NewWdPostTask(db, nil) taskEngine, err := harmonytask.New(db, []harmonytask.TaskInterface{wdPostTask}, address) if err != nil { diff --git a/lib/harmony/harmonydb/sql/20230823.sql b/lib/harmony/harmonydb/sql/20230823.sql index 06abecca4..fc37b50e3 100644 --- a/lib/harmony/harmonydb/sql/20230823.sql +++ b/lib/harmony/harmonydb/sql/20230823.sql @@ -1,13 +1,17 @@ create table wdpost_tasks ( - task_id int not null, - tskey varchar, - current_epoch bigint, - period_start bigint, - index bigint, - open bigint, - close bigint, - challenge bigint, + task_id int not null + constraint wdpost_tasks_pkey + primary key, + tskey bytea not null, + current_epoch bigint not null, + period_start bigint not null, + index bigint not null + constraint wdpost_tasks_index_key + unique, + open bigint not null, + close bigint not null, + challenge bigint not null, fault_cutoff bigint, wpost_period_deadlines bigint, wpost_proving_period bigint, @@ -16,3 +20,4 @@ create table wdpost_tasks fault_declaration_cutoff bigint ); + diff --git a/storage/wdpost/wdpost_changehandler_2.go b/storage/wdpost/wdpost_changehandler_2.go index cb1facdcf..e8bd401a2 100644 --- a/storage/wdpost/wdpost_changehandler_2.go +++ b/storage/wdpost/wdpost_changehandler_2.go @@ -170,7 +170,8 @@ type proveHandler2 struct { processedHeadChanges chan *headChange processedPostResults chan *postResult - wdPostTask *WdPostTask + wdPostTask *WdPostTask + currDeadline *dline.Info } func newProver2( @@ -223,20 +224,22 @@ func (p *proveHandler2) run() { func (p *proveHandler2) processHeadChange(ctx context.Context, newTS *types.TipSet, di *dline.Info) { // If the post window has expired, abort the current proof - if p.current != nil && newTS.Height() >= p.current.di.Close { - // Cancel the context on the current proof - p.current.abort() - - // Clear out the reference to the proof so that we can immediately - // start generating a new proof, without having to worry about state - // getting clobbered when the abort completes - p.current = nil - } - - // Only generate one proof at a time - if p.current != nil { - return - } + //if p.current != nil && newTS.Height() >= p.current.di.Close { + // log.Errorf("Aborted window post Proving (Deadline: %+v), newTs: %+v", p.current.di, newTS.Height()) + // // Cancel the context on the current proof + // p.current.abort() + // + // // Clear out the reference to the proof so that we can immediately + // // start generating a new proof, without having to worry about state + // // getting clobbered when the abort completes + // p.current = nil + //} + // + //// Only generate one proof at a time + //log.Errorf("p.current: %+v", p.current) + //if p.current != nil { + // return + //} // If the proof for the current post window has been generated, check the // next post window @@ -246,17 +249,19 @@ func (p *proveHandler2) processHeadChange(ctx context.Context, newTS *types.TipS // _, complete = p.posts.get(di) //} + // Check if the chain is above the Challenge height for the post window + if newTS.Height() < di.Challenge+ChallengeConfidence { + return + } + + //p.current = ¤tPost{di: di} + err := p.wdPostTask.AddTask(ctx, newTS, di) if err != nil { log.Errorf("AddTask failed: %v", err) } - //// Check if the chain is above the Challenge height for the post window - //if newTS.Height() < di.Challenge+ChallengeConfidence { - // return - //} // - //p.current = ¤tPost{di: di} //curr := p.current //p.current.abort = p.api.startGeneratePoST(ctx, newTS, di, func(posts []miner.SubmitWindowedPoStParams, err error) { // p.postResults <- &postResult{ts: newTS, currPost: curr, posts: posts, err: err} diff --git a/storage/wdpost/wdpost_run.go b/storage/wdpost/wdpost_run.go index c2a448fb0..0ebcae279 100644 --- a/storage/wdpost/wdpost_run.go +++ b/storage/wdpost/wdpost_run.go @@ -270,6 +270,8 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, manual bool, di start := time.Now() + log.Errorf("runPoStCycle called with manual: %v, di: %v, ts: %v", manual, di, ts) + log := log.WithOptions(zap.Fields(zap.Time("cycle", start))) log.Infow("starting PoSt cycle", "manual", manual, "ts", ts, "deadline", di.Index) defer func() { @@ -311,6 +313,7 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, manual bool, di // allowed in a single message partitionBatches, err := s.BatchPartitions(partitions, nv) if err != nil { + log.Errorf("batch partitions failed: %+v", err) return nil, err } diff --git a/storage/wdpost/wdpost_sched.go b/storage/wdpost/wdpost_sched.go index 36829e107..f2f66e7bb 100644 --- a/storage/wdpost/wdpost_sched.go +++ b/storage/wdpost/wdpost_sched.go @@ -37,6 +37,7 @@ var log = logging.Logger("wdpost") type NodeAPI interface { ChainHead(context.Context) (*types.TipSet, error) ChainNotify(context.Context) (<-chan []*api.HeadChange, error) + ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error) StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error) @@ -139,7 +140,7 @@ func NewWindowedPoStScheduler(api NodeAPI, func (s *WindowPoStScheduler) Run(ctx context.Context) { // Initialize change handler. - wdPostTask := NewWdPostTask(s.db) + wdPostTask := NewWdPostTask(s.db, s) taskEngine, er := harmonytask.New(s.db, []harmonytask.TaskInterface{wdPostTask}, "localhost:12300") if er != nil { diff --git a/storage/wdpost/wdpost_task.go b/storage/wdpost/wdpost_task.go index 8516bd766..f253bfd94 100644 --- a/storage/wdpost/wdpost_task.go +++ b/storage/wdpost/wdpost_task.go @@ -6,6 +6,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "time" ) type WdPostTaskDetails struct { @@ -14,13 +15,69 @@ type WdPostTaskDetails struct { } type WdPostTask struct { - tasks chan *WdPostTaskDetails - db *harmonydb.DB + tasks chan *WdPostTaskDetails + db *harmonydb.DB + scheduler *WindowPoStScheduler } func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { + time.Sleep(5 * time.Second) log.Errorf("WdPostTask.Do() called with taskID: %v", taskID) + + var tsKeyBytes []byte + var deadline dline.Info + + err = t.db.QueryRow(context.Background(), + `Select tskey, + current_epoch, + period_start, + index, + open, + close, + challenge, + fault_cutoff, + wpost_period_deadlines, + wpost_proving_period, + wpost_challenge_window, + wpost_challenge_lookback, + fault_declaration_cutoff + from wdpost_tasks + where task_id = $1`, taskID).Scan( + &tsKeyBytes, + &deadline.CurrentEpoch, + &deadline.PeriodStart, + &deadline.Index, + &deadline.Open, + &deadline.Close, + &deadline.Challenge, + &deadline.FaultCutoff, + &deadline.WPoStPeriodDeadlines, + &deadline.WPoStProvingPeriod, + &deadline.WPoStChallengeWindow, + &deadline.WPoStChallengeLookback, + &deadline.FaultDeclarationCutoff, + ) + if err != nil { + log.Errorf("WdPostTask.Do() failed to queryRow: %v", err) + return false, err + } + + log.Errorf("tskEY: %v", tsKeyBytes) + tsKey, err := types.TipSetKeyFromBytes(tsKeyBytes) + ts, err := t.scheduler.api.ChainGetTipSet(context.Background(), tsKey) + if err != nil { + log.Errorf("WdPostTask.Do() failed to get tipset: %v", err) + return false, err + } + submitWdPostParams, err := t.scheduler.runPoStCycle(context.Background(), false, deadline, ts) + if err != nil { + log.Errorf("WdPostTask.Do() failed to runPoStCycle: %v", err) + return false, err + } + + log.Errorf("WdPostTask.Do() called with taskID: %v, submitWdPostParams: %v", taskID, submitWdPostParams) + return true, nil } @@ -31,6 +88,7 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID) (*harmonytask.TaskID, e func (t *WdPostTask) TypeDetails() harmonytask.TaskTypeDetails { return harmonytask.TaskTypeDetails{ Name: "WdPostCompute", + Max: -1, } } @@ -49,10 +107,11 @@ func (t *WdPostTask) Adder(taskFunc harmonytask.AddTaskFunc) { } } -func NewWdPostTask(db *harmonydb.DB) *WdPostTask { +func NewWdPostTask(db *harmonydb.DB, scheduler *WindowPoStScheduler) *WdPostTask { return &WdPostTask{ - tasks: make(chan *WdPostTaskDetails, 2), - db: db, + tasks: make(chan *WdPostTaskDetails, 2), + db: db, + scheduler: scheduler, } } @@ -92,7 +151,7 @@ func (t *WdPostTask) addTaskToDB(ts *types.TipSet, deadline *dline.Info, taskId fault_declaration_cutoff ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10 , $11, $12, $13, $14)`, taskId, - tsKey.String(), + tsKey.Bytes(), deadline.CurrentEpoch, deadline.PeriodStart, deadline.Index, @@ -134,7 +193,7 @@ func (t *WdPostTask) AddTaskOld(ctx context.Context, ts *types.TipSet, deadline fault_declaration_cutoff ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10 , $11, $12, $13, $14)`, taskId, - tsKey.String(), + tsKey.Bytes(), deadline.CurrentEpoch, deadline.PeriodStart, deadline.Index,