lpseal: Fix finalize
This commit is contained in:
parent
25f1fd7952
commit
8a73987529
@ -31,7 +31,7 @@ func NewFinalizeTask(max int, sp *SealPoller, sc *lpffi.SealCalls, db *harmonydb
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
|
func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
|
||||||
var task struct {
|
var tasks []struct {
|
||||||
SpID int64 `db:"sp_id"`
|
SpID int64 `db:"sp_id"`
|
||||||
SectorNumber int64 `db:"sector_number"`
|
SectorNumber int64 `db:"sector_number"`
|
||||||
RegSealProof int64 `db:"reg_seal_proof"`
|
RegSealProof int64 `db:"reg_seal_proof"`
|
||||||
@ -39,12 +39,17 @@ func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do
|
|||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
err = f.db.Select(ctx, &task, `
|
err = f.db.Select(ctx, &tasks, `
|
||||||
select sp_id, sector_number, reg_seal_proof from sectors_sdr_pipeline where task_id_finalize=$1`, taskID)
|
select sp_id, sector_number, reg_seal_proof from sectors_sdr_pipeline where task_id_finalize=$1`, taskID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("getting task: %w", err)
|
return false, xerrors.Errorf("getting task: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(tasks) != 1 {
|
||||||
|
return false, xerrors.Errorf("expected one task")
|
||||||
|
}
|
||||||
|
task := tasks[0]
|
||||||
|
|
||||||
sector := storiface.SectorRef{
|
sector := storiface.SectorRef{
|
||||||
ID: abi.SectorID{
|
ID: abi.SectorID{
|
||||||
Miner: abi.ActorID(task.SpID),
|
Miner: abi.ActorID(task.SpID),
|
||||||
@ -81,10 +86,15 @@ func (f *FinalizeTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.T
|
|||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
|
indIDs := make([]int64, len(ids))
|
||||||
|
for i, id := range ids {
|
||||||
|
indIDs[i] = int64(id)
|
||||||
|
}
|
||||||
|
|
||||||
err := f.db.Select(ctx, &tasks, `
|
err := f.db.Select(ctx, &tasks, `
|
||||||
select p.task_id_finalize, p.sp_id, p.sector_number, l.storage_id from sectors_sdr_pipeline p
|
select p.task_id_finalize, p.sp_id, p.sector_number, l.storage_id from sectors_sdr_pipeline p
|
||||||
inner join sector_location l on p.sp_id=l.miner_id and p.sector_number=l.sector_num
|
inner join sector_location l on p.sp_id=l.miner_id and p.sector_number=l.sector_num
|
||||||
where task_id_finalize in ($1) and l.sector_filetype=4`, ids)
|
where task_id_finalize = ANY ($1) and l.sector_filetype=4`, indIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("getting tasks: %w", err)
|
return nil, xerrors.Errorf("getting tasks: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -32,7 +32,7 @@ func NewMoveStorageTask(sp *SealPoller, sc *lpffi.SealCalls, db *harmonydb.DB, m
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *MoveStorageTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
|
func (m *MoveStorageTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
|
||||||
var task struct {
|
var tasks []struct {
|
||||||
SpID int64 `db:"sp_id"`
|
SpID int64 `db:"sp_id"`
|
||||||
SectorNumber int64 `db:"sector_number"`
|
SectorNumber int64 `db:"sector_number"`
|
||||||
RegSealProof int64 `db:"reg_seal_proof"`
|
RegSealProof int64 `db:"reg_seal_proof"`
|
||||||
@ -40,11 +40,15 @@ func (m *MoveStorageTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
|
|||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
err = m.db.Select(ctx, &task, `
|
err = m.db.Select(ctx, &tasks, `
|
||||||
select sp_id, sector_number, reg_seal_proof from sectors_sdr_pipeline where task_id_move_storage=$1`, taskID)
|
select sp_id, sector_number, reg_seal_proof from sectors_sdr_pipeline where task_id_move_storage=$1`, taskID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("getting task: %w", err)
|
return false, xerrors.Errorf("getting task: %w", err)
|
||||||
}
|
}
|
||||||
|
if len(tasks) != 1 {
|
||||||
|
return false, xerrors.Errorf("expected one task")
|
||||||
|
}
|
||||||
|
task := tasks[0]
|
||||||
|
|
||||||
sector := storiface.SectorRef{
|
sector := storiface.SectorRef{
|
||||||
ID: abi.SectorID{
|
ID: abi.SectorID{
|
||||||
@ -76,10 +80,14 @@ func (m *MoveStorageTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytas
|
|||||||
StorageID string `db:"storage_id"`
|
StorageID string `db:"storage_id"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
indIDs := make([]int64, len(ids))
|
||||||
|
for i, id := range ids {
|
||||||
|
indIDs[i] = int64(id)
|
||||||
|
}
|
||||||
err := m.db.Select(ctx, &tasks, `
|
err := m.db.Select(ctx, &tasks, `
|
||||||
select p.task_id_move_storage, p.sp_id, p.sector_number, l.storage_id from sectors_sdr_pipeline p
|
select p.task_id_move_storage, p.sp_id, p.sector_number, l.storage_id from sectors_sdr_pipeline p
|
||||||
inner join sector_location l on p.sp_id=l.miner_id and p.sector_number=l.sector_num
|
inner join sector_location l on p.sp_id=l.miner_id and p.sector_number=l.sector_num
|
||||||
where task_id_move_storage in ($1) and l.sector_filetype=4`, ids)
|
where task_id_move_storage in ($1) and l.sector_filetype=4`, indIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("getting tasks: %w", err)
|
return nil, xerrors.Errorf("getting tasks: %w", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user