2024-01-19 13:08:05 +00:00
package lpseal
import (
"context"
2024-01-20 14:52:38 +00:00
"golang.org/x/xerrors"
2024-01-19 13:08:05 +00:00
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/exitcode"
2024-01-20 14:52:38 +00:00
2024-01-19 13:08:05 +00:00
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
)
func ( s * SealPoller ) pollStartPrecommitMsg ( ctx context . Context , task pollTask ) {
2024-02-08 21:06:57 +00:00
if task . TaskPrecommitMsg == nil && ! task . AfterPrecommitMsg && task . AfterTreeR && task . AfterTreeD {
2024-01-19 13:08:05 +00:00
s . pollers [ pollerPrecommitMsg ] . Val ( ctx ) ( func ( id harmonytask . TaskID , tx * harmonydb . Tx ) ( shouldCommit bool , seriousError error ) {
n , err := tx . Exec ( ` UPDATE sectors_sdr_pipeline SET task_id_precommit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_precommit_msg is null and after_tree_r = true and after_tree_d = true ` , id , task . SpID , task . SectorNumber )
if err != nil {
return false , xerrors . Errorf ( "update sectors_sdr_pipeline: %w" , err )
}
if n != 1 {
return false , xerrors . Errorf ( "expected to update 1 row, updated %d" , n )
}
return true , nil
} )
}
}
type dbExecResult struct {
PrecommitMsgCID * string ` db:"precommit_msg_cid" `
CommitMsgCID * string ` db:"commit_msg_cid" `
ExecutedTskCID string ` db:"executed_tsk_cid" `
ExecutedTskEpoch int64 ` db:"executed_tsk_epoch" `
ExecutedMsgCID string ` db:"executed_msg_cid" `
ExecutedRcptExitCode int64 ` db:"executed_rcpt_exitcode" `
ExecutedRcptGasUsed int64 ` db:"executed_rcpt_gas_used" `
}
func ( s * SealPoller ) pollPrecommitMsgLanded ( ctx context . Context , task pollTask ) error {
2024-02-08 21:06:57 +00:00
if task . AfterPrecommitMsg && ! task . AfterPrecommitMsgSuccess {
2024-01-19 13:08:05 +00:00
var execResult [ ] dbExecResult
err := s . db . Select ( ctx , & execResult , ` SELECT spipeline . precommit_msg_cid , spipeline . commit_msg_cid , executed_tsk_cid , executed_tsk_epoch , executed_msg_cid , executed_rcpt_exitcode , executed_rcpt_gas_used
FROM sectors_sdr_pipeline spipeline
JOIN message_waits ON spipeline . precommit_msg_cid = message_waits . signed_message_cid
WHERE sp_id = $ 1 AND sector_number = $ 2 AND executed_tsk_epoch is not null ` , task . SpID , task . SectorNumber )
if err != nil {
log . Errorw ( "failed to query message_waits" , "error" , err )
}
if len ( execResult ) > 0 {
if exitcode . ExitCode ( execResult [ 0 ] . ExecutedRcptExitCode ) != exitcode . Ok {
return s . pollPrecommitMsgFail ( ctx , task , execResult [ 0 ] )
}
maddr , err := address . NewIDAddress ( uint64 ( task . SpID ) )
if err != nil {
return err
}
pci , err := s . api . StateSectorPreCommitInfo ( ctx , maddr , abi . SectorNumber ( task . SectorNumber ) , types . EmptyTSK )
if err != nil {
return xerrors . Errorf ( "get precommit info: %w" , err )
}
if pci != nil {
randHeight := pci . PreCommitEpoch + policy . GetPreCommitChallengeDelay ( )
_ , err := s . db . Exec ( ctx , ` UPDATE sectors_sdr_pipeline SET
seed_epoch = $ 1 , precommit_msg_tsk = $ 2 , after_precommit_msg_success = true
WHERE sp_id = $ 3 AND sector_number = $ 4 and seed_epoch is NULL ` ,
randHeight , execResult [ 0 ] . ExecutedTskCID , task . SpID , task . SectorNumber )
if err != nil {
return xerrors . Errorf ( "update sectors_sdr_pipeline: %w" , err )
}
} // todo handle missing precommit info (eg expired precommit)
}
}
return nil
}
func ( s * SealPoller ) pollPrecommitMsgFail ( ctx context . Context , task pollTask , execResult dbExecResult ) error {
switch exitcode . ExitCode ( execResult . ExecutedRcptExitCode ) {
case exitcode . SysErrInsufficientFunds :
fallthrough
case exitcode . SysErrOutOfGas :
// just retry
return s . pollRetryPrecommitMsgSend ( ctx , task , execResult )
default :
return xerrors . Errorf ( "precommit message failed with exit code %s" , exitcode . ExitCode ( execResult . ExecutedRcptExitCode ) )
}
}
func ( s * SealPoller ) pollRetryPrecommitMsgSend ( ctx context . Context , task pollTask , execResult dbExecResult ) error {
if execResult . PrecommitMsgCID == nil {
return xerrors . Errorf ( "precommit msg cid was nil" )
}
// make the pipeline entry seem like precommit send didn't happen, next poll loop will retry
_ , err := s . db . Exec ( ctx , ` UPDATE sectors_sdr_pipeline SET
precommit_msg_cid = null , task_id_precommit_msg = null
WHERE precommit_msg_cid = $ 1 AND sp_id = $ 2 AND sector_number = $ 3 AND after_precommit_msg_success = false ` ,
* execResult . PrecommitMsgCID , task . SpID , task . SectorNumber )
if err != nil {
return xerrors . Errorf ( "update sectors_sdr_pipeline to retry precommit msg send: %w" , err )
}
return nil
}