diff --git a/lib/harmony/harmonydb/sql/20231217-sdr-pipeline.sql b/lib/harmony/harmonydb/sql/20231217-sdr-pipeline.sql index 3c4368d5c..3a75d8a96 100644 --- a/lib/harmony/harmonydb/sql/20231217-sdr-pipeline.sql +++ b/lib/harmony/harmonydb/sql/20231217-sdr-pipeline.sql @@ -75,6 +75,21 @@ create table sectors_sdr_pipeline ( failed_reason varchar(20) not null default '', failed_reason_msg text not null default '', + -- foreign key + -- note: those foreign keys are a part of the retry mechanism. If a task + -- fails due to retry limit, it will drop the assigned task_id, and the + -- poller will reassign the task to a new node if it deems the task is + -- still valid to be retried. + foreign key (task_id_sdr) references harmony_task (id) on delete set null, + foreign key (task_id_tree_d) references harmony_task (id) on delete set null, + foreign key (task_id_tree_c) references harmony_task (id) on delete set null, + foreign key (task_id_tree_r) references harmony_task (id) on delete set null, + foreign key (task_id_precommit_msg) references harmony_task (id) on delete set null, + foreign key (task_id_porep) references harmony_task (id) on delete set null, + foreign key (task_id_finalize) references harmony_task (id) on delete set null, + foreign key (task_id_move_storage) references harmony_task (id) on delete set null, + foreign key (task_id_commit_msg) references harmony_task (id) on delete set null, + -- constraints primary key (sp_id, sector_number) ); diff --git a/lib/harmony/harmonytask/task_type_handler.go b/lib/harmony/harmonytask/task_type_handler.go index 93418ba28..5702b3142 100644 --- a/lib/harmony/harmonytask/task_type_handler.go +++ b/lib/harmony/harmonytask/task_type_handler.go @@ -29,15 +29,11 @@ func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error retryAddTask: _, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) { // create taskID (from DB) - _, err := tx.Exec(`INSERT INTO harmony_task (name, added_by, posted_time) - VALUES ($1, $2, CURRENT_TIMESTAMP) `, h.Name, h.TaskEngine.ownerID) + err := tx.QueryRow(`INSERT INTO harmony_task (name, added_by, posted_time) + VALUES ($1, $2, CURRENT_TIMESTAMP) RETURNING id`, h.Name, h.TaskEngine.ownerID).Scan(&tID) if err != nil { return false, fmt.Errorf("could not insert into harmonyTask: %w", err) } - err = tx.QueryRow("SELECT id FROM harmony_task ORDER BY update_time DESC LIMIT 1").Scan(&tID) - if err != nil { - return false, fmt.Errorf("Could not select ID: %w", err) - } return extra(tID, tx) }) diff --git a/provider/lpseal/poller.go b/provider/lpseal/poller.go index 134a389fc..936b6b1b9 100644 --- a/provider/lpseal/poller.go +++ b/provider/lpseal/poller.go @@ -70,6 +70,15 @@ func (s *SealPoller) RunPoller(ctx context.Context) { } } +/* +NOTE: TaskIDs are ONLY set while the tasks are executing or waiting to execute. + This means that there are ~4 states each task can be in: +* Not run, and dependencies not solved (dependencies are 'After' fields of previous stages), task is null, After is false +* Not run, and dependencies solved, task is null, After is false +* Running or queued, task is set, After is false +* Finished, task is null, After is true +*/ + type pollTask struct { SpID int64 `db:"sp_id"` SectorNumber int64 `db:"sector_number"` @@ -158,7 +167,7 @@ func (s *SealPoller) poll(ctx context.Context) error { } func (s *SealPoller) pollStartSDR(ctx context.Context, task pollTask) { - if task.TaskSDR == nil && s.pollers[pollerSDR].IsSet() { + if !task.AfterSDR && task.TaskSDR == nil && s.pollers[pollerSDR].IsSet() { s.pollers[pollerSDR].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_sdr = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_sdr is null`, id, task.SpID, task.SectorNumber) if err != nil { @@ -174,7 +183,10 @@ func (s *SealPoller) pollStartSDR(ctx context.Context, task pollTask) { } func (s *SealPoller) pollStartSDRTrees(ctx context.Context, task pollTask) { - if task.TaskTreeD == nil && task.TaskTreeC == nil && task.TaskTreeR == nil && s.pollers[pollerTrees].IsSet() && task.AfterSDR { + if !task.AfterTreeD && !task.AfterTreeC && !task.AfterTreeR && + task.TaskTreeD == nil && task.TaskTreeC == nil && task.TaskTreeR == nil && + s.pollers[pollerTrees].IsSet() && task.AfterSDR { + s.pollers[pollerTrees].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_tree_d = $1, task_id_tree_c = $1, task_id_tree_r = $1 WHERE sp_id = $2 AND sector_number = $3 and after_sdr = true and task_id_tree_d is null and task_id_tree_c is null and task_id_tree_r is null`, id, task.SpID, task.SectorNumber) @@ -191,7 +203,10 @@ func (s *SealPoller) pollStartSDRTrees(ctx context.Context, task pollTask) { } func (s *SealPoller) pollStartPoRep(ctx context.Context, task pollTask, ts *types.TipSet) { - if s.pollers[pollerPoRep].IsSet() && task.AfterPrecommitMsgSuccess && task.SeedEpoch != nil && task.TaskPoRep == nil && ts.Height() >= abi.ChainEpoch(*task.SeedEpoch+seedEpochConfidence) { + if s.pollers[pollerPoRep].IsSet() && task.AfterPrecommitMsgSuccess && task.SeedEpoch != nil && + task.TaskPoRep == nil && !task.AfterPoRep && + ts.Height() >= abi.ChainEpoch(*task.SeedEpoch+seedEpochConfidence) { + s.pollers[pollerPoRep].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_porep = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_porep is null`, id, task.SpID, task.SectorNumber) if err != nil { @@ -207,7 +222,7 @@ func (s *SealPoller) pollStartPoRep(ctx context.Context, task pollTask, ts *type } func (s *SealPoller) pollStartFinalize(ctx context.Context, task pollTask, ts *types.TipSet) { - if s.pollers[pollerFinalize].IsSet() && task.AfterPoRep && task.TaskFinalize == nil { + if s.pollers[pollerFinalize].IsSet() && task.AfterPoRep && !task.AfterFinalize && task.TaskFinalize == nil { s.pollers[pollerFinalize].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_finalize = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_finalize is null`, id, task.SpID, task.SectorNumber) if err != nil { @@ -223,7 +238,7 @@ func (s *SealPoller) pollStartFinalize(ctx context.Context, task pollTask, ts *t } func (s *SealPoller) pollStartMoveStorage(ctx context.Context, task pollTask) { - if s.pollers[pollerMoveStorage].IsSet() && task.AfterFinalize && task.TaskMoveStorage == nil { + if s.pollers[pollerMoveStorage].IsSet() && task.AfterFinalize && !task.AfterMoveStorage && task.TaskMoveStorage == nil { s.pollers[pollerMoveStorage].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_move_storage = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_move_storage is null`, id, task.SpID, task.SectorNumber) if err != nil {