lpseal: Make tasks return to poller when out of retries

This commit is contained in:
Łukasz Magiera 2024-02-08 21:24:48 +01:00
parent e37b7f6083
commit 4f8d6add1d
3 changed files with 37 additions and 11 deletions

View File

@ -75,6 +75,21 @@ create table sectors_sdr_pipeline (
failed_reason varchar(20) not null default '', failed_reason varchar(20) not null default '',
failed_reason_msg text not null default '', failed_reason_msg text not null default '',
-- foreign key
-- note: those foreign keys are a part of the retry mechanism. If a task
-- fails due to retry limit, it will drop the assigned task_id, and the
-- poller will reassign the task to a new node if it deems the task is
-- still valid to be retried.
foreign key (task_id_sdr) references harmony_task (id) on delete set null,
foreign key (task_id_tree_d) references harmony_task (id) on delete set null,
foreign key (task_id_tree_c) references harmony_task (id) on delete set null,
foreign key (task_id_tree_r) references harmony_task (id) on delete set null,
foreign key (task_id_precommit_msg) references harmony_task (id) on delete set null,
foreign key (task_id_porep) references harmony_task (id) on delete set null,
foreign key (task_id_finalize) references harmony_task (id) on delete set null,
foreign key (task_id_move_storage) references harmony_task (id) on delete set null,
foreign key (task_id_commit_msg) references harmony_task (id) on delete set null,
-- constraints -- constraints
primary key (sp_id, sector_number) primary key (sp_id, sector_number)
); );

View File

@ -29,15 +29,11 @@ func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error
retryAddTask: retryAddTask:
_, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) { _, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) {
// create taskID (from DB) // create taskID (from DB)
_, err := tx.Exec(`INSERT INTO harmony_task (name, added_by, posted_time) err := tx.QueryRow(`INSERT INTO harmony_task (name, added_by, posted_time)
VALUES ($1, $2, CURRENT_TIMESTAMP) `, h.Name, h.TaskEngine.ownerID) VALUES ($1, $2, CURRENT_TIMESTAMP) RETURNING id`, h.Name, h.TaskEngine.ownerID).Scan(&tID)
if err != nil { if err != nil {
return false, fmt.Errorf("could not insert into harmonyTask: %w", err) return false, fmt.Errorf("could not insert into harmonyTask: %w", err)
} }
err = tx.QueryRow("SELECT id FROM harmony_task ORDER BY update_time DESC LIMIT 1").Scan(&tID)
if err != nil {
return false, fmt.Errorf("Could not select ID: %w", err)
}
return extra(tID, tx) return extra(tID, tx)
}) })

View File

