diff --git a/lib/harmony/harmonydb/sql/20230823.sql b/lib/harmony/harmonydb/sql/20230823.sql index 282068572..3b375c332 100644 --- a/lib/harmony/harmonydb/sql/20230823.sql +++ b/lib/harmony/harmonydb/sql/20230823.sql @@ -1,25 +1,22 @@ -create table wdpost_tasks +create table wdpost_partition_tasks ( - task_id int not null - constraint wdpost_tasks_pkey + task_id bigint not null + constraint wdpost_partition_tasks_pk primary key, - tskey bytea not null, - current_epoch bigint not null, - period_start bigint not null, - index bigint not null - constraint wdpost_tasks_index_key - unique, - open bigint not null, - close bigint not null, - challenge bigint not null, - fault_cutoff bigint, - wpost_period_deadlines bigint, - wpost_proving_period bigint, - wpost_challenge_window bigint, - wpost_challenge_lookback bigint, - fault_declaration_cutoff bigint + sp_id bigint not null, + proving_period_start bigint not null, + deadline_index bigint not null, + partition_index bigint not null, + constraint wdpost_partition_tasks_identity_key + unique (sp_id, proving_period_start, deadline_index, partition_index) ); +comment on column wdpost_partition_tasks.task_id is 'harmonytask task ID'; +comment on column wdpost_partition_tasks.sp_id is 'storage provider ID'; +comment on column wdpost_partition_tasks.proving_period_start is 'proving period start'; +comment on column wdpost_partition_tasks.deadline_index is 'deadline index within the proving period'; +comment on column wdpost_partition_tasks.partition_index is 'partition index within the deadline'; + create table wdpost_proofs ( deadline bigint not null, diff --git a/provider/lpwindow/task.go b/provider/lpwindow/task.go index 7d3a9cff0..cbb4354d0 100644 --- a/provider/lpwindow/task.go +++ b/provider/lpwindow/task.go @@ -3,7 +3,11 @@ package lpwindow import ( "context" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/builtin/v9/miner" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/storage/wdpost" logging "github.com/ipfs/go-log/v2" + "golang.org/x/xerrors" "sort" "time" @@ -21,6 +25,8 @@ import ( var log = logging.Logger("lpwindow") +var EpochsPerDeadline = miner.WPoStProvingPeriod / abi.ChainEpoch(miner.WPoStPeriodDeadlines) + type WdPostTaskDetails struct { Ts *types.TipSet Deadline *dline.Info @@ -30,6 +36,8 @@ type WDPoStAPI interface { ChainHead(context.Context) (*types.TipSet, error) ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error) StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error) + StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) + ChainGetTipSetAfterHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) } type WdPostTask struct { @@ -44,68 +52,34 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done time.Sleep(5 * time.Second) log.Errorf("WdPostTask.Do() called with taskID: %v", taskID) - var tsKeyBytes []byte var deadline dline.Info + var spID, pps, dlIdx, partIdx uint64 + err = t.db.QueryRow(context.Background(), - `Select tskey, - current_epoch, - period_start, - index, - open, - close, - challenge, - fault_cutoff, - wpost_period_deadlines, - wpost_proving_period, - wpost_challenge_window, - wpost_challenge_lookback, - fault_declaration_cutoff + `Select sp_id, proving_period_start, deadline_index, partition_index from wdpost_tasks where task_id = $1`, taskID).Scan( - &tsKeyBytes, - &deadline.CurrentEpoch, - &deadline.PeriodStart, - &deadline.Index, - &deadline.Open, - &deadline.Close, - &deadline.Challenge, - &deadline.FaultCutoff, - &deadline.WPoStPeriodDeadlines, - &deadline.WPoStProvingPeriod, - &deadline.WPoStChallengeWindow, - &deadline.WPoStChallengeLookback, - &deadline.FaultDeclarationCutoff, + &spID, &pps, &dlIdx, &partIdx, ) if err != nil { log.Errorf("WdPostTask.Do() failed to queryRow: %v", err) return false, err } - log.Errorf("tskEY: %v", tsKeyBytes) - tsKey, err := types.TipSetKeyFromBytes(tsKeyBytes) - if err != nil { - log.Errorf("WdPostTask.Do() failed to get tipset key: %v", err) - return false, err - } head, err := t.api.ChainHead(context.Background()) if err != nil { log.Errorf("WdPostTask.Do() failed to get chain head: %v", err) return false, err } - if deadline.Close > head.Height() { - log.Errorf("WdPost removed stale task: %v %v", taskID, tsKey) + wdpost.NewDeadlineInfo(abi.ChainEpoch(pps), dlIdx, head.Height()) + + if deadline.PeriodElapsed() { + log.Errorf("WdPost removed stale task: %v %v", taskID, deadline) return true, nil } - ts, err := t.api.ChainGetTipSet(context.Background(), tsKey) - if err != nil { - log.Errorf("WdPostTask.Do() failed to get tipset: %v", err) - return false, err - } - _ = ts - panic("todo") /*submitWdPostParams, err := t.Scheduler.runPoStCycle(context.Background(), false, deadline, ts) @@ -158,19 +132,24 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEng // GetData for tasks type wdTaskDef struct { - abi.RegisteredSealProof - Task_id harmonytask.TaskID - Tskey []byte - Open abi.ChainEpoch - Close abi.ChainEpoch + Task_id harmonytask.TaskID + Sp_id uint64 + Proving_period_start abi.ChainEpoch + Deadline_index uint64 + Partition_index uint64 + + dlInfo *dline.Info `pgx:"-"` + openTs *types.TipSet } var tasks []wdTaskDef + err = t.db.Select(context.Background(), &tasks, - `Select tskey, + `Select task_id, - period_start, - open, - close + sp_id, + proving_period_start, + deadline_index, + partition_index from wdpost_tasks where task_id IN $1`, ids) if err != nil { @@ -178,16 +157,35 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEng } // Accept those past deadline, then delete them in Do(). - for _, task := range tasks { - if task.Close < ts.Height() { - return &task.Task_id, nil + for i := range tasks { + tasks[i].dlInfo = wdpost.NewDeadlineInfo(tasks[i].Proving_period_start, tasks[i].Deadline_index, ts.Height()) + + if tasks[i].dlInfo.PeriodElapsed() { + return &tasks[i].Task_id, nil + } + + tasks[i].openTs, err = t.api.ChainGetTipSetAfterHeight(context.Background(), tasks[i].dlInfo.Open, ts.Key()) + if err != nil { + return nil, xerrors.Errorf("getting task open tipset: %w", err) } } // Discard those too big for our free RAM freeRAM := te.ResourcesAvailable().Ram tasks = lo.Filter(tasks, func(d wdTaskDef, _ int) bool { - return res[d.RegisteredSealProof].MaxMemory <= freeRAM + maddr, err := address.NewIDAddress(tasks[0].Sp_id) + if err != nil { + log.Errorf("WdPostTask.CanAccept() failed to NewIDAddress: %v", err) + return false + } + + mi, err := t.api.StateMinerInfo(context.Background(), maddr, ts.Key()) + if err != nil { + log.Errorf("WdPostTask.CanAccept() failed to StateMinerInfo: %v", err) + return false + } + + return res[mi.WindowPoStProofType].MaxMemory <= freeRAM }) if len(tasks) == 0 { log.Infof("RAM too small for any WDPost task") @@ -208,7 +206,7 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEng // Select the one closest to the deadline sort.Slice(tasks, func(i, j int) bool { - return tasks[i].Close < tasks[j].Close + return tasks[i].dlInfo.Open < tasks[j].dlInfo.Open }) return &tasks[0].Task_id, nil @@ -257,43 +255,22 @@ func NewWdPostTask(db *harmonydb.DB, api WDPoStAPI, max int) *WdPostTask { } } -func (t *WdPostTask) addTaskToDB(ts *types.TipSet, deadline *dline.Info, taskId harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) { - - tsKey := ts.Key() - - //log.Errorf("WdPostTask.addTaskToDB() called with tsKey: %v, taskId: %v", tsKey, taskId) +func (t *WdPostTask) addTaskToDB(deadline *dline.Info, taskId harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) { _, err := tx.Exec( `INSERT INTO wdpost_tasks ( task_id, - tskey, - current_epoch, - period_start, - index, - open, - close, - challenge, - fault_cutoff, - wpost_period_deadlines, - wpost_proving_period, - wpost_challenge_window, - wpost_challenge_lookback, - fault_declaration_cutoff + sp_id, + proving_period_start, + deadline_index, + partition_index, + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10 , $11, $12, $13, $14)`, taskId, - tsKey.Bytes(), - deadline.CurrentEpoch, + spID, deadline.PeriodStart, deadline.Index, - deadline.Open, - deadline.Close, - deadline.Challenge, - deadline.FaultCutoff, - deadline.WPoStPeriodDeadlines, - deadline.WPoStProvingPeriod, - deadline.WPoStChallengeWindow, - deadline.WPoStChallengeLookback, - deadline.FaultDeclarationCutoff, + partID, ) if err != nil { return false, err