2021-03-10 15:16:44 +00:00
package sealing
import (
"bytes"
"context"
"sort"
"sync"
"time"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-state-types/abi"
2022-09-06 15:49:29 +00:00
actorstypes "github.com/filecoin-project/go-state-types/actors"
2021-03-10 15:16:44 +00:00
"github.com/filecoin-project/go-state-types/big"
2022-04-20 21:34:28 +00:00
"github.com/filecoin-project/go-state-types/builtin"
2021-07-12 16:46:05 +00:00
"github.com/filecoin-project/go-state-types/network"
2022-06-14 15:00:51 +00:00
"github.com/filecoin-project/go-state-types/proof"
2021-03-10 15:16:44 +00:00
"github.com/filecoin-project/lotus/api"
2021-05-18 11:30:47 +00:00
"github.com/filecoin-project/lotus/build"
2023-08-29 13:16:05 +00:00
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
2021-05-18 17:47:30 +00:00
"github.com/filecoin-project/lotus/chain/actors/policy"
2024-01-25 14:15:55 +00:00
"github.com/filecoin-project/lotus/chain/messagepool"
2021-07-12 16:46:05 +00:00
"github.com/filecoin-project/lotus/chain/types"
2021-06-08 13:43:43 +00:00
"github.com/filecoin-project/lotus/node/config"
2022-08-09 11:30:34 +00:00
"github.com/filecoin-project/lotus/node/modules/dtypes"
2022-06-15 10:06:22 +00:00
"github.com/filecoin-project/lotus/storage/pipeline/sealiface"
2022-06-17 11:52:19 +00:00
"github.com/filecoin-project/lotus/storage/sealer/storiface"
2021-03-10 15:16:44 +00:00
)
2021-06-29 06:36:26 +00:00
var aggFeeNum = big . NewInt ( 110 )
var aggFeeDen = big . NewInt ( 100 )
2021-06-09 15:18:09 +00:00
//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_commit_batcher.go -package=mocks . CommitBatcherApi
2021-03-10 15:16:44 +00:00
type CommitBatcherApi interface {
2022-06-16 13:50:41 +00:00
MpoolPushMessage ( context . Context , * types . Message , * api . MessageSendSpec ) ( * types . SignedMessage , error )
2023-04-12 04:30:19 +00:00
GasEstimateMessageGas ( context . Context , * types . Message , * api . MessageSendSpec , types . TipSetKey ) ( * types . Message , error )
2022-06-16 09:12:33 +00:00
StateMinerInfo ( context . Context , address . Address , types . TipSetKey ) ( api . MinerInfo , error )
2022-06-16 11:15:49 +00:00
ChainHead ( ctx context . Context ) ( * types . TipSet , error )
2022-06-16 09:12:33 +00:00
2022-06-17 09:20:33 +00:00
StateSectorPreCommitInfo ( ctx context . Context , maddr address . Address , sectorNumber abi . SectorNumber , tsk types . TipSetKey ) ( * miner . SectorPreCommitOnChainInfo , error )
2022-06-16 09:12:33 +00:00
StateMinerInitialPledgeCollateral ( context . Context , address . Address , miner . SectorPreCommitInfo , types . TipSetKey ) ( big . Int , error )
2022-06-17 09:20:33 +00:00
StateNetworkVersion ( ctx context . Context , tsk types . TipSetKey ) ( network . Version , error )
2022-06-16 09:12:33 +00:00
StateMinerAvailableBalance ( context . Context , address . Address , types . TipSetKey ) ( big . Int , error )
2022-08-09 10:57:20 +00:00
// Address selector
WalletBalance ( context . Context , address . Address ) ( types . BigInt , error )
WalletHas ( context . Context , address . Address ) ( bool , error )
StateAccountKey ( context . Context , address . Address , types . TipSetKey ) ( address . Address , error )
StateLookupID ( context . Context , address . Address , types . TipSetKey ) ( address . Address , error )
2021-03-10 15:16:44 +00:00
}
type AggregateInput struct {
2021-06-09 15:18:09 +00:00
Spt abi . RegisteredSealProof
2022-04-20 21:34:28 +00:00
Info proof . AggregateSealVerifyInfo
2021-06-09 15:18:09 +00:00
Proof [ ] byte
2024-01-25 14:15:55 +00:00
ActivationManifest miner . SectorActivationManifest
DealIDPrecommit bool
2021-03-10 15:16:44 +00:00
}
type CommitBatcher struct {
api CommitBatcherApi
maddr address . Address
mctx context . Context
2022-08-09 10:57:20 +00:00
addrSel AddressSelector
2021-06-08 13:43:43 +00:00
feeCfg config . MinerFeeConfig
2022-08-09 11:30:34 +00:00
getConfig dtypes . GetSealingConfigFunc
2022-06-17 11:52:19 +00:00
prover storiface . Prover
2021-03-10 15:16:44 +00:00
2021-06-07 23:42:14 +00:00
cutoffs map [ abi . SectorNumber ] time . Time
todo map [ abi . SectorNumber ] AggregateInput
2022-06-14 18:31:17 +00:00
waiting map [ abi . SectorNumber ] [ ] chan sealiface . CommitBatchRes
2021-03-10 15:16:44 +00:00
notify , stop , stopped chan struct { }
2022-06-14 18:31:17 +00:00
force chan chan [ ] sealiface . CommitBatchRes
2021-03-10 15:16:44 +00:00
lk sync . Mutex
}
2022-08-09 11:30:34 +00:00
func NewCommitBatcher ( mctx context . Context , maddr address . Address , api CommitBatcherApi , addrSel AddressSelector , feeCfg config . MinerFeeConfig , getConfig dtypes . GetSealingConfigFunc , prov storiface . Prover ) * CommitBatcher {
2021-03-10 15:16:44 +00:00
b := & CommitBatcher {
api : api ,
maddr : maddr ,
mctx : mctx ,
addrSel : addrSel ,
feeCfg : feeCfg ,
getConfig : getConfig ,
2021-05-19 13:20:23 +00:00
prover : prov ,
2021-03-10 15:16:44 +00:00
2021-06-07 23:42:14 +00:00
cutoffs : map [ abi . SectorNumber ] time . Time { } ,
todo : map [ abi . SectorNumber ] AggregateInput { } ,
2022-06-14 18:31:17 +00:00
waiting : map [ abi . SectorNumber ] [ ] chan sealiface . CommitBatchRes { } ,
2021-03-10 15:16:44 +00:00
notify : make ( chan struct { } , 1 ) ,
2022-06-14 18:31:17 +00:00
force : make ( chan chan [ ] sealiface . CommitBatchRes ) ,
2021-03-10 15:16:44 +00:00
stop : make ( chan struct { } ) ,
stopped : make ( chan struct { } ) ,
}
go b . run ( )
return b
}
func ( b * CommitBatcher ) run ( ) {
2022-06-14 18:31:17 +00:00
var forceRes chan [ ] sealiface . CommitBatchRes
var lastMsg [ ] sealiface . CommitBatchRes
2021-03-10 15:16:44 +00:00
2021-05-18 09:20:19 +00:00
cfg , err := b . getConfig ( )
if err != nil {
panic ( err )
}
2021-06-30 08:56:40 +00:00
timer := time . NewTimer ( b . batchWait ( cfg . CommitBatchWait , cfg . CommitBatchSlack ) )
2021-03-10 15:16:44 +00:00
for {
if forceRes != nil {
forceRes <- lastMsg
forceRes = nil
}
lastMsg = nil
2021-06-22 23:30:33 +00:00
// indicates whether we should only start a batch if we have reached or exceeded cfg.MaxCommitBatch
2021-06-22 23:16:36 +00:00
var sendAboveMax bool
2021-03-10 15:16:44 +00:00
select {
case <- b . stop :
close ( b . stopped )
return
case <- b . notify :
sendAboveMax = true
2021-06-30 08:56:40 +00:00
case <- timer . C :
2021-06-22 23:16:36 +00:00
// do nothing
2021-03-10 15:16:44 +00:00
case fr := <- b . force : // user triggered
forceRes = fr
}
var err error
2021-06-22 23:16:36 +00:00
lastMsg , err = b . maybeStartBatch ( sendAboveMax )
2021-03-10 15:16:44 +00:00
if err != nil {
2021-05-18 15:37:52 +00:00
log . Warnw ( "CommitBatcher processBatch error" , "error" , err )
2021-03-10 15:16:44 +00:00
}
2021-06-30 08:56:40 +00:00
if ! timer . Stop ( ) {
select {
case <- timer . C :
default :
}
}
timer . Reset ( b . batchWait ( cfg . CommitBatchWait , cfg . CommitBatchSlack ) )
2021-03-10 15:16:44 +00:00
}
}
2021-06-30 08:56:40 +00:00
func ( b * CommitBatcher ) batchWait ( maxWait , slack time . Duration ) time . Duration {
2021-05-18 11:30:47 +00:00
now := time . Now ( )
b . lk . Lock ( )
defer b . lk . Unlock ( )
2021-05-25 14:47:42 +00:00
if len ( b . todo ) == 0 {
2021-06-30 08:56:40 +00:00
return maxWait
2021-05-25 14:47:42 +00:00
}
2021-06-07 23:42:14 +00:00
var cutoff time . Time
2021-05-18 11:30:47 +00:00
for sn := range b . todo {
2021-06-07 23:42:14 +00:00
sectorCutoff := b . cutoffs [ sn ]
if cutoff . IsZero ( ) || ( ! sectorCutoff . IsZero ( ) && sectorCutoff . Before ( cutoff ) ) {
cutoff = sectorCutoff
2021-05-18 11:30:47 +00:00
}
}
for sn := range b . waiting {
2021-06-07 23:42:14 +00:00
sectorCutoff := b . cutoffs [ sn ]
if cutoff . IsZero ( ) || ( ! sectorCutoff . IsZero ( ) && sectorCutoff . Before ( cutoff ) ) {
cutoff = sectorCutoff
2021-05-18 11:30:47 +00:00
}
}
2021-06-07 23:42:14 +00:00
if cutoff . IsZero ( ) {
2021-06-30 08:56:40 +00:00
return maxWait
2021-05-18 11:30:47 +00:00
}
2021-06-07 23:42:14 +00:00
cutoff = cutoff . Add ( - slack )
if cutoff . Before ( now ) {
2021-06-30 08:56:40 +00:00
return time . Nanosecond // can't return 0
2021-05-18 11:30:47 +00:00
}
2021-06-07 23:42:14 +00:00
wait := cutoff . Sub ( now )
2021-05-18 11:30:47 +00:00
if wait > maxWait {
wait = maxWait
}
2021-06-30 08:56:40 +00:00
return wait
2021-05-18 11:30:47 +00:00
}
2022-06-14 18:31:17 +00:00
func ( b * CommitBatcher ) maybeStartBatch ( notif bool ) ( [ ] sealiface . CommitBatchRes , error ) {
2021-03-10 15:16:44 +00:00
b . lk . Lock ( )
defer b . lk . Unlock ( )
total := len ( b . todo )
if total == 0 {
return nil , nil // nothing to do
}
cfg , err := b . getConfig ( )
if err != nil {
return nil , xerrors . Errorf ( "getting config: %w" , err )
}
if notif && total < cfg . MaxCommitBatch {
return nil , nil
}
2024-01-25 14:15:55 +00:00
var res , resV1 [ ] sealiface . CommitBatchRes
2021-06-01 09:56:19 +00:00
2022-06-16 11:15:49 +00:00
ts , err := b . api . ChainHead ( b . mctx )
2022-06-09 21:09:17 +00:00
if err != nil {
return nil , err
}
2021-07-01 11:33:54 +00:00
2024-01-25 14:15:55 +00:00
nv , err := b . api . StateNetworkVersion ( b . mctx , ts . Key ( ) )
if err != nil {
return nil , xerrors . Errorf ( "getting network version: %s" , err )
}
2022-06-09 21:09:17 +00:00
blackedOut := func ( ) bool {
const nv16BlackoutWindow = abi . ChainEpoch ( 20 ) // a magik number
2022-06-16 11:15:49 +00:00
if ts . Height ( ) <= build . UpgradeSkyrHeight && build . UpgradeSkyrHeight - ts . Height ( ) < nv16BlackoutWindow {
2022-06-09 21:09:17 +00:00
return true
2021-07-01 11:33:54 +00:00
}
2022-06-09 21:09:17 +00:00
return false
}
individual := ( total < cfg . MinCommitBatch ) || ( total < miner . MinAggregatedSectors ) || blackedOut ( )
if ! individual && ! cfg . AggregateAboveBaseFee . Equals ( big . Zero ( ) ) {
2022-06-16 12:00:37 +00:00
if ts . MinTicketBlock ( ) . ParentBaseFee . LessThan ( cfg . AggregateAboveBaseFee ) {
2021-07-01 11:33:54 +00:00
individual = true
}
}
2024-01-25 14:15:55 +00:00
if nv >= MinDDONetworkVersion {
// After nv21, we have a new ProveCommitSectors2 method, which supports
// batching without aggregation, but it doesn't support onboarding
// sectors which were precommitted with DealIDs in the precommit message.
// We prefer it for all other sectors, so first we use the new processBatchV2
var sectors [ ] abi . SectorNumber
for sn := range b . todo {
sectors = append ( sectors , sn )
}
res , err = b . processBatchV2 ( cfg , sectors , nv , ! individual )
if err != nil {
err = xerrors . Errorf ( "processBatchV2: %w" , err )
}
// Mark sectors as done
for _ , r := range res {
if err != nil {
r . Error = err . Error ( )
}
for _ , sn := range r . Sectors {
for _ , ch := range b . waiting [ sn ] {
ch <- r // buffered
}
delete ( b . waiting , sn )
delete ( b . todo , sn )
delete ( b . cutoffs , sn )
}
}
}
if err != nil {
log . Warnf ( "CommitBatcher maybeStartBatch processBatch-ddo %v" , err )
}
if err != nil && len ( res ) == 0 {
return nil , err
}
2021-07-01 11:33:54 +00:00
if individual {
2024-01-25 14:15:55 +00:00
resV1 , err = b . processIndividually ( cfg )
2021-06-01 09:56:19 +00:00
} else {
2023-04-12 04:30:19 +00:00
var sectors [ ] abi . SectorNumber
for sn := range b . todo {
sectors = append ( sectors , sn )
}
2024-01-25 14:15:55 +00:00
resV1 , err = b . processBatchV1 ( cfg , sectors , nv )
2021-06-01 09:56:19 +00:00
}
2021-08-06 08:50:37 +00:00
if err != nil {
log . Warnf ( "CommitBatcher maybeStartBatch individual:%v processBatch %v" , individual , err )
}
2024-01-25 14:15:55 +00:00
if err != nil && len ( resV1 ) == 0 {
2021-06-01 09:56:19 +00:00
return nil , err
}
2024-01-25 14:15:55 +00:00
// Mark the rest as processed
for _ , r := range resV1 {
2021-06-01 09:56:19 +00:00
if err != nil {
r . Error = err . Error ( )
}
for _ , sn := range r . Sectors {
for _ , ch := range b . waiting [ sn ] {
ch <- r // buffered
}
delete ( b . waiting , sn )
delete ( b . todo , sn )
2021-06-07 23:42:14 +00:00
delete ( b . cutoffs , sn )
2021-06-01 09:56:19 +00:00
}
}
2024-01-25 14:15:55 +00:00
res = append ( res , resV1 ... )
2021-06-01 09:56:19 +00:00
return res , nil
}
2024-01-25 14:15:55 +00:00
// processBatchV2 processes a batch of sectors after nv22. It will always send
// ProveCommitSectors3Params which may contain either individual proofs or an
// aggregate proof depending on SP condition and network conditions.
func ( b * CommitBatcher ) processBatchV2 ( cfg sealiface . Config , sectors [ ] abi . SectorNumber , nv network . Version , aggregate bool ) ( [ ] sealiface . CommitBatchRes , error ) {
ts , err := b . api . ChainHead ( b . mctx )
if err != nil {
return nil , err
}
total := len ( sectors )
res := sealiface . CommitBatchRes {
FailedSectors : map [ abi . SectorNumber ] string { } ,
}
params := miner . ProveCommitSectors3Params {
RequireActivationSuccess : cfg . RequireActivationSuccess ,
RequireNotificationSuccess : cfg . RequireNotificationSuccess ,
}
infos := make ( [ ] proof . AggregateSealVerifyInfo , 0 , total )
collateral := big . Zero ( )
for _ , sector := range sectors {
if b . todo [ sector ] . DealIDPrecommit {
// can't process sectors precommitted with deal IDs with ProveCommitSectors2
continue
}
res . Sectors = append ( res . Sectors , sector )
sc , err := b . getSectorCollateral ( sector , ts . Key ( ) )
if err != nil {
res . FailedSectors [ sector ] = err . Error ( )
continue
}
collateral = big . Add ( collateral , sc )
params . SectorActivations = append ( params . SectorActivations , b . todo [ sector ] . ActivationManifest )
params . SectorProofs = append ( params . SectorProofs , b . todo [ sector ] . Proof )
infos = append ( infos , b . todo [ sector ] . Info )
}
if len ( infos ) == 0 {
return nil , nil
}
sort . Slice ( infos , func ( i , j int ) bool {
return infos [ i ] . Number < infos [ j ] . Number
} )
proofs := make ( [ ] [ ] byte , 0 , total )
for _ , info := range infos {
proofs = append ( proofs , b . todo [ info . Number ] . Proof )
}
needFunds := collateral
arp , err := b . aggregateProofType ( nv )
if err != nil {
res . Error = err . Error ( )
return [ ] sealiface . CommitBatchRes { res } , xerrors . Errorf ( "getting aggregate proof type: %w" , err )
}
params . AggregateProofType = arp
if aggregate {
params . SectorProofs = nil // can't be set when aggregating
mid , err := address . IDFromAddress ( b . maddr )
if err != nil {
res . Error = err . Error ( )
return [ ] sealiface . CommitBatchRes { res } , xerrors . Errorf ( "getting miner id: %w" , err )
}
params . AggregateProof , err = b . prover . AggregateSealProofs ( proof . AggregateSealVerifyProofAndInfos {
Miner : abi . ActorID ( mid ) ,
SealProof : b . todo [ infos [ 0 ] . Number ] . Spt ,
AggregateProof : arp ,
Infos : infos ,
} , proofs )
if err != nil {
res . Error = err . Error ( )
return [ ] sealiface . CommitBatchRes { res } , xerrors . Errorf ( "aggregating proofs: %w" , err )
}
aggFeeRaw , err := policy . AggregateProveCommitNetworkFee ( nv , len ( infos ) , ts . MinTicketBlock ( ) . ParentBaseFee )
if err != nil {
res . Error = err . Error ( )
log . Errorf ( "getting aggregate commit network fee: %s" , err )
return [ ] sealiface . CommitBatchRes { res } , xerrors . Errorf ( "getting aggregate commit network fee: %s" , err )
}
aggFee := big . Div ( big . Mul ( aggFeeRaw , aggFeeNum ) , aggFeeDen )
needFunds = big . Add ( collateral , aggFee )
}
needFunds , err = collateralSendAmount ( b . mctx , b . api , b . maddr , cfg , needFunds )
if err != nil {
res . Error = err . Error ( )
return [ ] sealiface . CommitBatchRes { res } , err
}
maxFee := b . feeCfg . MaxCommitBatchGasFee . FeeForSectors ( len ( infos ) )
goodFunds := big . Add ( maxFee , needFunds )
mi , err := b . api . StateMinerInfo ( b . mctx , b . maddr , types . EmptyTSK )
if err != nil {
res . Error = err . Error ( )
return [ ] sealiface . CommitBatchRes { res } , xerrors . Errorf ( "couldn't get miner info: %w" , err )
}
from , _ , err := b . addrSel . AddressFor ( b . mctx , b . api , mi , api . CommitAddr , goodFunds , needFunds )
if err != nil {
res . Error = err . Error ( )
return [ ] sealiface . CommitBatchRes { res } , xerrors . Errorf ( "no good address found: %w" , err )
}
enc := new ( bytes . Buffer )
if err := params . MarshalCBOR ( enc ) ; err != nil {
res . Error = err . Error ( )
return [ ] sealiface . CommitBatchRes { res } , xerrors . Errorf ( "couldn't serialize ProveCommitSectors2Params: %w" , err )
}
_ , err = simulateMsgGas ( b . mctx , b . api , from , b . maddr , builtin . MethodsMiner . ProveCommitSectors3 , needFunds , maxFee , enc . Bytes ( ) )
if err != nil && ( ! api . ErrorIsIn ( err , [ ] error { & api . ErrOutOfGas { } } ) || len ( sectors ) < miner . MinAggregatedSectors * 2 ) {
log . Errorf ( "simulating CommitBatch message failed: %s" , err )
res . Error = err . Error ( )
return [ ] sealiface . CommitBatchRes { res } , xerrors . Errorf ( "simulating CommitBatch message failed: %w" , err )
}
msgTooLarge := len ( enc . Bytes ( ) ) > ( messagepool . MaxMessageSize - 128 )
// If we're out of gas, split the batch in half and evaluate again
if api . ErrorIsIn ( err , [ ] error { & api . ErrOutOfGas { } } ) || msgTooLarge {
log . Warnf ( "CommitAggregate message ran out of gas or is too large, splitting batch in half and trying again (sectors: %d, params: %d)" , len ( sectors ) , len ( enc . Bytes ( ) ) )
mid := len ( sectors ) / 2
ret0 , _ := b . processBatchV2 ( cfg , sectors [ : mid ] , nv , aggregate )
ret1 , _ := b . processBatchV2 ( cfg , sectors [ mid : ] , nv , aggregate )
return append ( ret0 , ret1 ... ) , nil
}
mcid , err := sendMsg ( b . mctx , b . api , from , b . maddr , builtin . MethodsMiner . ProveCommitSectors3 , needFunds , maxFee , enc . Bytes ( ) )
if err != nil {
return [ ] sealiface . CommitBatchRes { res } , xerrors . Errorf ( "sending message failed (params size: %d, sectors: %d, agg: %t): %w" , len ( enc . Bytes ( ) ) , len ( sectors ) , aggregate , err )
}
res . Msg = & mcid
log . Infow ( "Sent ProveCommitSectors2 message" , "cid" , mcid , "from" , from , "todo" , total , "sectors" , len ( infos ) )
return [ ] sealiface . CommitBatchRes { res } , nil
}
// processBatchV1 processes a batch of sectors before nv22. It always sends out an aggregate message.
func ( b * CommitBatcher ) processBatchV1 ( cfg sealiface . Config , sectors [ ] abi . SectorNumber , nv network . Version ) ( [ ] sealiface . CommitBatchRes , error ) {
2022-06-16 11:15:49 +00:00
ts , err := b . api . ChainHead ( b . mctx )
2021-06-01 10:02:34 +00:00
if err != nil {
return nil , err
}
2023-04-12 04:30:19 +00:00
total := len ( sectors )
2021-06-01 09:56:19 +00:00
2022-06-14 18:31:17 +00:00
res := sealiface . CommitBatchRes {
2021-07-01 10:15:58 +00:00
FailedSectors : map [ abi . SectorNumber ] string { } ,
}
2021-06-01 09:56:19 +00:00
2022-04-20 21:34:28 +00:00
params := miner . ProveCommitAggregateParams {
2021-06-01 09:56:19 +00:00
SectorNumbers : bitfield . New ( ) ,
}
2021-05-17 20:51:29 +00:00
proofs := make ( [ ] [ ] byte , 0 , total )
2022-04-20 21:34:28 +00:00
infos := make ( [ ] proof . AggregateSealVerifyInfo , 0 , total )
2021-06-01 10:02:34 +00:00
collateral := big . Zero ( )
2021-03-10 15:16:44 +00:00
2023-04-12 04:30:19 +00:00
for _ , sector := range sectors {
res . Sectors = append ( res . Sectors , sector )
2021-06-11 02:33:57 +00:00
2023-04-12 04:30:19 +00:00
sc , err := b . getSectorCollateral ( sector , ts . Key ( ) )
2021-06-01 10:02:34 +00:00
if err != nil {
2023-04-12 04:30:19 +00:00
res . FailedSectors [ sector ] = err . Error ( )
2021-06-01 10:02:34 +00:00
continue
}
collateral = big . Add ( collateral , sc )
2023-04-12 04:30:19 +00:00
params . SectorNumbers . Set ( uint64 ( sector ) )
infos = append ( infos , b . todo [ sector ] . Info )
2021-03-10 15:16:44 +00:00
}
2021-09-06 08:20:23 +00:00
if len ( infos ) == 0 {
return nil , nil
}
2021-05-18 16:58:41 +00:00
sort . Slice ( infos , func ( i , j int ) bool {
return infos [ i ] . Number < infos [ j ] . Number
} )
for _ , info := range infos {
2021-06-09 15:18:09 +00:00
proofs = append ( proofs , b . todo [ info . Number ] . Proof )
2021-05-18 16:58:41 +00:00
}
2021-05-18 18:34:23 +00:00
mid , err := address . IDFromAddress ( b . maddr )
if err != nil {
2023-04-12 04:30:19 +00:00
res . Error = err . Error ( )
2022-06-14 18:31:17 +00:00
return [ ] sealiface . CommitBatchRes { res } , xerrors . Errorf ( "getting miner id: %w" , err )
2021-05-18 18:34:23 +00:00
}
2022-06-09 21:09:17 +00:00
arp , err := b . aggregateProofType ( nv )
if err != nil {
2023-04-12 04:30:19 +00:00
res . Error = err . Error ( )
2022-06-14 18:31:17 +00:00
return [ ] sealiface . CommitBatchRes { res } , xerrors . Errorf ( "getting aggregate proof type: %w" , err )
2022-06-09 21:09:17 +00:00
}
2022-04-20 21:34:28 +00:00
params . AggregateProof , err = b . prover . AggregateSealProofs ( proof . AggregateSealVerifyProofAndInfos {
2021-05-18 18:34:23 +00:00
Miner : abi . ActorID ( mid ) ,
2021-06-09 15:18:09 +00:00
SealProof : b . todo [ infos [ 0 ] . Number ] . Spt ,
2021-05-18 14:51:06 +00:00
AggregateProof : arp ,
Infos : infos ,
} , proofs )
2021-03-10 15:16:44 +00:00
if err != nil {
2023-04-12 04:30:19 +00:00
res . Error = err . Error ( )
2022-06-14 18:31:17 +00:00
return [ ] sealiface . CommitBatchRes { res } , xerrors . Errorf ( "aggregating proofs: %w" , err )
2021-03-10 15:16:44 +00:00
}
enc := new ( bytes . Buffer )
if err := params . MarshalCBOR ( enc ) ; err != nil {
2023-04-12 04:30:19 +00:00
res . Error = err . Error ( )
2022-06-14 18:31:17 +00:00
return [ ] sealiface . CommitBatchRes { res } , xerrors . Errorf ( "couldn't serialize ProveCommitAggregateParams: %w" , err )
2021-03-10 15:16:44 +00:00
}
2022-06-16 09:12:33 +00:00
mi , err := b . api . StateMinerInfo ( b . mctx , b . maddr , types . EmptyTSK )
2021-03-10 15:16:44 +00:00
if err != nil {
2023-04-12 04:30:19 +00:00
res . Error = err . Error ( )
2022-06-14 18:31:17 +00:00
return [ ] sealiface . CommitBatchRes { res } , xerrors . Errorf ( "couldn't get miner info: %w" , err )
2021-03-10 15:16:44 +00:00
}
2021-06-08 13:43:43 +00:00
maxFee := b . feeCfg . MaxCommitBatchGasFee . FeeForSectors ( len ( infos ) )
2022-06-16 12:00:37 +00:00
aggFeeRaw , err := policy . AggregateProveCommitNetworkFee ( nv , len ( infos ) , ts . MinTicketBlock ( ) . ParentBaseFee )
2021-08-10 17:07:30 +00:00
if err != nil {
2023-04-12 04:30:19 +00:00
res . Error = err . Error ( )
2021-10-01 14:23:21 +00:00
log . Errorf ( "getting aggregate commit network fee: %s" , err )
2022-06-14 18:31:17 +00:00
return [ ] sealiface . CommitBatchRes { res } , xerrors . Errorf ( "getting aggregate commit network fee: %s" , err )
2021-08-10 17:07:30 +00:00
}
2021-10-01 14:23:21 +00:00
2021-08-10 17:07:30 +00:00
aggFee := big . Div ( big . Mul ( aggFeeRaw , aggFeeNum ) , aggFeeDen )
2021-06-09 01:40:52 +00:00
2021-06-29 06:06:41 +00:00
needFunds := big . Add ( collateral , aggFee )
2021-07-12 16:46:05 +00:00
needFunds , err = collateralSendAmount ( b . mctx , b . api , b . maddr , cfg , needFunds )
if err != nil {
2023-04-12 04:30:19 +00:00
res . Error = err . Error ( )
2022-06-14 18:31:17 +00:00
return [ ] sealiface . CommitBatchRes { res } , err
2021-06-29 16:17:08 +00:00
}
2021-06-29 06:06:41 +00:00
goodFunds := big . Add ( maxFee , needFunds )
2022-08-09 10:57:20 +00:00
from , _ , err := b . addrSel . AddressFor ( b . mctx , b . api , mi , api . CommitAddr , goodFunds , needFunds )
2021-03-10 15:16:44 +00:00
if err != nil {
2023-04-12 04:30:19 +00:00
res . Error = err . Error ( )
2022-06-14 18:31:17 +00:00
return [ ] sealiface . CommitBatchRes { res } , xerrors . Errorf ( "no good address found: %w" , err )
2021-03-10 15:16:44 +00:00
}
2023-04-12 04:30:19 +00:00
_ , err = simulateMsgGas ( b . mctx , b . api , from , b . maddr , builtin . MethodsMiner . ProveCommitAggregate , needFunds , maxFee , enc . Bytes ( ) )
if err != nil && ( ! api . ErrorIsIn ( err , [ ] error { & api . ErrOutOfGas { } } ) || len ( sectors ) < miner . MinAggregatedSectors * 2 ) {
2023-04-13 01:45:43 +00:00
log . Errorf ( "simulating CommitBatch message failed: %s" , err )
2023-04-12 04:30:19 +00:00
res . Error = err . Error ( )
return [ ] sealiface . CommitBatchRes { res } , xerrors . Errorf ( "simulating CommitBatch message failed: %w" , err )
}
2023-04-20 16:15:51 +00:00
// If we're out of gas, split the batch in half and evaluate again
2023-04-12 04:30:19 +00:00
if api . ErrorIsIn ( err , [ ] error { & api . ErrOutOfGas { } } ) {
2023-04-13 01:45:43 +00:00
log . Warnf ( "CommitAggregate message ran out of gas, splitting batch in half and trying again (sectors: %d)" , len ( sectors ) )
2023-04-12 04:30:19 +00:00
mid := len ( sectors ) / 2
2024-01-25 14:15:55 +00:00
ret0 , _ := b . processBatchV1 ( cfg , sectors [ : mid ] , nv )
ret1 , _ := b . processBatchV1 ( cfg , sectors [ mid : ] , nv )
2023-04-12 04:30:19 +00:00
2023-04-13 01:45:43 +00:00
return append ( ret0 , ret1 ... ) , nil
2023-04-12 04:30:19 +00:00
}
2022-06-16 13:50:41 +00:00
mcid , err := sendMsg ( b . mctx , b . api , from , b . maddr , builtin . MethodsMiner . ProveCommitAggregate , needFunds , maxFee , enc . Bytes ( ) )
2021-03-10 15:16:44 +00:00
if err != nil {
2022-06-14 18:31:17 +00:00
return [ ] sealiface . CommitBatchRes { res } , xerrors . Errorf ( "sending message failed: %w" , err )
2021-03-10 15:16:44 +00:00
}
2021-06-01 09:56:19 +00:00
res . Msg = & mcid
2021-05-19 18:34:50 +00:00
log . Infow ( "Sent ProveCommitAggregate message" , "cid" , mcid , "from" , from , "todo" , total , "sectors" , len ( infos ) )
2021-03-10 15:16:44 +00:00
2022-06-14 18:31:17 +00:00
return [ ] sealiface . CommitBatchRes { res } , nil
2021-06-01 09:56:19 +00:00
}
2021-03-10 15:16:44 +00:00
2022-06-14 18:31:17 +00:00
func ( b * CommitBatcher ) processIndividually ( cfg sealiface . Config ) ( [ ] sealiface . CommitBatchRes , error ) {
2023-04-01 23:30:32 +00:00
2022-06-16 09:12:33 +00:00
mi , err := b . api . StateMinerInfo ( b . mctx , b . maddr , types . EmptyTSK )
2021-06-01 09:56:19 +00:00
if err != nil {
return nil , xerrors . Errorf ( "couldn't get miner info: %w" , err )
}
2021-07-12 16:46:05 +00:00
avail := types . TotalFilecoinInt
if cfg . CollateralFromMinerBalance && ! cfg . DisableCollateralFallback {
2022-06-16 09:12:33 +00:00
avail , err = b . api . StateMinerAvailableBalance ( b . mctx , b . maddr , types . EmptyTSK )
2021-07-12 16:46:05 +00:00
if err != nil {
return nil , xerrors . Errorf ( "getting available miner balance: %w" , err )
}
avail = big . Sub ( avail , cfg . AvailableBalanceBuffer )
if avail . LessThan ( big . Zero ( ) ) {
avail = big . Zero ( )
}
}
2022-06-16 11:15:49 +00:00
ts , err := b . api . ChainHead ( b . mctx )
2021-06-01 09:56:19 +00:00
if err != nil {
return nil , err
}
2022-06-14 18:31:17 +00:00
var res [ ] sealiface . CommitBatchRes
2021-06-01 09:56:19 +00:00
2023-04-01 23:30:32 +00:00
sectorsProcessed := 0
2021-06-01 09:56:19 +00:00
for sn , info := range b . todo {
2022-06-14 18:31:17 +00:00
r := sealiface . CommitBatchRes {
2021-07-01 10:15:58 +00:00
Sectors : [ ] abi . SectorNumber { sn } ,
FailedSectors : map [ abi . SectorNumber ] string { } ,
2021-03-10 15:16:44 +00:00
}
2021-06-01 09:56:19 +00:00
2023-04-01 23:30:32 +00:00
if cfg . MaxSectorProveCommitsSubmittedPerEpoch > 0 &&
uint64 ( sectorsProcessed ) >= cfg . MaxSectorProveCommitsSubmittedPerEpoch {
tmp := ts
for tmp . Height ( ) <= ts . Height ( ) {
tmp , err = b . api . ChainHead ( b . mctx )
if err != nil {
log . Errorf ( "getting chain head: %+v" , err )
return nil , err
}
time . Sleep ( 3 * time . Second )
}
sectorsProcessed = 0
ts = tmp
}
2022-06-16 11:15:49 +00:00
mcid , err := b . processSingle ( cfg , mi , & avail , sn , info , ts . Key ( ) )
2021-06-01 09:56:19 +00:00
if err != nil {
log . Errorf ( "process single error: %+v" , err ) // todo: return to user
r . FailedSectors [ sn ] = err . Error ( )
} else {
r . Msg = & mcid
}
res = append ( res , r )
2023-04-01 23:30:32 +00:00
sectorsProcessed ++
2021-06-01 09:56:19 +00:00
}
return res , nil
}
2022-06-17 09:20:33 +00:00
func ( b * CommitBatcher ) processSingle ( cfg sealiface . Config , mi api . MinerInfo , avail * abi . TokenAmount , sn abi . SectorNumber , info AggregateInput , tsk types . TipSetKey ) ( cid . Cid , error ) {
2024-01-25 14:15:55 +00:00
return b . processSingleV1 ( cfg , mi , avail , sn , info , tsk )
}
func ( b * CommitBatcher ) processSingleV1 ( cfg sealiface . Config , mi api . MinerInfo , avail * abi . TokenAmount , sn abi . SectorNumber , info AggregateInput , tsk types . TipSetKey ) ( cid . Cid , error ) {
2021-06-01 09:56:19 +00:00
enc := new ( bytes . Buffer )
params := & miner . ProveCommitSectorParams {
SectorNumber : sn ,
2021-06-09 15:18:09 +00:00
Proof : info . Proof ,
2021-06-01 09:56:19 +00:00
}
if err := params . MarshalCBOR ( enc ) ; err != nil {
return cid . Undef , xerrors . Errorf ( "marshaling commit params: %w" , err )
}
2022-06-17 09:20:33 +00:00
collateral , err := b . getSectorCollateral ( sn , tsk )
2021-03-10 15:16:44 +00:00
if err != nil {
2021-06-01 09:56:19 +00:00
return cid . Undef , err
2021-03-10 15:16:44 +00:00
}
2021-06-29 16:17:08 +00:00
if cfg . CollateralFromMinerBalance {
2021-07-12 16:46:05 +00:00
c := big . Sub ( collateral , * avail )
* avail = big . Sub ( * avail , collateral )
collateral = c
if collateral . LessThan ( big . Zero ( ) ) {
collateral = big . Zero ( )
}
if ( * avail ) . LessThan ( big . Zero ( ) ) {
* avail = big . Zero ( )
}
2021-06-29 16:17:08 +00:00
}
2021-06-08 13:43:43 +00:00
goodFunds := big . Add ( collateral , big . Int ( b . feeCfg . MaxCommitGasFee ) )
2021-06-01 09:56:19 +00:00
2022-08-09 10:57:20 +00:00
from , _ , err := b . addrSel . AddressFor ( b . mctx , b . api , mi , api . CommitAddr , goodFunds , collateral )
2021-06-01 09:56:19 +00:00
if err != nil {
return cid . Undef , xerrors . Errorf ( "no good address to send commit message from: %w" , err )
}
2022-06-16 13:50:41 +00:00
mcid , err := sendMsg ( b . mctx , b . api , from , b . maddr , builtin . MethodsMiner . ProveCommitSector , collateral , big . Int ( b . feeCfg . MaxCommitGasFee ) , enc . Bytes ( ) )
2021-06-01 09:56:19 +00:00
if err != nil {
return cid . Undef , xerrors . Errorf ( "pushing message to mpool: %w" , err )
}
return mcid , nil
2021-03-10 15:16:44 +00:00
}
// register commit, wait for batch message, return message CID
2022-06-14 18:31:17 +00:00
func ( b * CommitBatcher ) AddCommit ( ctx context . Context , s SectorInfo , in AggregateInput ) ( res sealiface . CommitBatchRes , err error ) {
2021-06-08 20:46:35 +00:00
sn := s . SectorNumber
cu , err := b . getCommitCutoff ( s )
2021-05-18 14:51:06 +00:00
if err != nil {
2022-06-14 18:31:17 +00:00
return sealiface . CommitBatchRes { } , err
2021-05-18 14:51:06 +00:00
}
2021-03-10 15:16:44 +00:00
b . lk . Lock ( )
2021-06-08 20:46:35 +00:00
b . cutoffs [ sn ] = cu
2021-05-18 11:30:47 +00:00
b . todo [ sn ] = in
2021-03-10 15:16:44 +00:00
2022-06-14 18:31:17 +00:00
sent := make ( chan sealiface . CommitBatchRes , 1 )
2021-05-18 11:30:47 +00:00
b . waiting [ sn ] = append ( b . waiting [ sn ] , sent )
2021-03-10 15:16:44 +00:00
select {
case b . notify <- struct { } { } :
default : // already have a pending notification, don't need more
}
b . lk . Unlock ( )
select {
2021-06-01 09:56:19 +00:00
case r := <- sent :
return r , nil
2021-03-10 15:16:44 +00:00
case <- ctx . Done ( ) :
2022-06-14 18:31:17 +00:00
return sealiface . CommitBatchRes { } , ctx . Err ( )
2021-03-10 15:16:44 +00:00
}
}
2022-06-14 18:31:17 +00:00
func ( b * CommitBatcher ) Flush ( ctx context . Context ) ( [ ] sealiface . CommitBatchRes , error ) {
resCh := make ( chan [ ] sealiface . CommitBatchRes , 1 )
2021-03-10 15:16:44 +00:00
select {
case b . force <- resCh :
select {
case res := <- resCh :
return res , nil
case <- ctx . Done ( ) :
return nil , ctx . Err ( )
}
case <- ctx . Done ( ) :
return nil , ctx . Err ( )
}
}
func ( b * CommitBatcher ) Pending ( ctx context . Context ) ( [ ] abi . SectorID , error ) {
b . lk . Lock ( )
defer b . lk . Unlock ( )
mid , err := address . IDFromAddress ( b . maddr )
if err != nil {
return nil , err
}
res := make ( [ ] abi . SectorID , 0 )
for _ , s := range b . todo {
res = append ( res , abi . SectorID {
Miner : abi . ActorID ( mid ) ,
2021-06-09 15:18:09 +00:00
Number : s . Info . Number ,
2021-03-10 15:16:44 +00:00
} )
}
sort . Slice ( res , func ( i , j int ) bool {
if res [ i ] . Miner != res [ j ] . Miner {
return res [ i ] . Miner < res [ j ] . Miner
}
return res [ i ] . Number < res [ j ] . Number
} )
return res , nil
}
func ( b * CommitBatcher ) Stop ( ctx context . Context ) error {
close ( b . stop )
select {
case <- b . stopped :
return nil
case <- ctx . Done ( ) :
return ctx . Err ( )
}
}
2021-05-18 14:51:06 +00:00
2021-06-09 16:26:20 +00:00
// TODO: If this returned epochs, it would make testing much easier
2021-06-08 20:46:35 +00:00
func ( b * CommitBatcher ) getCommitCutoff ( si SectorInfo ) ( time . Time , error ) {
2022-06-16 11:15:49 +00:00
ts , err := b . api . ChainHead ( b . mctx )
2021-06-08 20:46:35 +00:00
if err != nil {
2021-06-09 16:26:20 +00:00
return time . Now ( ) , xerrors . Errorf ( "getting chain head: %s" , err )
2021-06-08 20:46:35 +00:00
}
2022-06-16 11:15:49 +00:00
nv , err := b . api . StateNetworkVersion ( b . mctx , ts . Key ( ) )
2021-06-08 20:46:35 +00:00
if err != nil {
log . Errorf ( "getting network version: %s" , err )
2021-06-09 16:26:20 +00:00
return time . Now ( ) , xerrors . Errorf ( "getting network version: %s" , err )
2021-06-08 20:46:35 +00:00
}
2022-06-16 11:15:49 +00:00
pci , err := b . api . StateSectorPreCommitInfo ( b . mctx , b . maddr , si . SectorNumber , ts . Key ( ) )
2021-06-08 20:46:35 +00:00
if err != nil {
log . Errorf ( "getting precommit info: %s" , err )
return time . Now ( ) , err
}
2022-08-09 09:44:40 +00:00
if pci == nil {
return time . Now ( ) , xerrors . Errorf ( "precommit info not found" )
}
2022-09-06 15:49:29 +00:00
av , err := actorstypes . VersionForNetwork ( nv )
2021-08-10 17:07:30 +00:00
if err != nil {
log . Errorf ( "unsupported network vrsion: %s" , err )
return time . Now ( ) , err
}
mpcd , err := policy . GetMaxProveCommitDuration ( av , si . SectorType )
if err != nil {
log . Errorf ( "getting max prove commit duration: %s" , err )
return time . Now ( ) , err
}
2021-06-08 20:46:35 +00:00
2021-08-10 17:07:30 +00:00
cutoffEpoch := pci . PreCommitEpoch + mpcd
2021-06-08 20:46:35 +00:00
2021-05-18 14:51:06 +00:00
for _ , p := range si . Pieces {
2024-01-25 14:15:55 +00:00
if ! p . HasDealInfo ( ) {
2021-05-18 14:51:06 +00:00
continue
}
2024-01-25 14:15:55 +00:00
startEpoch , err := p . StartEpoch ( )
if err != nil {
log . Errorf ( "getting deal start epoch: %s" , err )
return time . Now ( ) , err
}
2021-06-07 23:42:14 +00:00
if startEpoch < cutoffEpoch {
cutoffEpoch = startEpoch
2021-05-18 14:51:06 +00:00
}
}
2022-06-16 11:15:49 +00:00
if cutoffEpoch <= ts . Height ( ) {
2021-06-08 20:46:35 +00:00
return time . Now ( ) , nil
2021-05-18 14:51:06 +00:00
}
2022-06-16 11:15:49 +00:00
return time . Now ( ) . Add ( time . Duration ( cutoffEpoch - ts . Height ( ) ) * time . Duration ( build . BlockDelaySecs ) * time . Second ) , nil
2021-05-18 14:51:06 +00:00
}
2021-06-01 09:56:19 +00:00
2022-06-17 09:20:33 +00:00
func ( b * CommitBatcher ) getSectorCollateral ( sn abi . SectorNumber , tsk types . TipSetKey ) ( abi . TokenAmount , error ) {
pci , err := b . api . StateSectorPreCommitInfo ( b . mctx , b . maddr , sn , tsk )
2021-06-01 09:56:19 +00:00
if err != nil {
return big . Zero ( ) , xerrors . Errorf ( "getting precommit info: %w" , err )
}
if pci == nil {
return big . Zero ( ) , xerrors . Errorf ( "precommit info not found on chain" )
}
2022-06-17 09:20:33 +00:00
collateral , err := b . api . StateMinerInitialPledgeCollateral ( b . mctx , b . maddr , pci . Info , tsk )
2021-06-01 09:56:19 +00:00
if err != nil {
return big . Zero ( ) , xerrors . Errorf ( "getting initial pledge collateral: %w" , err )
}
collateral = big . Sub ( collateral , pci . PreCommitDeposit )
if collateral . LessThan ( big . Zero ( ) ) {
collateral = big . Zero ( )
}
return collateral , nil
}
2022-06-09 21:09:17 +00:00
func ( b * CommitBatcher ) aggregateProofType ( nv network . Version ) ( abi . RegisteredAggregationProof , error ) {
if nv < network . Version16 {
return abi . RegisteredAggregationProof_SnarkPackV1 , nil
}
return abi . RegisteredAggregationProof_SnarkPackV2 , nil
}