2023-12-18 17:29:10 +00:00
package lpseal
import (
2023-12-18 22:48:49 +00:00
"bytes"
"context"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-commp-utils/zerocomm"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
2023-12-18 17:29:10 +00:00
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/harmony/resources"
2023-12-18 22:48:49 +00:00
"github.com/filecoin-project/lotus/provider/lpffi"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"golang.org/x/xerrors"
2023-12-18 17:29:10 +00:00
)
type SDRAPI interface {
2023-12-18 22:48:49 +00:00
ChainHead ( context . Context ) ( * types . TipSet , error )
StateGetRandomnessFromTickets ( context . Context , crypto . DomainSeparationTag , abi . ChainEpoch , [ ] byte , types . TipSetKey ) ( abi . Randomness , error )
2023-12-18 17:29:10 +00:00
}
type SDRTask struct {
api SDRAPI
db * harmonydb . DB
2023-12-18 22:48:49 +00:00
sp * SealPoller
sc * lpffi . SealCalls
2023-12-18 17:29:10 +00:00
maxSDR int
}
func ( s * SDRTask ) Do ( taskID harmonytask . TaskID , stillOwned func ( ) bool ) ( done bool , err error ) {
2023-12-18 22:48:49 +00:00
ctx := context . Background ( )
var sectorParams struct {
SpID int64 ` db:"sp_id" `
SectorNumber int64 ` db:"sector_number" `
RegSealProof abi . RegisteredSealProof ` db:"reg_seal_proof" `
}
err = s . db . Select ( ctx , & sectorParams , `
SELECT sp_id , sector_number , reg_seal_proof
FROM sectors_sdr_pipeline ` )
if err != nil {
return false , xerrors . Errorf ( "getting sector params: %w" , err )
}
var pieces [ ] struct {
PieceIndex int64 ` db:"piece_index" `
PieceCID string ` db:"piece_cid" `
PieceSize int64 ` db:"piece_size" `
}
err = s . db . Select ( ctx , & pieces , `
SELECT piece_index , piece_cid , piece_size
FROM sectors_sdr_initial_pieces
WHERE sp_id = $ 1 AND sector_number = $ 2 ` , sectorParams . SpID , sectorParams . SectorNumber )
if err != nil {
return false , xerrors . Errorf ( "getting pieces: %w" , err )
}
if len ( pieces ) > 0 {
// todo sdr with data
return false , xerrors . Errorf ( "todo sdr with data" )
}
ssize , err := sectorParams . RegSealProof . SectorSize ( )
if err != nil {
return false , xerrors . Errorf ( "getting sector size: %w" , err )
}
commd := zerocomm . ZeroPieceCommitment ( abi . PaddedPieceSize ( ssize ) . Unpadded ( ) )
sref := storiface . SectorRef {
ID : abi . SectorID {
Miner : abi . ActorID ( sectorParams . SpID ) ,
Number : abi . SectorNumber ( sectorParams . SectorNumber ) ,
} ,
ProofType : sectorParams . RegSealProof ,
}
// get ticket
maddr , err := address . NewIDAddress ( uint64 ( sectorParams . SpID ) )
if err != nil {
return false , xerrors . Errorf ( "getting miner address: %w" , err )
}
ticket , err := s . getTicket ( ctx , maddr )
if err != nil {
return false , xerrors . Errorf ( "getting ticket: %w" , err )
}
// do the SDR!!
err = s . sc . GenerateSDR ( ctx , sref , ticket , commd )
if err != nil {
return false , xerrors . Errorf ( "generating sdr: %w" , err )
}
// store success!
n , err := s . db . Exec ( ctx , ` UPDATE sectors_sdr_pipeline SET after_sdr = true WHERE sp_id = $1 AND sector_number = $2 ` , sectorParams . SpID , sectorParams . SectorNumber )
if err != nil {
return false , xerrors . Errorf ( "store sdr success: updating pipeline: %w" , err )
}
if n != 1 {
return false , xerrors . Errorf ( "store sdr success: updated %d rows" , n )
}
return true , nil
}
func ( s * SDRTask ) getTicket ( ctx context . Context , maddr address . Address ) ( abi . SealRandomness , error ) {
ts , err := s . api . ChainHead ( ctx )
if err != nil {
return nil , xerrors . Errorf ( "getting chain head: %w" , err )
}
ticketEpoch := ts . Height ( ) - policy . SealRandomnessLookback
buf := new ( bytes . Buffer )
if err := maddr . MarshalCBOR ( buf ) ; err != nil {
return nil , xerrors . Errorf ( "marshaling miner address: %w" , err )
}
rand , err := s . api . StateGetRandomnessFromTickets ( ctx , crypto . DomainSeparationTag_SealRandomness , ticketEpoch , buf . Bytes ( ) , ts . Key ( ) )
if err != nil {
return nil , xerrors . Errorf ( "getting randomness from tickets: %w" , err )
}
return abi . SealRandomness ( rand ) , nil
2023-12-18 17:29:10 +00:00
}
func ( s * SDRTask ) CanAccept ( ids [ ] harmonytask . TaskID , engine * harmonytask . TaskEngine ) ( * harmonytask . TaskID , error ) {
2023-12-18 22:48:49 +00:00
// todo check storage (reserve too?)
id := ids [ 0 ]
return & id , nil
2023-12-18 17:29:10 +00:00
}
func ( s * SDRTask ) TypeDetails ( ) harmonytask . TaskTypeDetails {
return harmonytask . TaskTypeDetails {
2023-12-18 22:48:49 +00:00
Max : s . maxSDR ,
Name : "SDR" ,
Cost : resources . Resources { // todo offset for prefetch?
Cpu : 4 , // todo multicore sdr
Gpu : 0 ,
Ram : 54 << 30 , // todo measure; lower on 2k devnetn
} ,
2023-12-18 17:29:10 +00:00
MaxFailures : 0 ,
Follows : nil ,
}
}
func ( s * SDRTask ) Adder ( taskFunc harmonytask . AddTaskFunc ) {
2023-12-18 22:48:49 +00:00
s . sp . pollers [ pollerSDR ] . Set ( taskFunc )
2023-12-18 17:29:10 +00:00
}
// Compile-time check that *SDRTask implements harmonytask.TaskInterface.
var _ harmonytask.TaskInterface = (*SDRTask)(nil)