feat: wdpost: Config for maximum partition count per message

parent 01254ab320 · commit 673f9238be
@@ -315,6 +315,12 @@
   # env var: LOTUS_PROVING_PARALLELCHECKLIMIT
   #ParallelCheckLimit = 128
 
+  # Setting this value above the network limit has no effect
+  #
+  # type: int
+  # env var: LOTUS_PROVING_MAXPARTITIONSPERMESSAGE
+  #MaxPartitionsPerMessage = 0
+
 
 [Sealing]
   # Upper bound on how many sectors can be waiting for more deals to be packed in it before it begins sealing at any given time.
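To use the new knob, an operator would uncomment the generated line in the miner's config.toml (or set the environment variable) with a value below the network limit. A minimal sketch, in the same generated-config format as the hunk above; the value 6 is only an example, not a recommendation:

    [Proving]
      # env var: LOTUS_PROVING_MAXPARTITIONSPERMESSAGE
      MaxPartitionsPerMessage = 6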
@@ -116,7 +116,7 @@ func ConfigStorageMiner(c interface{}) Option {
 			Override(new(*miner.Miner), modules.SetupBlockProducer),
 			Override(new(gen.WinningPoStProver), storage.NewWinningPoStProver),
 			Override(new(*storage.Miner), modules.StorageMiner(cfg.Fees)),
-			Override(new(*storage.WindowPoStScheduler), modules.WindowPostScheduler(cfg.Fees)),
+			Override(new(*storage.WindowPoStScheduler), modules.WindowPostScheduler(cfg.Fees, cfg.Proving)),
 			Override(new(sectorblocks.SectorBuilder), From(new(*storage.Miner))),
 		),
 
@@ -629,6 +629,12 @@ over the worker address if this flag is set.`,
 
 			Comment: `Maximum number of sector checks to run in parallel. (0 = unlimited)`,
 		},
+		{
+			Name: "MaxPartitionsPerMessage",
+			Type: "int",
+
+			Comment: `Setting this value above the network limit has no effect`,
+		},
 	},
 	"Pubsub": []DocField{
 		{
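This file holds the metadata the commented config documentation is generated from. A minimal sketch of how such an entry could be rendered into the commented-out TOML lines seen in the first hunk; the render helper below is hypothetical, not the actual lotus generator:

    package main

    import (
    	"fmt"
    	"strings"
    )

    // DocField mirrors the shape of the entries above.
    type DocField struct {
    	Name, Type, Comment string
    }

    // render (hypothetical) emits the comment, type, env-var, and
    // commented-out default lines in the style of the config file.
    func render(envPrefix string, f DocField) string {
    	return fmt.Sprintf("# %s\n#\n# type: %s\n# env var: %s%s\n#%s = 0\n",
    		f.Comment, f.Type, envPrefix, strings.ToUpper(f.Name), f.Name)
    }

    func main() {
    	f := DocField{
    		Name:    "MaxPartitionsPerMessage",
    		Type:    "int",
    		Comment: "Setting this value above the network limit has no effect",
    	}
    	fmt.Print(render("LOTUS_PROVING_", f))
    }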
@@ -223,6 +223,20 @@ type ProvingConfig struct {
 	ParallelCheckLimit int
 
 	// todo disable builtin post
+
+	// Maximum number of partitions to prove in a single SubmitWindowPoSt message. 0 = network limit (10 in nv16)
+	//
+	// A single partition may contain up to 2349 32GiB sectors, or 2300 64GiB sectors.
+	//
+	// The maximum number of sectors which can be proven in a single PoSt message is 25000 in network version 16, which
+	// means that a single message can prove at most 10 partitions
+	//
+	// In some cases when submitting PoSt messages which are recovering sectors, the default network limit may still be
+	// too high to fit in the block gas limit; in those cases it may be necessary to set this value to something lower
+	// than 10; note that setting this value lower may result in less efficient gas use - more messages will be sent
+	// to prove each deadline, resulting in more total gas use (but each message will have a lower gas limit)
+	//
+	// Setting this value above the network limit has no effect
+	MaxPartitionsPerMessage int
 }
 
 type SealingConfig struct {
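The partition cap in the comment follows directly from the sector cap. A quick check of the arithmetic, with the constants copied from the comment (nv16):

    package main

    import "fmt"

    func main() {
    	const maxPoStSectors = 25000        // max sectors provable per PoSt message (nv16)
    	const sectorsPerPartition32G = 2349 // sectors in a full partition of 32GiB sectors

    	// 25000 / 2349 = 10 (integer division), so at most 10 partitions
    	// fit in one message - the "network limit" the field refers to.
    	fmt.Println(maxPoStSectors / sectorsPerPartition32G) // 10
    }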
@@ -256,7 +256,7 @@ func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*st
 		}
 	}
 
-func WindowPostScheduler(fc config.MinerFeeConfig) func(params StorageMinerParams) (*storage.WindowPoStScheduler, error) {
+func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func(params StorageMinerParams) (*storage.WindowPoStScheduler, error) {
 	return func(params StorageMinerParams) (*storage.WindowPoStScheduler, error) {
 		var (
 			mctx = params.MetricsCtx
@@ -271,7 +271,7 @@ func WindowPostScheduler(fc config.MinerFeeConfig) func(params StorageMinerParam
 
 		ctx := helpers.LifecycleCtx(mctx, lc)
 
-		fps, err := storage.NewWindowedPoStScheduler(api, fc, as, sealer, verif, sealer, j, maddr)
+		fps, err := storage.NewWindowedPoStScheduler(api, fc, pc, as, sealer, verif, sealer, j, maddr)
 		if err != nil {
 			return nil, err
 		}
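These two hunks thread ProvingConfig from the builder into the scheduler through a provider closure: the config is bound early, while the remaining dependencies arrive later via dependency injection. A minimal standalone sketch of that pattern, with hypothetical names rather than the lotus types:

    package main

    import "fmt"

    type ProvingConfig struct{ MaxPartitionsPerMessage int }

    type Scheduler struct{ maxPartitionsPerMessage int }

    // NewScheduler plays the role of storage.NewWindowedPoStScheduler:
    // the config value is copied onto the scheduler at construction time.
    func NewScheduler(pc ProvingConfig) *Scheduler {
    	return &Scheduler{maxPartitionsPerMessage: pc.MaxPartitionsPerMessage}
    }

    // Provider plays the role of modules.WindowPostScheduler: it returns
    // a constructor with the config already captured.
    func Provider(pc ProvingConfig) func() (*Scheduler, error) {
    	return func() (*Scheduler, error) { return NewScheduler(pc), nil }
    }

    func main() {
    	ctor := Provider(ProvingConfig{MaxPartitionsPerMessage: 6})
    	s, _ := ctor()
    	fmt.Println(s.maxPartitionsPerMessage) // 6
    }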
@@ -778,6 +778,13 @@ func (s *WindowPoStScheduler) batchPartitions(partitions []api.Partition, nv net
 		partitionsPerMsg = declMax
 	}
 
+	// respect user config if set
+	if s.maxPartitionsPerMessage > 0 {
+		if partitionsPerMsg > s.maxPartitionsPerMessage {
+			partitionsPerMsg = s.maxPartitionsPerMessage
+		}
+	}
+
 	// The number of messages will be:
 	// ceiling(number of partitions / partitions per message)
 	batchCount := len(partitions) / partitionsPerMsg
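With the clamp in place, the message count for a deadline is the ceiling of the partition count over the per-message cap. A minimal sketch of that arithmetic, assuming (as the comment states) that the code following the hunk rounds up for any remainder:

    package main

    import "fmt"

    // batchCount mirrors the logic above: clamp to the user cap when
    // set, then take ceiling(partitions / perMsg).
    func batchCount(partitions, perMsg, userMax int) int {
    	if userMax > 0 && perMsg > userMax {
    		perMsg = userMax // respect user config if set
    	}
    	return (partitions + perMsg - 1) / perMsg
    }

    func main() {
    	// The numbers from the test below: 12000 partitions, user cap 33.
    	fmt.Println(batchCount(12000, 3000, 33)) // 364
    }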
@@ -275,6 +275,105 @@ func TestWDPostDoPost(t *testing.T) {
 		}
 	}
 }
 
+// TestWDPostDoPostPartLimitConfig verifies that doPost will send the correct number of window
+// PoST messages for a given number of partitions based on user config
+func TestWDPostDoPostPartLimitConfig(t *testing.T) {
+	//stm: @CHAIN_SYNCER_LOAD_GENESIS_001, @CHAIN_SYNCER_FETCH_TIPSET_001,
+	//stm: @CHAIN_SYNCER_START_001, @CHAIN_SYNCER_SYNC_001, @BLOCKCHAIN_BEACON_VALIDATE_BLOCK_VALUES_01
+	//stm: @CHAIN_SYNCER_COLLECT_CHAIN_001, @CHAIN_SYNCER_COLLECT_HEADERS_001, @CHAIN_SYNCER_VALIDATE_TIPSET_001
+	//stm: @CHAIN_SYNCER_NEW_PEER_HEAD_001, @CHAIN_SYNCER_VALIDATE_MESSAGE_META_001, @CHAIN_SYNCER_STOP_001
+	ctx := context.Background()
+	expectedMsgCount := 364
+
+	proofType := abi.RegisteredPoStProof_StackedDrgWindow2KiBV1
+	postAct := tutils.NewIDAddr(t, 100)
+
+	mockStgMinerAPI := newMockStorageMinerAPI()
+
+	// Get the number of sectors allowed in a partition for this proof type
+	sectorsPerPartition, err := builtin.PoStProofWindowPoStPartitionSectors(proofType)
+	require.NoError(t, err)
+
+	// Work out the number of partitions that can be included in a message
+	// without exceeding the message sector limit
+
+	//stm: @BLOCKCHAIN_POLICY_GET_MAX_POST_PARTITIONS_001
+	partitionsPerMsg, err := policy.GetMaxPoStPartitions(network.Version13, proofType)
+	require.NoError(t, err)
+	if partitionsPerMsg > minertypes.AddressedPartitionsMax {
+		partitionsPerMsg = minertypes.AddressedPartitionsMax
+	}
+
+	partitionCount := 4 * partitionsPerMsg
+
+	// Assert that the user config limit is below the total partition count
+	userPartLimit := 33
+	lastMsgParts := 21
+	require.Greater(t, partitionCount, userPartLimit)
+
+	// Assert that the constants above are consistent
+	require.Equal(t, (expectedMsgCount-1)*userPartLimit+lastMsgParts, 4*partitionsPerMsg)
+
+	var partitions []api.Partition
+	for p := 0; p < partitionCount; p++ {
+		sectors := bitfield.New()
+		for s := uint64(0); s < sectorsPerPartition; s++ {
+			sectors.Set(s)
+		}
+		partitions = append(partitions, api.Partition{
+			AllSectors:        sectors,
+			FaultySectors:     bitfield.New(),
+			RecoveringSectors: bitfield.New(),
+			LiveSectors:       sectors,
+			ActiveSectors:     sectors,
+		})
+	}
+	mockStgMinerAPI.setPartitions(partitions)
+
+	// Run window PoST
+	scheduler := &WindowPoStScheduler{
+		api:          mockStgMinerAPI,
+		prover:       &mockProver{},
+		verifier:     &mockVerif{},
+		faultTracker: &mockFaultTracker{},
+		proofType:    proofType,
+		actor:        postAct,
+		journal:      journal.NilJournal(),
+		addrSel:      &ctladdr.AddressSelector{},
+
+		maxPartitionsPerMessage: userPartLimit,
+	}
+
+	di := &dline.Info{
+		WPoStPeriodDeadlines:   minertypes.WPoStPeriodDeadlines,
+		WPoStProvingPeriod:     minertypes.WPoStProvingPeriod,
+		WPoStChallengeWindow:   minertypes.WPoStChallengeWindow,
+		WPoStChallengeLookback: minertypes.WPoStChallengeLookback,
+		FaultDeclarationCutoff: minertypes.FaultDeclarationCutoff,
+	}
+	ts := mockTipSet(t)
+
+	scheduler.startGeneratePoST(ctx, ts, di, func(posts []minertypes.SubmitWindowedPoStParams, err error) {
+		scheduler.startSubmitPoST(ctx, ts, di, posts, func(err error) {})
+	})
+
+	// Read the window PoST messages
+	for i := 0; i < expectedMsgCount; i++ {
+		msg := <-mockStgMinerAPI.pushedMessages
+		require.Equal(t, builtin.MethodsMiner.SubmitWindowedPoSt, msg.Method)
+		var params minertypes.SubmitWindowedPoStParams
+		err := params.UnmarshalCBOR(bytes.NewReader(msg.Params))
+		require.NoError(t, err)
+
+		if i == expectedMsgCount-1 {
+			// The last message should include only the remaining 21 partitions
+			require.Len(t, params.Partitions, lastMsgParts)
+		} else {
+			// All previous messages should include the full number of partitions
+			require.Len(t, params.Partitions, userPartLimit)
+		}
+	}
+}
+
 func mockTipSet(t *testing.T) *types.TipSet {
 	minerAct := tutils.NewActorAddr(t, "miner")
 	c, err := cid.Decode("QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH")
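The test's constants are self-consistent: its own assert pins 4*partitionsPerMsg at (364-1)*33 + 21 = 12000 partitions, which split into 363 full messages of 33 partitions plus a final message of 21. A quick check of those numbers:

    package main

    import "fmt"

    func main() {
    	const totalPartitions = 12000 // (364-1)*33 + 21, per the test's own assert
    	const userPartLimit = 33

    	full := totalPartitions / userPartLimit // 363 messages with 33 partitions each
    	last := totalPartitions % userPartLimit // 21 partitions in the final message
    	fmt.Println(full+1, last)               // 364 21
    }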
@@ -38,6 +38,7 @@ type WindowPoStScheduler struct {
 	faultTracker     sectorstorage.FaultTracker
 	proofType        abi.RegisteredPoStProof
 	partitionSectors uint64
+	maxPartitionsPerMessage int
 	ch               *changeHandler
 
 	actor address.Address
@@ -52,6 +53,7 @@ type WindowPoStScheduler struct {
 // NewWindowedPoStScheduler creates a new WindowPoStScheduler.
 func NewWindowedPoStScheduler(api fullNodeFilteredAPI,
 	cfg config.MinerFeeConfig,
+	pcfg config.ProvingConfig,
 	as *AddressSelector,
 	sp storage.Prover,
 	verif ffiwrapper.Verifier,
@@ -72,6 +74,7 @@ func NewWindowedPoStScheduler(api fullNodeFilteredAPI,
 		faultTracker: ft,
 		proofType:    mi.WindowPoStProofType,
 		partitionSectors: mi.WindowPoStPartitionSectors,
+		maxPartitionsPerMessage: pcfg.MaxPartitionsPerMessage,
 
 		actor: actor,
 		evtTypes: [...]journal.EventType{