lpwindow: cleanup windowpost task schema

This commit is contained in:
Łukasz Magiera 2023-10-25 17:01:24 +02:00
parent 4efb34e290
commit 9ec9360895
2 changed files with 77 additions and 103 deletions

View File

@ -1,25 +1,22 @@
create table wdpost_tasks create table wdpost_partition_tasks
( (
task_id int not null task_id bigint not null
constraint wdpost_tasks_pkey constraint wdpost_partition_tasks_pk
primary key, primary key,
tskey bytea not null, sp_id bigint not null,
current_epoch bigint not null, proving_period_start bigint not null,
period_start bigint not null, deadline_index bigint not null,
index bigint not null partition_index bigint not null,
constraint wdpost_tasks_index_key constraint wdpost_partition_tasks_identity_key
unique, unique (sp_id, proving_period_start, deadline_index, partition_index)
open bigint not null,
close bigint not null,
challenge bigint not null,
fault_cutoff bigint,
wpost_period_deadlines bigint,
wpost_proving_period bigint,
wpost_challenge_window bigint,
wpost_challenge_lookback bigint,
fault_declaration_cutoff bigint
); );
comment on column wdpost_partition_tasks.task_id is 'harmonytask task ID';
comment on column wdpost_partition_tasks.sp_id is 'storage provider ID';
comment on column wdpost_partition_tasks.proving_period_start is 'proving period start';
comment on column wdpost_partition_tasks.deadline_index is 'deadline index within the proving period';
comment on column wdpost_partition_tasks.partition_index is 'partition index within the deadline';
create table wdpost_proofs create table wdpost_proofs
( (
deadline bigint not null, deadline bigint not null,

View File

@ -3,7 +3,11 @@ package lpwindow
import ( import (
"context" "context"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/builtin/v9/miner"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage/wdpost"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"sort" "sort"
"time" "time"
@ -21,6 +25,8 @@ import (
var log = logging.Logger("lpwindow") var log = logging.Logger("lpwindow")
var EpochsPerDeadline = miner.WPoStProvingPeriod / abi.ChainEpoch(miner.WPoStPeriodDeadlines)
type WdPostTaskDetails struct { type WdPostTaskDetails struct {
Ts *types.TipSet Ts *types.TipSet
Deadline *dline.Info Deadline *dline.Info
@ -30,6 +36,8 @@ type WDPoStAPI interface {
ChainHead(context.Context) (*types.TipSet, error) ChainHead(context.Context) (*types.TipSet, error)
ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error) ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error) StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
ChainGetTipSetAfterHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
} }
type WdPostTask struct { type WdPostTask struct {
@ -44,68 +52,34 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
log.Errorf("WdPostTask.Do() called with taskID: %v", taskID) log.Errorf("WdPostTask.Do() called with taskID: %v", taskID)
var tsKeyBytes []byte
var deadline dline.Info var deadline dline.Info
var spID, pps, dlIdx, partIdx uint64
err = t.db.QueryRow(context.Background(), err = t.db.QueryRow(context.Background(),
`Select tskey, `Select sp_id, proving_period_start, deadline_index, partition_index
current_epoch,
period_start,
index,
open,
close,
challenge,
fault_cutoff,
wpost_period_deadlines,
wpost_proving_period,
wpost_challenge_window,
wpost_challenge_lookback,
fault_declaration_cutoff
from wdpost_tasks from wdpost_tasks
where task_id = $1`, taskID).Scan( where task_id = $1`, taskID).Scan(
&tsKeyBytes, &spID, &pps, &dlIdx, &partIdx,
&deadline.CurrentEpoch,
&deadline.PeriodStart,
&deadline.Index,
&deadline.Open,
&deadline.Close,
&deadline.Challenge,
&deadline.FaultCutoff,
&deadline.WPoStPeriodDeadlines,
&deadline.WPoStProvingPeriod,
&deadline.WPoStChallengeWindow,
&deadline.WPoStChallengeLookback,
&deadline.FaultDeclarationCutoff,
) )
if err != nil { if err != nil {
log.Errorf("WdPostTask.Do() failed to queryRow: %v", err) log.Errorf("WdPostTask.Do() failed to queryRow: %v", err)
return false, err return false, err
} }
log.Errorf("tskEY: %v", tsKeyBytes)
tsKey, err := types.TipSetKeyFromBytes(tsKeyBytes)
if err != nil {
log.Errorf("WdPostTask.Do() failed to get tipset key: %v", err)
return false, err
}
head, err := t.api.ChainHead(context.Background()) head, err := t.api.ChainHead(context.Background())
if err != nil { if err != nil {
log.Errorf("WdPostTask.Do() failed to get chain head: %v", err) log.Errorf("WdPostTask.Do() failed to get chain head: %v", err)
return false, err return false, err
} }
if deadline.Close > head.Height() { wdpost.NewDeadlineInfo(abi.ChainEpoch(pps), dlIdx, head.Height())
log.Errorf("WdPost removed stale task: %v %v", taskID, tsKey)
if deadline.PeriodElapsed() {
log.Errorf("WdPost removed stale task: %v %v", taskID, deadline)
return true, nil return true, nil
} }
ts, err := t.api.ChainGetTipSet(context.Background(), tsKey)
if err != nil {
log.Errorf("WdPostTask.Do() failed to get tipset: %v", err)
return false, err
}
_ = ts
panic("todo") panic("todo")
/*submitWdPostParams, err := t.Scheduler.runPoStCycle(context.Background(), false, deadline, ts) /*submitWdPostParams, err := t.Scheduler.runPoStCycle(context.Background(), false, deadline, ts)
@ -158,19 +132,24 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEng
// GetData for tasks // GetData for tasks
type wdTaskDef struct { type wdTaskDef struct {
abi.RegisteredSealProof
Task_id harmonytask.TaskID Task_id harmonytask.TaskID
Tskey []byte Sp_id uint64
Open abi.ChainEpoch Proving_period_start abi.ChainEpoch
Close abi.ChainEpoch Deadline_index uint64
Partition_index uint64
dlInfo *dline.Info `pgx:"-"`
openTs *types.TipSet
} }
var tasks []wdTaskDef var tasks []wdTaskDef
err = t.db.Select(context.Background(), &tasks, err = t.db.Select(context.Background(), &tasks,
`Select tskey, `Select
task_id, task_id,
period_start, sp_id,
open, proving_period_start,
close deadline_index,
partition_index
from wdpost_tasks from wdpost_tasks
where task_id IN $1`, ids) where task_id IN $1`, ids)
if err != nil { if err != nil {
@ -178,16 +157,35 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEng
} }
// Accept those past deadline, then delete them in Do(). // Accept those past deadline, then delete them in Do().
for _, task := range tasks { for i := range tasks {
if task.Close < ts.Height() { tasks[i].dlInfo = wdpost.NewDeadlineInfo(tasks[i].Proving_period_start, tasks[i].Deadline_index, ts.Height())
return &task.Task_id, nil
if tasks[i].dlInfo.PeriodElapsed() {
return &tasks[i].Task_id, nil
}
tasks[i].openTs, err = t.api.ChainGetTipSetAfterHeight(context.Background(), tasks[i].dlInfo.Open, ts.Key())
if err != nil {
return nil, xerrors.Errorf("getting task open tipset: %w", err)
} }
} }
// Discard those too big for our free RAM // Discard those too big for our free RAM
freeRAM := te.ResourcesAvailable().Ram freeRAM := te.ResourcesAvailable().Ram
tasks = lo.Filter(tasks, func(d wdTaskDef, _ int) bool { tasks = lo.Filter(tasks, func(d wdTaskDef, _ int) bool {
return res[d.RegisteredSealProof].MaxMemory <= freeRAM maddr, err := address.NewIDAddress(tasks[0].Sp_id)
if err != nil {
log.Errorf("WdPostTask.CanAccept() failed to NewIDAddress: %v", err)
return false
}
mi, err := t.api.StateMinerInfo(context.Background(), maddr, ts.Key())
if err != nil {
log.Errorf("WdPostTask.CanAccept() failed to StateMinerInfo: %v", err)
return false
}
return res[mi.WindowPoStProofType].MaxMemory <= freeRAM
}) })
if len(tasks) == 0 { if len(tasks) == 0 {
log.Infof("RAM too small for any WDPost task") log.Infof("RAM too small for any WDPost task")
@ -208,7 +206,7 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEng
// Select the one closest to the deadline // Select the one closest to the deadline
sort.Slice(tasks, func(i, j int) bool { sort.Slice(tasks, func(i, j int) bool {
return tasks[i].Close < tasks[j].Close return tasks[i].dlInfo.Open < tasks[j].dlInfo.Open
}) })
return &tasks[0].Task_id, nil return &tasks[0].Task_id, nil
@ -257,43 +255,22 @@ func NewWdPostTask(db *harmonydb.DB, api WDPoStAPI, max int) *WdPostTask {
} }
} }
func (t *WdPostTask) addTaskToDB(ts *types.TipSet, deadline *dline.Info, taskId harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) { func (t *WdPostTask) addTaskToDB(deadline *dline.Info, taskId harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
tsKey := ts.Key()
//log.Errorf("WdPostTask.addTaskToDB() called with tsKey: %v, taskId: %v", tsKey, taskId)
_, err := tx.Exec( _, err := tx.Exec(
`INSERT INTO wdpost_tasks ( `INSERT INTO wdpost_tasks (
task_id, task_id,
tskey, sp_id,
current_epoch, proving_period_start,
period_start, deadline_index,
index, partition_index,
open,
close,
challenge,
fault_cutoff,
wpost_period_deadlines,
wpost_proving_period,
wpost_challenge_window,
wpost_challenge_lookback,
fault_declaration_cutoff
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10 , $11, $12, $13, $14)`, ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10 , $11, $12, $13, $14)`,
taskId, taskId,
tsKey.Bytes(), spID,
deadline.CurrentEpoch,
deadline.PeriodStart, deadline.PeriodStart,
deadline.Index, deadline.Index,
deadline.Open, partID,
deadline.Close,
deadline.Challenge,
deadline.FaultCutoff,
deadline.WPoStPeriodDeadlines,
deadline.WPoStProvingPeriod,
deadline.WPoStChallengeWindow,
deadline.WPoStChallengeLookback,
deadline.FaultDeclarationCutoff,
) )
if err != nil { if err != nil {
return false, err return false, err