2023-12-18 22:48:49 +00:00
package lpseal
import (
"context"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/promise"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"time"
)
// log is the package-level logger, registered under the "lpseal" subsystem name.
var log = logging.Logger("lpseal")
// Indexes into SealPoller.pollers, one per pipeline stage that can have a
// task-adding function registered.
//
// numPollers must remain the last entry: it is used as the length of the
// SealPoller.pollers array.
const (
	pollerSDR = iota
	pollerTrees

	numPollers
)
const sealPollerInterval = 10 * time . Second
// SealPoller periodically scans the sectors_sdr_pipeline table and hands
// sectors that are ready for their next stage to the matching task-adding
// function.
type SealPoller struct {
	// db is the database queried on every poll.
	db *harmonydb.DB

	// pollers holds one promised harmonytask.AddTaskFunc per pipeline stage,
	// indexed by the poller* constants. A stage is only scheduled once its
	// entry reports IsSet.
	pollers [numPollers]promise.Promise[harmonytask.AddTaskFunc]
}
2023-12-19 11:16:38 +00:00
// NewPoller returns a SealPoller that reads pipeline state from db. The
// pollers array is left at its zero value.
func NewPoller(db *harmonydb.DB) *SealPoller {
	poller := new(SealPoller)
	poller.db = db
	return poller
}
func ( s * SealPoller ) RunPoller ( ctx context . Context ) {
2023-12-18 22:48:49 +00:00
ticker := time . NewTicker ( sealPollerInterval )
defer ticker . Stop ( )
for {
select {
case <- ctx . Done ( ) :
2023-12-19 11:16:38 +00:00
return
2023-12-18 22:48:49 +00:00
case <- ticker . C :
2023-12-19 11:16:38 +00:00
if err := s . poll ( ctx ) ; err != nil {
2023-12-19 11:39:25 +00:00
log . Errorw ( "polling failed" , "error" , err )
2023-12-19 11:16:38 +00:00
}
2023-12-18 22:48:49 +00:00
}
}
}
func ( s * SealPoller ) poll ( ctx context . Context ) error {
var tasks [ ] struct {
SpID int64 ` db:"sp_id" `
SectorNumber int64 ` db:"sector_number" `
TaskSDR * int64 ` db:"task_id_sdr" `
AfterSDR bool ` db:"after_sdr" `
TaskTreeD * int64 ` db:"task_id_tree_d" `
AfterTreeD bool ` db:"after_tree_d" `
TaskTreeC * int64 ` db:"task_id_tree_c" `
AfterTreeC bool ` db:"after_tree_c" `
TaskTreeR * int64 ` db:"task_id_tree_r" `
AfterTreeR bool ` db:"after_tree_r" `
TaskPrecommitMsg * int64 ` db:"task_id_precommit_msg" `
AfterPrecommitMsg bool ` db:"after_precommit_msg" `
TaskPrecommitMsgWait * int64 ` db:"task_id_precommit_msg_wait" `
AfterPrecommitMsgSuccess bool ` db:"after_precommit_msg_success" `
TaskPoRep * int64 ` db:"task_id_porep" `
PoRepProof [ ] byte ` db:"porep_proof" `
TaskCommitMsg * int64 ` db:"task_id_commit_msg" `
AfterCommitMsg bool ` db:"after_commit_msg" `
TaskCommitMsgWait * int64 ` db:"task_id_commit_msg_wait" `
AfterCommitMsgSuccess bool ` db:"after_commit_msg_success" `
Failed bool ` db:"failed" `
FailedReason string ` db:"failed_reason" `
}
2023-12-19 11:39:25 +00:00
err := s . db . Select ( ctx , & tasks , ` SELECT
sp_id , sector_number ,
task_id_sdr , after_sdr ,
task_id_tree_d , after_tree_d ,
task_id_tree_c , after_tree_c ,
task_id_tree_r , after_tree_r ,
task_id_precommit_msg , after_precommit_msg ,
task_id_precommit_msg_wait , after_precommit_msg_success ,
task_id_porep , porep_proof ,
task_id_commit_msg , after_commit_msg ,
task_id_commit_msg_wait , after_commit_msg_success ,
failed , failed_reason
FROM sectors_sdr_pipeline WHERE after_commit_msg_success != true ` )
2023-12-18 22:48:49 +00:00
if err != nil {
return err
}
for _ , task := range tasks {
if task . Failed {
continue
}
2023-12-19 11:16:38 +00:00
if task . TaskSDR == nil && s . pollers [ pollerSDR ] . IsSet ( ) {
2023-12-18 22:48:49 +00:00
s . pollers [ pollerSDR ] . Val ( ctx ) ( func ( id harmonytask . TaskID , tx * harmonydb . Tx ) ( shouldCommit bool , seriousError error ) {
2023-12-20 13:45:19 +00:00
n , err := tx . Exec ( ` UPDATE sectors_sdr_pipeline SET task_id_sdr = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_sdr is null ` , id , task . SpID , task . SectorNumber )
2023-12-18 22:48:49 +00:00
if err != nil {
return false , xerrors . Errorf ( "update sectors_sdr_pipeline: %w" , err )
}
if n != 1 {
return false , xerrors . Errorf ( "expected to update 1 row, updated %d" , n )
}
return true , nil
} )
}
2023-12-20 13:45:19 +00:00
if task . TaskTreeD == nil && task . TaskTreeC == nil && task . TaskTreeR == nil && s . pollers [ pollerTrees ] . IsSet ( ) && task . AfterSDR {
s . pollers [ pollerTrees ] . Val ( ctx ) ( func ( id harmonytask . TaskID , tx * harmonydb . Tx ) ( shouldCommit bool , seriousError error ) {
n , err := tx . Exec ( ` UPDATE sectors_sdr_pipeline SET task_id_tree_d = $ 1 , task_id_tree_c = $ 1 , task_id_tree_r = $ 1
WHERE sp_id = $ 2 AND sector_number = $ 3 and task_id_tree_d is null and task_id_tree_c is null and task_id_tree_r is null ` , id , task . SpID , task . SectorNumber )
if err != nil {
return false , xerrors . Errorf ( "update sectors_sdr_pipeline: %w" , err )
}
if n != 1 {
return false , xerrors . Errorf ( "expected to update 1 row, updated %d" , n )
}
2023-12-18 22:48:49 +00:00
2023-12-20 13:45:19 +00:00
return true , nil
} )
2023-12-18 22:48:49 +00:00
}
if task . TaskPrecommitMsg == nil && task . AfterTreeR && task . AfterTreeD {
// todo start precommit msg task
}
if task . TaskPrecommitMsgWait == nil && task . AfterPrecommitMsg {
// todo start precommit msg wait task
}
todoWaitSeed := false
if task . TaskPoRep == nil && todoWaitSeed {
// todo start porep task
}
}
return nil
}