@ -70,6 +70,15 @@ func (s *SealPoller) RunPoller(ctx context.Context) {
} }
} }
/*
NOTE: TaskIDs are ONLY set while the tasks are executing or waiting to execute.
This means that there are ~4 states each task can be in:
* Not run, and dependencies not solved (dependencies are 'After' fields of previous stages), task is null, After is false
* Not run, and dependencies solved, task is null, After is false
* Running or queued, task is set, After is false
* Finished, task is null, After is true
*/
type pollTask struct { type pollTask struct {
SpID int64 `db:"sp_id"` SpID int64 `db:"sp_id"`
SectorNumber int64 `db:"sector_number"` SectorNumber int64 `db:"sector_number"`
@ -158,7 +167,7 @@ func (s *SealPoller) poll(ctx context.Context) error {
} }
func (s *SealPoller) pollStartSDR(ctx context.Context, task pollTask) { func (s *SealPoller) pollStartSDR(ctx context.Context, task pollTask) {
if task.TaskSDR == nil && s.pollers[pollerSDR].IsSet() { if !task.AfterSDR && task.TaskSDR == nil && s.pollers[pollerSDR].IsSet() {
s.pollers[pollerSDR].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { s.pollers[pollerSDR].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_sdr = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_sdr is null`, id, task.SpID, task.SectorNumber) n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_sdr = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_sdr is null`, id, task.SpID, task.SectorNumber)
if err != nil { if err != nil {
@ -174,7 +183,10 @@ func (s *SealPoller) pollStartSDR(ctx context.Context, task pollTask) {
} }
func (s *SealPoller) pollStartSDRTrees(ctx context.Context, task pollTask) { func (s *SealPoller) pollStartSDRTrees(ctx context.Context, task pollTask) {
if task.TaskTreeD == nil && task.TaskTreeC == nil && task.TaskTreeR == nil && s.pollers[pollerTrees].IsSet() && task.AfterSDR { if !task.AfterTreeD && !task.AfterTreeC && !task.AfterTreeR &&
task.TaskTreeD == nil && task.TaskTreeC == nil && task.TaskTreeR == nil &&
s.pollers[pollerTrees].IsSet() && task.AfterSDR {
s.pollers[pollerTrees].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { s.pollers[pollerTrees].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_tree_d = $1, task_id_tree_c = $1, task_id_tree_r = $1 n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_tree_d = $1, task_id_tree_c = $1, task_id_tree_r = $1
WHERE sp_id = $2 AND sector_number = $3 and after_sdr = true and task_id_tree_d is null and task_id_tree_c is null and task_id_tree_r is null`, id, task.SpID, task.SectorNumber) WHERE sp_id = $2 AND sector_number = $3 and after_sdr = true and task_id_tree_d is null and task_id_tree_c is null and task_id_tree_r is null`, id, task.SpID, task.SectorNumber)
@ -191,7 +203,10 @@ func (s *SealPoller) pollStartSDRTrees(ctx context.Context, task pollTask) {
} }
func (s *SealPoller) pollStartPoRep(ctx context.Context, task pollTask, ts *types.TipSet) { func (s *SealPoller) pollStartPoRep(ctx context.Context, task pollTask, ts *types.TipSet) {
if s.pollers[pollerPoRep].IsSet() && task.AfterPrecommitMsgSuccess && task.SeedEpoch != nil && task.TaskPoRep == nil && ts.Height() >= abi.ChainEpoch(*task.SeedEpoch+seedEpochConfidence) { if s.pollers[pollerPoRep].IsSet() && task.AfterPrecommitMsgSuccess && task.SeedEpoch != nil &&
task.TaskPoRep == nil && !task.AfterPoRep &&
ts.Height() >= abi.ChainEpoch(*task.SeedEpoch+seedEpochConfidence) {
s.pollers[pollerPoRep].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { s.pollers[pollerPoRep].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_porep = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_porep is null`, id, task.SpID, task.SectorNumber) n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_porep = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_porep is null`, id, task.SpID, task.SectorNumber)
if err != nil { if err != nil {
@ -207,7 +222,7 @@ func (s *SealPoller) pollStartPoRep(ctx context.Context, task pollTask, ts *type
} }
func (s *SealPoller) pollStartFinalize(ctx context.Context, task pollTask, ts *types.TipSet) { func (s *SealPoller) pollStartFinalize(ctx context.Context, task pollTask, ts *types.TipSet) {
if s.pollers[pollerFinalize].IsSet() && task.AfterPoRep && task.TaskFinalize == nil { if s.pollers[pollerFinalize].IsSet() && task.AfterPoRep && !task.AfterFinalize && task.TaskFinalize == nil {
s.pollers[pollerFinalize].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { s.pollers[pollerFinalize].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_finalize = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_finalize is null`, id, task.SpID, task.SectorNumber) n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_finalize = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_finalize is null`, id, task.SpID, task.SectorNumber)
if err != nil { if err != nil {
@ -223,7 +238,7 @@ func (s *SealPoller) pollStartFinalize(ctx context.Context, task pollTask, ts *t
} }
func (s *SealPoller) pollStartMoveStorage(ctx context.Context, task pollTask) { func (s *SealPoller) pollStartMoveStorage(ctx context.Context, task pollTask) {
if s.pollers[pollerMoveStorage].IsSet() && task.AfterFinalize && task.TaskMoveStorage == nil { if s.pollers[pollerMoveStorage].IsSet() && task.AfterFinalize && !task.AfterMoveStorage && task.TaskMoveStorage == nil {
s.pollers[pollerMoveStorage].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { s.pollers[pollerMoveStorage].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_move_storage = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_move_storage is null`, id, task.SpID, task.SectorNumber) n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_move_storage = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_move_storage is null`, id, task.SpID, task.SectorNumber)
if err != nil { if err != nil {