2024-01-30 11:43:57 +00:00
package lpseal
import (
"context"
2024-01-30 22:43:27 +00:00
"golang.org/x/xerrors"
2024-01-30 11:43:57 +00:00
"github.com/filecoin-project/go-state-types/abi"
2024-01-30 22:43:27 +00:00
2024-01-30 11:43:57 +00:00
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/harmony/resources"
"github.com/filecoin-project/lotus/provider/lpffi"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
// FinalizeTask is the harmony task implementation for the "Finalize" step of
// the sector sealing pipeline (see TypeDetails for the registered name).
type FinalizeTask struct {
	// max is reported verbatim as TaskTypeDetails.Max.
	max int

	// sp is the seal poller; Adder hands the task-adding function to its
	// pollerFinalize slot.
	sp *SealPoller

	// sc supplies the FinalizeSector and LocalStorage calls used by Do and
	// CanAccept.
	sc *lpffi.SealCalls

	// db is the harmony database handle used for every pipeline query.
	db *harmonydb.DB
}
// NewFinalizeTask builds a FinalizeTask from its collaborators.
//
// max is stored and later reported as the task type's Max; sp, sc and db are
// kept as-is (no copies are made, no validation is performed).
func NewFinalizeTask(max int, sp *SealPoller, sc *lpffi.SealCalls, db *harmonydb.DB) *FinalizeTask {
	task := new(FinalizeTask)
	task.max = max
	task.sp = sp
	task.sc = sc
	task.db = db
	return task
}
func ( f * FinalizeTask ) Do ( taskID harmonytask . TaskID , stillOwned func ( ) bool ) ( done bool , err error ) {
2024-02-08 21:29:43 +00:00
var tasks [ ] struct {
2024-01-30 11:43:57 +00:00
SpID int64 ` db:"sp_id" `
SectorNumber int64 ` db:"sector_number" `
RegSealProof int64 ` db:"reg_seal_proof" `
}
ctx := context . Background ( )
2024-02-08 21:29:43 +00:00
err = f . db . Select ( ctx , & tasks , `
2024-01-30 11:43:57 +00:00
select sp_id , sector_number , reg_seal_proof from sectors_sdr_pipeline where task_id_finalize = $ 1 ` , taskID )
if err != nil {
return false , xerrors . Errorf ( "getting task: %w" , err )
}
2024-02-08 21:29:43 +00:00
if len ( tasks ) != 1 {
return false , xerrors . Errorf ( "expected one task" )
}
task := tasks [ 0 ]
2024-02-10 18:14:31 +00:00
var keepUnsealed bool
2024-02-11 21:47:08 +00:00
if err := f . db . QueryRow ( ctx , ` select coalesce(bool_or(not data_delete_on_finalize), false) from sectors_sdr_initial_pieces where sp_id=$1 and sector_number=$2 ` , task . SpID , task . SectorNumber ) . Scan ( & keepUnsealed ) ; err != nil {
2024-02-10 18:14:31 +00:00
return false , err
}
2024-01-30 11:43:57 +00:00
sector := storiface . SectorRef {
ID : abi . SectorID {
Miner : abi . ActorID ( task . SpID ) ,
Number : abi . SectorNumber ( task . SectorNumber ) ,
} ,
ProofType : abi . RegisteredSealProof ( task . RegSealProof ) ,
}
2024-02-10 18:14:31 +00:00
err = f . sc . FinalizeSector ( ctx , sector , keepUnsealed )
2024-01-30 11:43:57 +00:00
if err != nil {
return false , xerrors . Errorf ( "finalizing sector: %w" , err )
}
// set after_finalize
_ , err = f . db . Exec ( ctx , ` update sectors_sdr_pipeline set after_finalize=true where task_id_finalize=$1 ` , taskID )
if err != nil {
return false , xerrors . Errorf ( "updating task: %w" , err )
}
return true , nil
}
func ( f * FinalizeTask ) CanAccept ( ids [ ] harmonytask . TaskID , engine * harmonytask . TaskEngine ) ( * harmonytask . TaskID , error ) {
var tasks [ ] struct {
TaskID harmonytask . TaskID ` db:"task_id_finalize" `
SpID int64 ` db:"sp_id" `
SectorNumber int64 ` db:"sector_number" `
StorageID string ` db:"storage_id" `
}
if 4 != storiface . FTCache {
panic ( "storiface.FTCache != 4" )
}
ctx := context . Background ( )
2024-02-08 21:29:43 +00:00
indIDs := make ( [ ] int64 , len ( ids ) )
for i , id := range ids {
indIDs [ i ] = int64 ( id )
}
2024-01-30 11:43:57 +00:00
err := f . db . Select ( ctx , & tasks , `
select p . task_id_finalize , p . sp_id , p . sector_number , l . storage_id from sectors_sdr_pipeline p
inner join sector_location l on p . sp_id = l . miner_id and p . sector_number = l . sector_num
2024-02-08 21:29:43 +00:00
where task_id_finalize = ANY ( $ 1 ) and l . sector_filetype = 4 ` , indIDs )
2024-01-30 11:43:57 +00:00
if err != nil {
return nil , xerrors . Errorf ( "getting tasks: %w" , err )
}
ls , err := f . sc . LocalStorage ( ctx )
if err != nil {
return nil , xerrors . Errorf ( "getting local storage: %w" , err )
}
2024-01-30 19:05:47 +00:00
acceptables := map [ harmonytask . TaskID ] bool { }
for _ , t := range ids {
acceptables [ t ] = true
}
2024-01-30 11:43:57 +00:00
for _ , t := range tasks {
2024-01-30 19:05:47 +00:00
if _ , ok := acceptables [ t . TaskID ] ; ! ok {
continue
}
2024-01-30 11:43:57 +00:00
for _ , l := range ls {
if string ( l . ID ) == t . StorageID {
return & t . TaskID , nil
}
}
}
return nil , nil
}
// TypeDetails describes this task type to the harmony task engine: it is
// named "Finalize", allows up to f.max instances (Max), tolerates 10 failures
// (MaxFailures) and declares a cost of 1 CPU, no GPU and 100<<20 of RAM
// (presumably bytes, i.e. 100 MiB; confirm against resources.Resources).
func (f *FinalizeTask) TypeDetails() harmonytask.TaskTypeDetails {
	var details harmonytask.TaskTypeDetails

	details.Name = "Finalize"
	details.Max = f.max
	details.MaxFailures = 10
	details.Cost = resources.Resources{
		Cpu: 1,
		Gpu: 0,
		Ram: 100 << 20,
	}

	return details
}
// Adder passes taskFunc to the seal poller by calling Set on its
// pollerFinalize entry.
func (f *FinalizeTask) Adder(taskFunc harmonytask.AddTaskFunc) {
	// Only the pointer is aliased; the poller entry itself is still
	// addressed in place inside the SealPoller.
	poller := f.sp
	poller.pollers[pollerFinalize].Set(taskFunc)
}
// Compile-time check that *FinalizeTask satisfies harmonytask.TaskInterface.
var _ harmonytask.TaskInterface = (*FinalizeTask)(nil)