2023-12-18 22:48:49 +00:00
package lpseal
import (
"context"
2024-01-12 10:03:37 +00:00
"time"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
2024-01-05 21:05:50 +00:00
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
2024-01-12 10:03:37 +00:00
2024-01-05 21:05:50 +00:00
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
2023-12-18 22:48:49 +00:00
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/promise"
)
var log = logging . Logger ( "lpseal" )
const (
pollerSDR = iota
2023-12-20 10:28:06 +00:00
pollerTrees
2023-12-25 17:35:25 +00:00
pollerPrecommitMsg
2024-01-11 15:47:07 +00:00
pollerPoRep
2024-01-11 18:24:28 +00:00
pollerCommitMsg
2024-01-30 11:43:57 +00:00
pollerFinalize
2024-01-30 19:05:47 +00:00
pollerMoveStorage
2023-12-18 22:48:49 +00:00
numPollers
)
const sealPollerInterval = 10 * time . Second
2024-01-11 21:39:42 +00:00
const seedEpochConfidence = 3
2023-12-18 22:48:49 +00:00
2024-01-05 21:05:50 +00:00
type SealPollerAPI interface {
StateSectorPreCommitInfo ( context . Context , address . Address , abi . SectorNumber , types . TipSetKey ) ( * miner . SectorPreCommitOnChainInfo , error )
2024-01-11 21:39:42 +00:00
StateSectorGetInfo ( ctx context . Context , maddr address . Address , sectorNumber abi . SectorNumber , tsk types . TipSetKey ) ( * miner . SectorOnChainInfo , error )
ChainHead ( context . Context ) ( * types . TipSet , error )
2024-01-05 21:05:50 +00:00
}
2023-12-18 22:48:49 +00:00
type SealPoller struct {
2024-01-05 21:05:50 +00:00
db * harmonydb . DB
api SealPollerAPI
2023-12-18 22:48:49 +00:00
pollers [ numPollers ] promise . Promise [ harmonytask . AddTaskFunc ]
}
2024-01-05 21:05:50 +00:00
func NewPoller ( db * harmonydb . DB , api SealPollerAPI ) * SealPoller {
2023-12-19 11:16:38 +00:00
return & SealPoller {
2024-01-05 21:05:50 +00:00
db : db ,
api : api ,
2023-12-19 11:16:38 +00:00
}
}
func ( s * SealPoller ) RunPoller ( ctx context . Context ) {
2023-12-18 22:48:49 +00:00
ticker := time . NewTicker ( sealPollerInterval )
defer ticker . Stop ( )
for {
select {
case <- ctx . Done ( ) :
2023-12-19 11:16:38 +00:00
return
2023-12-18 22:48:49 +00:00
case <- ticker . C :
2023-12-19 11:16:38 +00:00
if err := s . poll ( ctx ) ; err != nil {
2023-12-19 11:39:25 +00:00
log . Errorw ( "polling failed" , "error" , err )
2023-12-19 11:16:38 +00:00
}
2023-12-18 22:48:49 +00:00
}
}
}
2024-01-17 11:42:43 +00:00
type pollTask struct {
SpID int64 ` db:"sp_id" `
SectorNumber int64 ` db:"sector_number" `
2023-12-18 22:48:49 +00:00
2024-01-17 11:42:43 +00:00
TaskSDR * int64 ` db:"task_id_sdr" `
AfterSDR bool ` db:"after_sdr" `
2023-12-18 22:48:49 +00:00
2024-01-17 11:42:43 +00:00
TaskTreeD * int64 ` db:"task_id_tree_d" `
AfterTreeD bool ` db:"after_tree_d" `
2023-12-18 22:48:49 +00:00
2024-01-17 11:42:43 +00:00
TaskTreeC * int64 ` db:"task_id_tree_c" `
AfterTreeC bool ` db:"after_tree_c" `
2023-12-18 22:48:49 +00:00
2024-01-17 11:42:43 +00:00
TaskTreeR * int64 ` db:"task_id_tree_r" `
AfterTreeR bool ` db:"after_tree_r" `
2023-12-18 22:48:49 +00:00
2024-01-17 11:42:43 +00:00
TaskPrecommitMsg * int64 ` db:"task_id_precommit_msg" `
AfterPrecommitMsg bool ` db:"after_precommit_msg" `
2023-12-18 22:48:49 +00:00
2024-01-17 11:42:43 +00:00
AfterPrecommitMsgSuccess bool ` db:"after_precommit_msg_success" `
SeedEpoch * int64 ` db:"seed_epoch" `
2023-12-18 22:48:49 +00:00
2024-01-17 11:42:43 +00:00
TaskPoRep * int64 ` db:"task_id_porep" `
PoRepProof [ ] byte ` db:"porep_proof" `
AfterPoRep bool ` db:"after_porep" `
2023-12-18 22:48:49 +00:00
2024-01-30 11:43:57 +00:00
TaskFinalize * int64 ` db:"task_id_finalize" `
AfterFinalize bool ` db:"after_finalize" `
2024-01-30 19:05:47 +00:00
TaskMoveStorage * int64 ` db:"task_id_move_storage" `
AfterMoveStorage bool ` db:"after_move_storage" `
2024-01-17 11:42:43 +00:00
TaskCommitMsg * int64 ` db:"task_id_commit_msg" `
AfterCommitMsg bool ` db:"after_commit_msg" `
2023-12-18 22:48:49 +00:00
2024-01-17 11:42:43 +00:00
AfterCommitMsgSuccess bool ` db:"after_commit_msg_success" `
2023-12-18 22:48:49 +00:00
2024-01-17 11:42:43 +00:00
Failed bool ` db:"failed" `
FailedReason string ` db:"failed_reason" `
}
func ( s * SealPoller ) poll ( ctx context . Context ) error {
var tasks [ ] pollTask
2023-12-18 22:48:49 +00:00
2023-12-19 11:39:25 +00:00
err := s . db . Select ( ctx , & tasks , ` SELECT
sp_id , sector_number ,
task_id_sdr , after_sdr ,
task_id_tree_d , after_tree_d ,
task_id_tree_c , after_tree_c ,
task_id_tree_r , after_tree_r ,
task_id_precommit_msg , after_precommit_msg ,
2024-01-11 21:39:42 +00:00
after_precommit_msg_success , seed_epoch ,
task_id_porep , porep_proof , after_porep ,
2024-01-30 11:43:57 +00:00
task_id_finalize , after_finalize ,
2024-01-30 19:05:47 +00:00
task_id_move_storage , after_move_storage ,
2023-12-19 11:39:25 +00:00
task_id_commit_msg , after_commit_msg ,
2024-01-11 21:39:42 +00:00
after_commit_msg_success ,
2023-12-19 11:39:25 +00:00
failed , failed_reason
2024-01-30 11:43:57 +00:00
FROM sectors_sdr_pipeline WHERE after_commit_msg_success != true or after_finalize != true ` )
2023-12-18 22:48:49 +00:00
if err != nil {
return err
}
for _ , task := range tasks {
2024-01-16 18:27:55 +00:00
task := task
2023-12-18 22:48:49 +00:00
if task . Failed {
continue
}
2024-01-17 11:42:43 +00:00
ts , err := s . api . ChainHead ( ctx )
if err != nil {
return xerrors . Errorf ( "getting chain head: %w" , err )
2023-12-18 22:48:49 +00:00
}
2024-01-17 11:42:43 +00:00
s . pollStartSDR ( ctx , task )
s . pollStartSDRTrees ( ctx , task )
s . pollStartPrecommitMsg ( ctx , task )
s . mustPoll ( s . pollPrecommitMsgLanded ( ctx , task ) )
s . pollStartPoRep ( ctx , task , ts )
2024-01-30 11:43:57 +00:00
s . pollStartFinalize ( ctx , task , ts )
2024-01-17 11:42:43 +00:00
s . pollStartCommitMsg ( ctx , task )
s . mustPoll ( s . pollCommitMsgLanded ( ctx , task ) )
}
2023-12-18 22:48:49 +00:00
2024-01-17 11:42:43 +00:00
return nil
}
2024-01-05 21:05:50 +00:00
2024-01-17 11:42:43 +00:00
func ( s * SealPoller ) pollStartSDR ( ctx context . Context , task pollTask ) {
if task . TaskSDR == nil && s . pollers [ pollerSDR ] . IsSet ( ) {
s . pollers [ pollerSDR ] . Val ( ctx ) ( func ( id harmonytask . TaskID , tx * harmonydb . Tx ) ( shouldCommit bool , seriousError error ) {
n , err := tx . Exec ( ` UPDATE sectors_sdr_pipeline SET task_id_sdr = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_sdr is null ` , id , task . SpID , task . SectorNumber )
if err != nil {
return false , xerrors . Errorf ( "update sectors_sdr_pipeline: %w" , err )
}
if n != 1 {
return false , xerrors . Errorf ( "expected to update 1 row, updated %d" , n )
2024-01-05 21:05:50 +00:00
}
2024-01-17 11:42:43 +00:00
return true , nil
} )
}
}
func ( s * SealPoller ) pollStartSDRTrees ( ctx context . Context , task pollTask ) {
if task . TaskTreeD == nil && task . TaskTreeC == nil && task . TaskTreeR == nil && s . pollers [ pollerTrees ] . IsSet ( ) && task . AfterSDR {
s . pollers [ pollerTrees ] . Val ( ctx ) ( func ( id harmonytask . TaskID , tx * harmonydb . Tx ) ( shouldCommit bool , seriousError error ) {
n , err := tx . Exec ( ` UPDATE sectors_sdr_pipeline SET task_id_tree_d = $ 1 , task_id_tree_c = $ 1 , task_id_tree_r = $ 1
WHERE sp_id = $ 2 AND sector_number = $ 3 and after_sdr = true and task_id_tree_d is null and task_id_tree_c is null and task_id_tree_r is null ` , id , task . SpID , task . SectorNumber )
2024-01-05 21:05:50 +00:00
if err != nil {
2024-01-17 11:42:43 +00:00
return false , xerrors . Errorf ( "update sectors_sdr_pipeline: %w" , err )
}
if n != 1 {
return false , xerrors . Errorf ( "expected to update 1 row, updated %d" , n )
2024-01-05 21:05:50 +00:00
}
2024-01-17 11:42:43 +00:00
return true , nil
} )
}
}
2024-01-05 21:05:50 +00:00
2024-01-17 11:42:43 +00:00
func ( s * SealPoller ) pollStartPoRep ( ctx context . Context , task pollTask , ts * types . TipSet ) {
if s . pollers [ pollerPoRep ] . IsSet ( ) && task . AfterPrecommitMsgSuccess && task . SeedEpoch != nil && task . TaskPoRep == nil && ts . Height ( ) >= abi . ChainEpoch ( * task . SeedEpoch + seedEpochConfidence ) {
s . pollers [ pollerPoRep ] . Val ( ctx ) ( func ( id harmonytask . TaskID , tx * harmonydb . Tx ) ( shouldCommit bool , seriousError error ) {
n , err := tx . Exec ( ` UPDATE sectors_sdr_pipeline SET task_id_porep = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_porep is null ` , id , task . SpID , task . SectorNumber )
if err != nil {
return false , xerrors . Errorf ( "update sectors_sdr_pipeline: %w" , err )
}
if n != 1 {
return false , xerrors . Errorf ( "expected to update 1 row, updated %d" , n )
2024-01-11 21:39:42 +00:00
}
2024-01-17 11:42:43 +00:00
return true , nil
} )
}
}
2024-01-30 11:43:57 +00:00
func ( s * SealPoller ) pollStartFinalize ( ctx context . Context , task pollTask , ts * types . TipSet ) {
if s . pollers [ pollerFinalize ] . IsSet ( ) && task . AfterPoRep && task . TaskFinalize == nil {
s . pollers [ pollerFinalize ] . Val ( ctx ) ( func ( id harmonytask . TaskID , tx * harmonydb . Tx ) ( shouldCommit bool , seriousError error ) {
n , err := tx . Exec ( ` UPDATE sectors_sdr_pipeline SET task_id_finalize = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_finalize is null ` , id , task . SpID , task . SectorNumber )
if err != nil {
return false , xerrors . Errorf ( "update sectors_sdr_pipeline: %w" , err )
}
if n != 1 {
return false , xerrors . Errorf ( "expected to update 1 row, updated %d" , n )
}
return true , nil
} )
}
}
2024-01-30 19:05:47 +00:00
func ( s * SealPoller ) pollStartMoveStorage ( ctx context . Context , task pollTask ) {
if s . pollers [ pollerMoveStorage ] . IsSet ( ) && task . AfterFinalize && task . TaskMoveStorage == nil {
s . pollers [ pollerMoveStorage ] . Val ( ctx ) ( func ( id harmonytask . TaskID , tx * harmonydb . Tx ) ( shouldCommit bool , seriousError error ) {
n , err := tx . Exec ( ` UPDATE sectors_sdr_pipeline SET task_id_move_storage = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_move_storage is null ` , id , task . SpID , task . SectorNumber )
if err != nil {
return false , xerrors . Errorf ( "update sectors_sdr_pipeline: %w" , err )
}
if n != 1 {
return false , xerrors . Errorf ( "expected to update 1 row, updated %d" , n )
}
return true , nil
} )
}
}
2024-01-17 11:42:43 +00:00
func ( s * SealPoller ) mustPoll ( err error ) {
if err != nil {
log . Errorw ( "poller operation failed" , "error" , err )
}
}