From 8a739875294b1bcf7cc380c2f7b22f1d71aae505 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 8 Feb 2024 22:29:43 +0100 Subject: [PATCH] lpseal: Fix finalize --- provider/lpseal/task_finalize.go | 16 +++++++++++++--- provider/lpseal/task_movestorage.go | 14 +++++++++++--- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/provider/lpseal/task_finalize.go b/provider/lpseal/task_finalize.go index 69162577e..1738eaf7b 100644 --- a/provider/lpseal/task_finalize.go +++ b/provider/lpseal/task_finalize.go @@ -31,7 +31,7 @@ func NewFinalizeTask(max int, sp *SealPoller, sc *lpffi.SealCalls, db *harmonydb } func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { - var task struct { + var tasks []struct { SpID int64 `db:"sp_id"` SectorNumber int64 `db:"sector_number"` RegSealProof int64 `db:"reg_seal_proof"` @@ -39,12 +39,17 @@ func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do ctx := context.Background() - err = f.db.Select(ctx, &task, ` + err = f.db.Select(ctx, &tasks, ` select sp_id, sector_number, reg_seal_proof from sectors_sdr_pipeline where task_id_finalize=$1`, taskID) if err != nil { return false, xerrors.Errorf("getting task: %w", err) } + if len(tasks) != 1 { + return false, xerrors.Errorf("expected one task") + } + task := tasks[0] + sector := storiface.SectorRef{ ID: abi.SectorID{ Miner: abi.ActorID(task.SpID), @@ -81,10 +86,15 @@ func (f *FinalizeTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.T ctx := context.Background() + indIDs := make([]int64, len(ids)) + for i, id := range ids { + indIDs[i] = int64(id) + } + err := f.db.Select(ctx, &tasks, ` select p.task_id_finalize, p.sp_id, p.sector_number, l.storage_id from sectors_sdr_pipeline p inner join sector_location l on p.sp_id=l.miner_id and p.sector_number=l.sector_num - where task_id_finalize in ($1) and l.sector_filetype=4`, ids) + where task_id_finalize = ANY ($1) and l.sector_filetype=4`, indIDs) if err != nil { return nil, xerrors.Errorf("getting tasks: %w", err) } diff --git a/provider/lpseal/task_movestorage.go b/provider/lpseal/task_movestorage.go index 0e974e76a..1e7688482 100644 --- a/provider/lpseal/task_movestorage.go +++ b/provider/lpseal/task_movestorage.go @@ -32,7 +32,7 @@ func NewMoveStorageTask(sp *SealPoller, sc *lpffi.SealCalls, db *harmonydb.DB, m } func (m *MoveStorageTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { - var task struct { + var tasks []struct { SpID int64 `db:"sp_id"` SectorNumber int64 `db:"sector_number"` RegSealProof int64 `db:"reg_seal_proof"` @@ -40,11 +40,15 @@ func (m *MoveStorageTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) ctx := context.Background() - err = m.db.Select(ctx, &task, ` + err = m.db.Select(ctx, &tasks, ` select sp_id, sector_number, reg_seal_proof from sectors_sdr_pipeline where task_id_move_storage=$1`, taskID) if err != nil { return false, xerrors.Errorf("getting task: %w", err) } + if len(tasks) != 1 { + return false, xerrors.Errorf("expected one task") + } + task := tasks[0] sector := storiface.SectorRef{ ID: abi.SectorID{ @@ -76,10 +80,14 @@ func (m *MoveStorageTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytas StorageID string `db:"storage_id"` } + indIDs := make([]int64, len(ids)) + for i, id := range ids { + indIDs[i] = int64(id) + } err := m.db.Select(ctx, &tasks, ` select p.task_id_move_storage, p.sp_id, p.sector_number, l.storage_id from sectors_sdr_pipeline p inner join sector_location l on p.sp_id=l.miner_id and p.sector_number=l.sector_num - where task_id_move_storage in ($1) and l.sector_filetype=4`, ids) + where task_id_move_storage in ($1) and l.sector_filetype=4`, indIDs) if err != nil { return nil, xerrors.Errorf("getting tasks: %w", err) }