From e14a85bcd32f7a69815327c07a6dbffc0f19cd30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 15 May 2024 17:40:40 +0200 Subject: [PATCH] fix: curio: Drop FKs from pipeline to fix retry loops (#11973) --- curiosrc/piece/task_cleanup_piece.go | 5 ++++ curiosrc/piece/task_park_piece.go | 2 +- curiosrc/seal/task_finalize.go | 2 +- curiosrc/seal/task_movestorage.go | 2 +- curiosrc/seal/task_porep.go | 2 +- curiosrc/seal/task_sdr.go | 2 +- curiosrc/seal/task_submit_commit.go | 2 +- curiosrc/seal/task_submit_precommit.go | 6 ++--- curiosrc/seal/task_treed.go | 2 +- curiosrc/seal/task_treerc.go | 2 +- .../harmonydb/sql/20231217-sdr-pipeline.sql | 23 ++++++++----------- .../harmonydb/sql/20240228-piece-park.sql | 6 +++-- .../sql/20240507-sdr-pipeline-fk-drop.sql | 12 ++++++++++ 13 files changed, 42 insertions(+), 26 deletions(-) create mode 100644 lib/harmony/harmonydb/sql/20240507-sdr-pipeline-fk-drop.sql diff --git a/curiosrc/piece/task_cleanup_piece.go b/curiosrc/piece/task_cleanup_piece.go index ed22ccb46..e4110e3ee 100644 --- a/curiosrc/piece/task_cleanup_piece.go +++ b/curiosrc/piece/task_cleanup_piece.go @@ -90,6 +90,11 @@ func (c *CleanupPieceTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) } if n == 0 { + _, err = c.db.Exec(ctx, `UPDATE parked_pieces SET cleanup_task_id = NULL WHERE id = $1`, pieceID) + if err != nil { + return false, xerrors.Errorf("marking piece as complete: %w", err) + } + return true, nil } diff --git a/curiosrc/piece/task_park_piece.go b/curiosrc/piece/task_park_piece.go index dcc8bb59b..57e75be9d 100644 --- a/curiosrc/piece/task_park_piece.go +++ b/curiosrc/piece/task_park_piece.go @@ -159,7 +159,7 @@ func (p *ParkPieceTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (d } // Update the piece as complete after a successful write. 
- _, err = p.db.Exec(ctx, `UPDATE parked_pieces SET complete = TRUE WHERE id = $1`, pieceData.PieceID) + _, err = p.db.Exec(ctx, `UPDATE parked_pieces SET complete = TRUE, task_id = NULL WHERE id = $1`, pieceData.PieceID) if err != nil { return false, xerrors.Errorf("marking piece as complete: %w", err) } diff --git a/curiosrc/seal/task_finalize.go b/curiosrc/seal/task_finalize.go index 2b362d7be..9fbc6cf18 100644 --- a/curiosrc/seal/task_finalize.go +++ b/curiosrc/seal/task_finalize.go @@ -74,7 +74,7 @@ func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do } // set after_finalize - _, err = f.db.Exec(ctx, `update sectors_sdr_pipeline set after_finalize=true where task_id_finalize=$1`, taskID) + _, err = f.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET after_finalize = TRUE, task_id_finalize = NULL WHERE task_id_finalize = $1`, taskID) if err != nil { return false, xerrors.Errorf("updating task: %w", err) } diff --git a/curiosrc/seal/task_movestorage.go b/curiosrc/seal/task_movestorage.go index dab899582..f4bcfd863 100644 --- a/curiosrc/seal/task_movestorage.go +++ b/curiosrc/seal/task_movestorage.go @@ -64,7 +64,7 @@ func (m *MoveStorageTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) return false, xerrors.Errorf("moving storage: %w", err) } - _, err = m.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET after_move_storage = true WHERE task_id_move_storage = $1`, taskID) + _, err = m.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET after_move_storage = TRUE, task_id_move_storage = NULL WHERE task_id_move_storage = $1`, taskID) if err != nil { return false, xerrors.Errorf("updating task: %w", err) } diff --git a/curiosrc/seal/task_porep.go b/curiosrc/seal/task_porep.go index d4d744e40..530822d72 100644 --- a/curiosrc/seal/task_porep.go +++ b/curiosrc/seal/task_porep.go @@ -126,7 +126,7 @@ func (p *PoRepTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done // store success! 
n, err := p.db.Exec(ctx, `UPDATE sectors_sdr_pipeline - SET after_porep = TRUE, seed_value = $3, porep_proof = $4 + SET after_porep = TRUE, seed_value = $3, porep_proof = $4, task_id_porep = NULL WHERE sp_id = $1 AND sector_number = $2`, sectorParams.SpID, sectorParams.SectorNumber, []byte(rand), proof) if err != nil { diff --git a/curiosrc/seal/task_sdr.go b/curiosrc/seal/task_sdr.go index 0a3aebcd4..6f963546a 100644 --- a/curiosrc/seal/task_sdr.go +++ b/curiosrc/seal/task_sdr.go @@ -154,7 +154,7 @@ func (s *SDRTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bo // store success! n, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline - SET after_sdr = true, ticket_epoch = $3, ticket_value = $4 + SET after_sdr = true, ticket_epoch = $3, ticket_value = $4, task_id_sdr = NULL WHERE sp_id = $1 AND sector_number = $2`, sectorParams.SpID, sectorParams.SectorNumber, ticketEpoch, []byte(ticket)) if err != nil { diff --git a/curiosrc/seal/task_submit_commit.go b/curiosrc/seal/task_submit_commit.go index 73d452e0e..3acaa06fe 100644 --- a/curiosrc/seal/task_submit_commit.go +++ b/curiosrc/seal/task_submit_commit.go @@ -141,7 +141,7 @@ func (s *SubmitCommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) return false, xerrors.Errorf("pushing message to mpool: %w", err) } - _, err = s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET commit_msg_cid = $1, after_commit_msg = TRUE WHERE sp_id = $2 AND sector_number = $3`, mcid, sectorParams.SpID, sectorParams.SectorNumber) + _, err = s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET commit_msg_cid = $1, after_commit_msg = TRUE, task_id_commit_msg = NULL WHERE sp_id = $2 AND sector_number = $3`, mcid, sectorParams.SpID, sectorParams.SectorNumber) if err != nil { return false, xerrors.Errorf("updating commit_msg_cid: %w", err) } diff --git a/curiosrc/seal/task_submit_precommit.go b/curiosrc/seal/task_submit_precommit.go index d42bcbe0d..ecdcc6ea8 100644 --- a/curiosrc/seal/task_submit_precommit.go +++ 
b/curiosrc/seal/task_submit_precommit.go @@ -148,7 +148,7 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo if abi.ChainEpoch(pieces[0].F05DealStartEpoch) < head.Height() { // deal start epoch is in the past, can't precommit this sector anymore _, perr := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline - SET failed = TRUE, failed_at = NOW(), failed_reason = 'past-start-epoch', failed_reason_msg = 'precommit: start epoch is in the past' + SET failed = TRUE, failed_at = NOW(), failed_reason = 'past-start-epoch', failed_reason_msg = 'precommit: start epoch is in the past', task_id_precommit_msg = NULL WHERE task_id_precommit_msg = $1`, taskID) if perr != nil { return false, xerrors.Errorf("persisting precommit start epoch expiry: %w", perr) @@ -186,7 +186,7 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo if err != nil { if record { _, perr := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline - SET failed = TRUE, failed_at = NOW(), failed_reason = 'precommit-check', failed_reason_msg = $1 + SET failed = TRUE, failed_at = NOW(), failed_reason = 'precommit-check', failed_reason_msg = $1, task_id_precommit_msg = NULL WHERE task_id_precommit_msg = $2`, err.Error(), taskID) if perr != nil { return false, xerrors.Errorf("persisting precommit check error: %w", perr) @@ -238,7 +238,7 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo // set precommit_msg_cid _, err = s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline - SET precommit_msg_cid = $1, after_precommit_msg = TRUE + SET precommit_msg_cid = $1, after_precommit_msg = TRUE, task_id_precommit_msg = NULL WHERE task_id_precommit_msg = $2`, mcid, taskID) if err != nil { return false, xerrors.Errorf("updating precommit_msg_cid: %w", err) diff --git a/curiosrc/seal/task_treed.go b/curiosrc/seal/task_treed.go index fda1d1f65..e7a3ade07 100644 --- a/curiosrc/seal/task_treed.go +++ b/curiosrc/seal/task_treed.go @@ -249,7 +249,7 @@ func (t 
*TreeDTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done } n, err := t.db.Exec(ctx, `UPDATE sectors_sdr_pipeline - SET after_tree_d = true, tree_d_cid = $3 WHERE sp_id = $1 AND sector_number = $2`, + SET after_tree_d = true, tree_d_cid = $3, task_id_tree_d = NULL WHERE sp_id = $1 AND sector_number = $2`, sectorParams.SpID, sectorParams.SectorNumber, commd) if err != nil { return false, xerrors.Errorf("store TreeD success: updating pipeline: %w", err) diff --git a/curiosrc/seal/task_treerc.go b/curiosrc/seal/task_treerc.go index 7584c47ac..a1cd8cdc4 100644 --- a/curiosrc/seal/task_treerc.go +++ b/curiosrc/seal/task_treerc.go @@ -81,7 +81,7 @@ func (t *TreeRCTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done // todo porep challenge check n, err := t.db.Exec(ctx, `UPDATE sectors_sdr_pipeline - SET after_tree_r = true, after_tree_c = true, tree_r_cid = $3 + SET after_tree_r = true, after_tree_c = true, tree_r_cid = $3, task_id_tree_r = NULL, task_id_tree_c = NULL WHERE sp_id = $1 AND sector_number = $2`, sectorParams.SpID, sectorParams.SectorNumber, sealed) if err != nil { diff --git a/lib/harmony/harmonydb/sql/20231217-sdr-pipeline.sql b/lib/harmony/harmonydb/sql/20231217-sdr-pipeline.sql index 31f103139..bc5a43885 100644 --- a/lib/harmony/harmonydb/sql/20231217-sdr-pipeline.sql +++ b/lib/harmony/harmonydb/sql/20231217-sdr-pipeline.sql @@ -76,19 +76,16 @@ create table sectors_sdr_pipeline ( failed_reason_msg text not null default '', -- foreign key - -- note: those foreign keys are a part of the retry mechanism. If a task - -- fails due to retry limit, it will drop the assigned task_id, and the - -- poller will reassign the task to a new node if it deems the task is - -- still valid to be retried. 
- foreign key (task_id_sdr) references harmony_task (id) on delete set null, - foreign key (task_id_tree_d) references harmony_task (id) on delete set null, - foreign key (task_id_tree_c) references harmony_task (id) on delete set null, - foreign key (task_id_tree_r) references harmony_task (id) on delete set null, - foreign key (task_id_precommit_msg) references harmony_task (id) on delete set null, - foreign key (task_id_porep) references harmony_task (id) on delete set null, - foreign key (task_id_finalize) references harmony_task (id) on delete set null, - foreign key (task_id_move_storage) references harmony_task (id) on delete set null, - foreign key (task_id_commit_msg) references harmony_task (id) on delete set null, + -- NOTE: The foreign keys below are still created by this migration, but are later dropped by 20240507-sdr-pipeline-fk-drop.sql + foreign key (task_id_sdr) references harmony_task (id) on delete set null, -- dropped + foreign key (task_id_tree_d) references harmony_task (id) on delete set null, -- dropped + foreign key (task_id_tree_c) references harmony_task (id) on delete set null, -- dropped + foreign key (task_id_tree_r) references harmony_task (id) on delete set null, -- dropped + foreign key (task_id_precommit_msg) references harmony_task (id) on delete set null, -- dropped + foreign key (task_id_porep) references harmony_task (id) on delete set null, -- dropped + foreign key (task_id_finalize) references harmony_task (id) on delete set null, -- dropped + foreign key (task_id_move_storage) references harmony_task (id) on delete set null, -- dropped + foreign key (task_id_commit_msg) references harmony_task (id) on delete set null, -- dropped -- constraints primary key (sp_id, sector_number) diff --git a/lib/harmony/harmonydb/sql/20240228-piece-park.sql b/lib/harmony/harmonydb/sql/20240228-piece-park.sql index 9ee6b447f..add0a4093 100644 --- a/lib/harmony/harmonydb/sql/20240228-piece-park.sql +++ b/lib/harmony/harmonydb/sql/20240228-piece-park.sql @@ -11,8 +11,10 @@ create table 
parked_pieces ( cleanup_task_id bigint default null, - foreign key (task_id) references harmony_task (id) on delete set null, - foreign key (cleanup_task_id) references harmony_task (id) on delete set null, + -- NOTE: The foreign keys below are still created by this migration, but are later dropped by 20240507-sdr-pipeline-fk-drop.sql + foreign key (task_id) references harmony_task (id) on delete set null, -- dropped + foreign key (cleanup_task_id) references harmony_task (id) on delete set null, -- dropped + unique (piece_cid) ); diff --git a/lib/harmony/harmonydb/sql/20240507-sdr-pipeline-fk-drop.sql b/lib/harmony/harmonydb/sql/20240507-sdr-pipeline-fk-drop.sql new file mode 100644 index 000000000..daf7a4429 --- /dev/null +++ b/lib/harmony/harmonydb/sql/20240507-sdr-pipeline-fk-drop.sql @@ -0,0 +1,12 @@ +ALTER TABLE sectors_sdr_pipeline DROP CONSTRAINT sectors_sdr_pipeline_task_id_commit_msg_fkey; +ALTER TABLE sectors_sdr_pipeline DROP CONSTRAINT sectors_sdr_pipeline_task_id_finalize_fkey; +ALTER TABLE sectors_sdr_pipeline DROP CONSTRAINT sectors_sdr_pipeline_task_id_move_storage_fkey; +ALTER TABLE sectors_sdr_pipeline DROP CONSTRAINT sectors_sdr_pipeline_task_id_porep_fkey; +ALTER TABLE sectors_sdr_pipeline DROP CONSTRAINT sectors_sdr_pipeline_task_id_precommit_msg_fkey; +ALTER TABLE sectors_sdr_pipeline DROP CONSTRAINT sectors_sdr_pipeline_task_id_sdr_fkey; +ALTER TABLE sectors_sdr_pipeline DROP CONSTRAINT sectors_sdr_pipeline_task_id_tree_c_fkey; +ALTER TABLE sectors_sdr_pipeline DROP CONSTRAINT sectors_sdr_pipeline_task_id_tree_d_fkey; +ALTER TABLE sectors_sdr_pipeline DROP CONSTRAINT sectors_sdr_pipeline_task_id_tree_r_fkey; + +ALTER TABLE parked_pieces DROP CONSTRAINT parked_pieces_cleanup_task_id_fkey; +ALTER TABLE parked_pieces DROP CONSTRAINT parked_pieces_task_id_fkey;