Add feature to stagger sector prove commit submission (#10543)
* Add feature to stagger sector prove commit submission * make gen and docsgen as usual * address comments and lint * Update comment Co-authored-by: Łukasz Magiera <magik6k@users.noreply.github.com> * make gen for stupid comment * make docsgen * address comments --------- Co-authored-by: Łukasz Magiera <magik6k@users.noreply.github.com>
This commit is contained in:
parent
3b0d4f2e45
commit
2278a209e2
@ -657,6 +657,16 @@
|
|||||||
# env var: LOTUS_SEALING_AGGREGATEABOVEBASEFEE
|
# env var: LOTUS_SEALING_AGGREGATEABOVEBASEFEE
|
||||||
#AggregateAboveBaseFee = "0.00000000032 FIL"
|
#AggregateAboveBaseFee = "0.00000000032 FIL"
|
||||||
|
|
||||||
|
# When submitting several sector prove commit messages simultaneously, this option allows you to
|
||||||
|
# stagger the number of prove commits submitted per epoch
|
||||||
|
# This is done because gas estimates for ProveCommits are non deterministic and increasing as a large
|
||||||
|
# number of sectors get committed within the same epoch resulting in occasionally failed msgs.
|
||||||
|
# Submitting a smaller number of prove commits per epoch would reduce the possibility of failed msgs
|
||||||
|
#
|
||||||
|
# type: uint64
|
||||||
|
# env var: LOTUS_SEALING_MAXSECTORPROVECOMMITSSUBMITTEDPEREPOCH
|
||||||
|
#MaxSectorProveCommitsSubmittedPerEpoch = 0
|
||||||
|
|
||||||
# type: uint64
|
# type: uint64
|
||||||
# env var: LOTUS_SEALING_TERMINATEBATCHMAX
|
# env var: LOTUS_SEALING_TERMINATEBATCHMAX
|
||||||
#TerminateBatchMax = 100
|
#TerminateBatchMax = 100
|
||||||
|
@ -1243,6 +1243,16 @@ sending precommit messages to the chain individually`,
|
|||||||
|
|
||||||
Comment: `network BaseFee below which to stop doing commit aggregation, instead
|
Comment: `network BaseFee below which to stop doing commit aggregation, instead
|
||||||
submitting proofs to the chain individually`,
|
submitting proofs to the chain individually`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "MaxSectorProveCommitsSubmittedPerEpoch",
|
||||||
|
Type: "uint64",
|
||||||
|
|
||||||
|
Comment: `When submitting several sector prove commit messages simultaneously, this option allows you to
|
||||||
|
stagger the number of prove commits submitted per epoch
|
||||||
|
This is done because gas estimates for ProveCommits are non deterministic and increasing as a large
|
||||||
|
number of sectors get committed within the same epoch resulting in occasionally failed msgs.
|
||||||
|
Submitting a smaller number of prove commits per epoch would reduce the possibility of failed msgs`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "TerminateBatchMax",
|
Name: "TerminateBatchMax",
|
||||||
|
@ -422,6 +422,13 @@ type SealingConfig struct {
|
|||||||
// submitting proofs to the chain individually
|
// submitting proofs to the chain individually
|
||||||
AggregateAboveBaseFee types.FIL
|
AggregateAboveBaseFee types.FIL
|
||||||
|
|
||||||
|
// When submitting several sector prove commit messages simultaneously, this option allows you to
|
||||||
|
// stagger the number of prove commits submitted per epoch
|
||||||
|
// This is done because gas estimates for ProveCommits are non deterministic and increasing as a large
|
||||||
|
// number of sectors get committed within the same epoch resulting in occasionally failed msgs.
|
||||||
|
// Submitting a smaller number of prove commits per epoch would reduce the possibility of failed msgs
|
||||||
|
MaxSectorProveCommitsSubmittedPerEpoch uint64
|
||||||
|
|
||||||
TerminateBatchMax uint64
|
TerminateBatchMax uint64
|
||||||
TerminateBatchMin uint64
|
TerminateBatchMin uint64
|
||||||
TerminateBatchWait Duration
|
TerminateBatchWait Duration
|
||||||
|
@ -1014,9 +1014,10 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
|
|||||||
AggregateAboveBaseFee: types.FIL(cfg.AggregateAboveBaseFee),
|
AggregateAboveBaseFee: types.FIL(cfg.AggregateAboveBaseFee),
|
||||||
BatchPreCommitAboveBaseFee: types.FIL(cfg.BatchPreCommitAboveBaseFee),
|
BatchPreCommitAboveBaseFee: types.FIL(cfg.BatchPreCommitAboveBaseFee),
|
||||||
|
|
||||||
TerminateBatchMax: cfg.TerminateBatchMax,
|
TerminateBatchMax: cfg.TerminateBatchMax,
|
||||||
TerminateBatchMin: cfg.TerminateBatchMin,
|
TerminateBatchMin: cfg.TerminateBatchMin,
|
||||||
TerminateBatchWait: config.Duration(cfg.TerminateBatchWait),
|
TerminateBatchWait: config.Duration(cfg.TerminateBatchWait),
|
||||||
|
MaxSectorProveCommitsSubmittedPerEpoch: cfg.MaxSectorProveCommitsSubmittedPerEpoch,
|
||||||
}
|
}
|
||||||
c.SetSealingConfig(newCfg)
|
c.SetSealingConfig(newCfg)
|
||||||
})
|
})
|
||||||
@ -1051,13 +1052,14 @@ func ToSealingConfig(dealmakingCfg config.DealmakingConfig, sealingCfg config.Se
|
|||||||
PreCommitBatchWait: time.Duration(sealingCfg.PreCommitBatchWait),
|
PreCommitBatchWait: time.Duration(sealingCfg.PreCommitBatchWait),
|
||||||
PreCommitBatchSlack: time.Duration(sealingCfg.PreCommitBatchSlack),
|
PreCommitBatchSlack: time.Duration(sealingCfg.PreCommitBatchSlack),
|
||||||
|
|
||||||
AggregateCommits: sealingCfg.AggregateCommits,
|
AggregateCommits: sealingCfg.AggregateCommits,
|
||||||
MinCommitBatch: sealingCfg.MinCommitBatch,
|
MinCommitBatch: sealingCfg.MinCommitBatch,
|
||||||
MaxCommitBatch: sealingCfg.MaxCommitBatch,
|
MaxCommitBatch: sealingCfg.MaxCommitBatch,
|
||||||
CommitBatchWait: time.Duration(sealingCfg.CommitBatchWait),
|
CommitBatchWait: time.Duration(sealingCfg.CommitBatchWait),
|
||||||
CommitBatchSlack: time.Duration(sealingCfg.CommitBatchSlack),
|
CommitBatchSlack: time.Duration(sealingCfg.CommitBatchSlack),
|
||||||
AggregateAboveBaseFee: types.BigInt(sealingCfg.AggregateAboveBaseFee),
|
AggregateAboveBaseFee: types.BigInt(sealingCfg.AggregateAboveBaseFee),
|
||||||
BatchPreCommitAboveBaseFee: types.BigInt(sealingCfg.BatchPreCommitAboveBaseFee),
|
BatchPreCommitAboveBaseFee: types.BigInt(sealingCfg.BatchPreCommitAboveBaseFee),
|
||||||
|
MaxSectorProveCommitsSubmittedPerEpoch: sealingCfg.MaxSectorProveCommitsSubmittedPerEpoch,
|
||||||
|
|
||||||
TerminateBatchMax: sealingCfg.TerminateBatchMax,
|
TerminateBatchMax: sealingCfg.TerminateBatchMax,
|
||||||
TerminateBatchMin: sealingCfg.TerminateBatchMin,
|
TerminateBatchMin: sealingCfg.TerminateBatchMin,
|
||||||
|
@ -388,6 +388,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (b *CommitBatcher) processIndividually(cfg sealiface.Config) ([]sealiface.CommitBatchRes, error) {
|
func (b *CommitBatcher) processIndividually(cfg sealiface.Config) ([]sealiface.CommitBatchRes, error) {
|
||||||
|
|
||||||
mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, types.EmptyTSK)
|
mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, types.EmptyTSK)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("couldn't get miner info: %w", err)
|
return nil, xerrors.Errorf("couldn't get miner info: %w", err)
|
||||||
@ -414,12 +415,31 @@ func (b *CommitBatcher) processIndividually(cfg sealiface.Config) ([]sealiface.C
|
|||||||
|
|
||||||
var res []sealiface.CommitBatchRes
|
var res []sealiface.CommitBatchRes
|
||||||
|
|
||||||
|
sectorsProcessed := 0
|
||||||
|
|
||||||
for sn, info := range b.todo {
|
for sn, info := range b.todo {
|
||||||
r := sealiface.CommitBatchRes{
|
r := sealiface.CommitBatchRes{
|
||||||
Sectors: []abi.SectorNumber{sn},
|
Sectors: []abi.SectorNumber{sn},
|
||||||
FailedSectors: map[abi.SectorNumber]string{},
|
FailedSectors: map[abi.SectorNumber]string{},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if cfg.MaxSectorProveCommitsSubmittedPerEpoch > 0 &&
|
||||||
|
uint64(sectorsProcessed) >= cfg.MaxSectorProveCommitsSubmittedPerEpoch {
|
||||||
|
|
||||||
|
tmp := ts
|
||||||
|
for tmp.Height() <= ts.Height() {
|
||||||
|
tmp, err = b.api.ChainHead(b.mctx)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("getting chain head: %+v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
sectorsProcessed = 0
|
||||||
|
ts = tmp
|
||||||
|
}
|
||||||
|
|
||||||
mcid, err := b.processSingle(cfg, mi, &avail, sn, info, ts.Key())
|
mcid, err := b.processSingle(cfg, mi, &avail, sn, info, ts.Key())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("process single error: %+v", err) // todo: return to user
|
log.Errorf("process single error: %+v", err) // todo: return to user
|
||||||
@ -429,6 +449,8 @@ func (b *CommitBatcher) processIndividually(cfg sealiface.Config) ([]sealiface.C
|
|||||||
}
|
}
|
||||||
|
|
||||||
res = append(res, r)
|
res = append(res, r)
|
||||||
|
|
||||||
|
sectorsProcessed++
|
||||||
}
|
}
|
||||||
|
|
||||||
return res, nil
|
return res, nil
|
||||||
|
@ -58,6 +58,8 @@ type Config struct {
|
|||||||
AggregateAboveBaseFee abi.TokenAmount
|
AggregateAboveBaseFee abi.TokenAmount
|
||||||
BatchPreCommitAboveBaseFee abi.TokenAmount
|
BatchPreCommitAboveBaseFee abi.TokenAmount
|
||||||
|
|
||||||
|
MaxSectorProveCommitsSubmittedPerEpoch uint64
|
||||||
|
|
||||||
TerminateBatchMax uint64
|
TerminateBatchMax uint64
|
||||||
TerminateBatchMin uint64
|
TerminateBatchMin uint64
|
||||||
TerminateBatchWait time.Duration
|
TerminateBatchWait time.Duration
|
||||||
|
Loading…
Reference in New Issue
Block a user