2023-12-18 22:48:49 +00:00
package lpseal
import (
"context"
2024-01-05 21:05:50 +00:00
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
2023-12-18 22:48:49 +00:00
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/promise"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"time"
)
var log = logging . Logger ( "lpseal" )
const (
pollerSDR = iota
2023-12-20 10:28:06 +00:00
pollerTrees
2023-12-25 17:35:25 +00:00
pollerPrecommitMsg
2023-12-18 22:48:49 +00:00
numPollers
)
const sealPollerInterval = 10 * time . Second
2024-01-05 21:05:50 +00:00
type SealPollerAPI interface {
StateSectorPreCommitInfo ( context . Context , address . Address , abi . SectorNumber , types . TipSetKey ) ( * miner . SectorPreCommitOnChainInfo , error )
}
2023-12-18 22:48:49 +00:00
type SealPoller struct {
2024-01-05 21:05:50 +00:00
db * harmonydb . DB
api SealPollerAPI
2023-12-18 22:48:49 +00:00
pollers [ numPollers ] promise . Promise [ harmonytask . AddTaskFunc ]
}
2024-01-05 21:05:50 +00:00
func NewPoller ( db * harmonydb . DB , api SealPollerAPI ) * SealPoller {
2023-12-19 11:16:38 +00:00
return & SealPoller {
2024-01-05 21:05:50 +00:00
db : db ,
api : api ,
2023-12-19 11:16:38 +00:00
}
}
func ( s * SealPoller ) RunPoller ( ctx context . Context ) {
2023-12-18 22:48:49 +00:00
ticker := time . NewTicker ( sealPollerInterval )
defer ticker . Stop ( )
for {
select {
case <- ctx . Done ( ) :
2023-12-19 11:16:38 +00:00
return
2023-12-18 22:48:49 +00:00
case <- ticker . C :
2023-12-19 11:16:38 +00:00
if err := s . poll ( ctx ) ; err != nil {
2023-12-19 11:39:25 +00:00
log . Errorw ( "polling failed" , "error" , err )
2023-12-19 11:16:38 +00:00
}
2023-12-18 22:48:49 +00:00
}
}
}
func ( s * SealPoller ) poll ( ctx context . Context ) error {
var tasks [ ] struct {
SpID int64 ` db:"sp_id" `
SectorNumber int64 ` db:"sector_number" `
TaskSDR * int64 ` db:"task_id_sdr" `
AfterSDR bool ` db:"after_sdr" `
TaskTreeD * int64 ` db:"task_id_tree_d" `
AfterTreeD bool ` db:"after_tree_d" `
TaskTreeC * int64 ` db:"task_id_tree_c" `
AfterTreeC bool ` db:"after_tree_c" `
TaskTreeR * int64 ` db:"task_id_tree_r" `
AfterTreeR bool ` db:"after_tree_r" `
TaskPrecommitMsg * int64 ` db:"task_id_precommit_msg" `
AfterPrecommitMsg bool ` db:"after_precommit_msg" `
2024-01-05 15:10:34 +00:00
AfterPrecommitMsgSuccess bool ` db:"after_precommit_msg_success" `
2023-12-18 22:48:49 +00:00
TaskPoRep * int64 ` db:"task_id_porep" `
PoRepProof [ ] byte ` db:"porep_proof" `
TaskCommitMsg * int64 ` db:"task_id_commit_msg" `
AfterCommitMsg bool ` db:"after_commit_msg" `
TaskCommitMsgWait * int64 ` db:"task_id_commit_msg_wait" `
AfterCommitMsgSuccess bool ` db:"after_commit_msg_success" `
Failed bool ` db:"failed" `
FailedReason string ` db:"failed_reason" `
}
2023-12-19 11:39:25 +00:00
err := s . db . Select ( ctx , & tasks , ` SELECT
sp_id , sector_number ,
task_id_sdr , after_sdr ,
task_id_tree_d , after_tree_d ,
task_id_tree_c , after_tree_c ,
task_id_tree_r , after_tree_r ,
task_id_precommit_msg , after_precommit_msg ,
2024-01-05 15:10:34 +00:00
after_precommit_msg_success ,
2023-12-19 11:39:25 +00:00
task_id_porep , porep_proof ,
task_id_commit_msg , after_commit_msg ,
task_id_commit_msg_wait , after_commit_msg_success ,
failed , failed_reason
FROM sectors_sdr_pipeline WHERE after_commit_msg_success != true ` )
2023-12-18 22:48:49 +00:00
if err != nil {
return err
}
for _ , task := range tasks {
if task . Failed {
continue
}
2023-12-19 11:16:38 +00:00
if task . TaskSDR == nil && s . pollers [ pollerSDR ] . IsSet ( ) {
2023-12-18 22:48:49 +00:00
s . pollers [ pollerSDR ] . Val ( ctx ) ( func ( id harmonytask . TaskID , tx * harmonydb . Tx ) ( shouldCommit bool , seriousError error ) {
2023-12-20 13:45:19 +00:00
n , err := tx . Exec ( ` UPDATE sectors_sdr_pipeline SET task_id_sdr = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_sdr is null ` , id , task . SpID , task . SectorNumber )
2023-12-18 22:48:49 +00:00
if err != nil {
return false , xerrors . Errorf ( "update sectors_sdr_pipeline: %w" , err )
}
if n != 1 {
return false , xerrors . Errorf ( "expected to update 1 row, updated %d" , n )
}
return true , nil
} )
}
2023-12-20 13:45:19 +00:00
if task . TaskTreeD == nil && task . TaskTreeC == nil && task . TaskTreeR == nil && s . pollers [ pollerTrees ] . IsSet ( ) && task . AfterSDR {
s . pollers [ pollerTrees ] . Val ( ctx ) ( func ( id harmonytask . TaskID , tx * harmonydb . Tx ) ( shouldCommit bool , seriousError error ) {
n , err := tx . Exec ( ` UPDATE sectors_sdr_pipeline SET task_id_tree_d = $ 1 , task_id_tree_c = $ 1 , task_id_tree_r = $ 1
2023-12-25 17:35:25 +00:00
WHERE sp_id = $ 2 AND sector_number = $ 3 and after_sdr = true and task_id_tree_d is null and task_id_tree_c is null and task_id_tree_r is null ` , id , task . SpID , task . SectorNumber )
2023-12-20 13:45:19 +00:00
if err != nil {
return false , xerrors . Errorf ( "update sectors_sdr_pipeline: %w" , err )
}
if n != 1 {
return false , xerrors . Errorf ( "expected to update 1 row, updated %d" , n )
}
2023-12-18 22:48:49 +00:00
2023-12-20 13:45:19 +00:00
return true , nil
} )
2023-12-18 22:48:49 +00:00
}
if task . TaskPrecommitMsg == nil && task . AfterTreeR && task . AfterTreeD {
2023-12-25 17:35:25 +00:00
s . pollers [ pollerPrecommitMsg ] . Val ( ctx ) ( func ( id harmonytask . TaskID , tx * harmonydb . Tx ) ( shouldCommit bool , seriousError error ) {
n , err := tx . Exec ( ` UPDATE sectors_sdr_pipeline SET task_id_precommit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_precommit_msg is null and after_tree_r = true and after_tree_d = true ` , id , task . SpID , task . SectorNumber )
if err != nil {
return false , xerrors . Errorf ( "update sectors_sdr_pipeline: %w" , err )
}
if n != 1 {
return false , xerrors . Errorf ( "expected to update 1 row, updated %d" , n )
}
return true , nil
} )
2023-12-18 22:48:49 +00:00
}
2024-01-05 21:05:50 +00:00
if task . TaskPrecommitMsg != nil && ! task . AfterPrecommitMsgSuccess {
// join pipeline precommit_msg_cid with message_waits signed_message_cid, if executed_tsk_epoch is not null, return result
2024-01-05 15:10:34 +00:00
2024-01-05 21:05:50 +00:00
var execResult [ ] struct {
ExecutedTskCID string ` db:"executed_tsk_cid" `
ExecutedTskEpoch int64 ` db:"executed_tsk_epoch" `
ExecutedMsgCID string ` db:"executed_msg_cid" `
ExecutedRcptExitCode int64 ` db:"executed_rcpt_exitcode" `
ExecutedRcptGasUsed int64 ` db:"executed_rcpt_gas_used" `
}
err := s . db . Select ( ctx , & execResult , ` SELECT executed_tsk_cid , executed_tsk_epoch , executed_msg_cid , executed_rcpt_exitcode , executed_rcpt_gas_used
FROM sectors_sdr_pipeline
JOIN message_waits ON sectors_sdr_pipeline . precommit_msg_cid = message_waits . signed_message_cid
WHERE sp_id = $ 1 AND sector_number = $ 2 AND executed_tsk_epoch is not null ` , task . SpID , task . SectorNumber )
if err != nil {
log . Errorw ( "failed to query message_waits" , "error" , err )
}
if len ( execResult ) > 0 {
maddr , err := address . NewIDAddress ( uint64 ( task . SpID ) )
if err != nil {
return err
}
pci , err := s . api . StateSectorPreCommitInfo ( ctx , maddr , abi . SectorNumber ( task . SectorNumber ) , types . EmptyTSK )
if err != nil {
return xerrors . Errorf ( "get precommit info: %w" , err )
}
if pci != nil {
randHeight := pci . PreCommitEpoch + policy . GetPreCommitChallengeDelay ( )
2024-01-05 15:10:34 +00:00
2024-01-05 21:05:50 +00:00
_ , err := s . db . Exec ( ctx , ` UPDATE sectors_sdr_pipeline SET
seed_epoch = $ 1 , precommit_msg_tsk = $ 2 , after_precommit_msg_success = true
WHERE sp_id = $ 3 AND sector_number = $ 4 and seed_epoch is NULL ` ,
randHeight , execResult [ 0 ] . ExecutedTskCID , task . SpID , task . SectorNumber )
if err != nil {
return xerrors . Errorf ( "update sectors_sdr_pipeline: %w" , err )
}
} // todo handle missing precommit info (eg expired precommit)
}
2023-12-18 22:48:49 +00:00
}
todoWaitSeed := false
if task . TaskPoRep == nil && todoWaitSeed {
// todo start porep task
}
}
return nil
}