Merge pull request #8992 from filecoin-project/release/v1.16.1

build: release v1.16.1
Jiaying Wang 2022-07-07 16:32:41 -04:00 committed by GitHub
commit a2fa07d355
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 358 additions and 83 deletions


@@ -1,7 +1,17 @@
# Lotus changelog
-# 1.16.0 / 2022-06-24
+# 1.16.1 / 2022-07-07
This is an OPTIONAL PATCH release for storage providers who have failed to publish `SubmitWindowedPoSt` due to an out-of-gas error. The error log looks like `/wdpost_run.go:xxx estimating gas {"error": "estimating gas used: message execution failed: exit SysErrOutOfGas(7)...`.
## New Features
- feat: declare fault recovery: Config for maximum partition count per message (#8988 / #8986)
  - Configure `MaxPartitionsPerRecoveryMessage` in the miner configuration settings.
- feat: wdpost: Config for maximum partition count per message (#8982 / #8986)
  - Configure `MaxPartitionsPerPoStMessage` in the miner configuration settings (see the example snippet below).
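For illustration only, here is a minimal sketch of how these limits might be set in the miner's `config.toml`. The values are hypothetical; both options default to `0`, which keeps the network limit in effect, and they can also be supplied via the `LOTUS_PROVING_MAXPARTITIONSPERPOSTMESSAGE` and `LOTUS_PROVING_MAXPARTITIONSPERRECOVERYMESSAGE` environment variables shown in the default config further down:

    [Proving]
      # Hypothetical values: cap window PoSt messages at 3 partitions and
      # fault-recovery declarations at 1 partition per message.
      MaxPartitionsPerPoStMessage = 3
      MaxPartitionsPerRecoveryMessage = 1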
# 1.16.0 / 2022-06-24
This is a MANDATORY release of Lotus that introduces [Filecoin network v16,
codenamed the Skyr upgrade](https://github.com/filecoin-project/community/discussions/74?sort=new#discussioncomment-2392151).



@@ -37,7 +37,7 @@ func BuildTypeString() string {
}

// BuildVersion is the local build version
-const BuildVersion = "1.16.0"
+const BuildVersion = "1.16.1"

func UserVersion() string {
    if os.Getenv("LOTUS_VERSION_IGNORE_COMMIT") == "1" {


@@ -7,7 +7,7 @@ USAGE:
   lotus-miner [global options] command [command options] [arguments...]

VERSION:
-   1.16.0
+   1.16.1

COMMANDS:
   init     Initialize a lotus miner repo


@@ -7,7 +7,7 @@ USAGE:
   lotus-worker [global options] command [command options] [arguments...]

VERSION:
-   1.16.0
+   1.16.1

COMMANDS:
   run     Start lotus worker


@@ -7,7 +7,7 @@ USAGE:
   lotus [global options] command [command options] [arguments...]

VERSION:
-   1.16.0
+   1.16.1

COMMANDS:
   daemon     Start a lotus daemon process


@@ -315,6 +315,22 @@
# env var: LOTUS_PROVING_PARALLELCHECKLIMIT
#ParallelCheckLimit = 128
# Setting this value above the network limit has no effect
#
# type: int
# env var: LOTUS_PROVING_MAXPARTITIONSPERPOSTMESSAGE
#MaxPartitionsPerPoStMessage = 0
# In some cases when submitting DeclareFaultsRecovered messages,
# there may be too many recoveries to fit in a BlockGasLimit.
# In those cases it may be necessary to set this value to something low (e.g. 1);
# Note that setting this value lower may result in less efficient gas use - more messages will be sent than needed,
# resulting in more total gas use (but each message will have lower gas limit)
#
# type: int
# env var: LOTUS_PROVING_MAXPARTITIONSPERRECOVERYMESSAGE
#MaxPartitionsPerRecoveryMessage = 0
[Sealing]
# Upper bound on how many sectors can be waiting for more deals to be packed in it before it begins sealing at any given time.


@@ -116,7 +116,7 @@ func ConfigStorageMiner(c interface{}) Option {
    Override(new(*miner.Miner), modules.SetupBlockProducer),
    Override(new(gen.WinningPoStProver), storage.NewWinningPoStProver),
    Override(new(*storage.Miner), modules.StorageMiner(cfg.Fees)),
-   Override(new(*storage.WindowPoStScheduler), modules.WindowPostScheduler(cfg.Fees)),
+   Override(new(*storage.WindowPoStScheduler), modules.WindowPostScheduler(cfg.Fees, cfg.Proving)),
    Override(new(sectorblocks.SectorBuilder), From(new(*storage.Miner))),
),


@@ -629,6 +629,22 @@ over the worker address if this flag is set.`,
Comment: `Maximum number of sector checks to run in parallel. (0 = unlimited)`,
},
{
Name: "MaxPartitionsPerPoStMessage",
Type: "int",
Comment: `Setting this value above the network limit has no effect`,
},
{
Name: "MaxPartitionsPerRecoveryMessage",
Type: "int",
Comment: `In some cases when submitting DeclareFaultsRecovered messages,
there may be too many recoveries to fit in a BlockGasLimit.
In those cases it may be necessary to set this value to something low (e.g. 1);
Note that setting this value lower may result in less efficient gas use - more messages will be sent than needed,
resulting in more total gas use (but each message will have lower gas limit)`,
},
},
"Pubsub": []DocField{
{


@@ -222,6 +222,29 @@ type ProvingConfig struct {
// Maximum number of sector checks to run in parallel. (0 = unlimited)
ParallelCheckLimit int
// Maximum number of partitions to prove in a single SubmitWindowedPoSt message. 0 = network limit (10 in nv16)
//
// A single partition may contain up to 2349 32GiB sectors, or 2300 64GiB sectors.
//
// The maximum number of sectors which can be proven in a single PoSt message is 25000 in network version 16, which
// means that a single message can prove at most 10 partitions (25000 / 2349 rounds down to 10)
//
// In some cases when submitting PoSt messages which are recovering sectors, the default network limit may still be
// too high to fit in the block gas limit; In those cases it may be necessary to set this value to something lower
// than 10; note that setting this value lower may result in less efficient gas use - more messages will be sent
// to prove each deadline, resulting in more total gas use (but each message will have a lower gas limit)
//
// Setting this value above the network limit has no effect
MaxPartitionsPerPoStMessage int
// Maximum number of partitions to declare in a single DeclareFaultsRecovered message. 0 = no limit.
// In some cases when submitting DeclareFaultsRecovered messages,
// there may be too many recoveries to fit in a BlockGasLimit.
// In those cases it may be necessary to set this value to something low (e.g. 1);
// Note that setting this value lower may result in less efficient gas use - more messages will be sent than needed,
// resulting in more total gas use (but each message will have lower gas limit)
MaxPartitionsPerRecoveryMessage int
// todo disable builtin post
}


@@ -256,7 +256,7 @@ func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*st
    }
}

-func WindowPostScheduler(fc config.MinerFeeConfig) func(params StorageMinerParams) (*storage.WindowPoStScheduler, error) {
+func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func(params StorageMinerParams) (*storage.WindowPoStScheduler, error) {
    return func(params StorageMinerParams) (*storage.WindowPoStScheduler, error) {
        var (
            mctx = params.MetricsCtx
@@ -271,7 +271,7 @@ func WindowPostScheduler(fc config.MinerFeeConfig) func(params StorageMinerParam
        ctx := helpers.LifecycleCtx(mctx, lc)

-       fps, err := storage.NewWindowedPoStScheduler(api, fc, as, sealer, verif, sealer, j, maddr)
+       fps, err := storage.NewWindowedPoStScheduler(api, fc, pc, as, sealer, verif, sealer, j, maddr)
        if err != nil {
            return nil, err
        }


@@ -261,14 +261,15 @@ func (s *WindowPoStScheduler) checkSectors(ctx context.Context, check bitfield.B
// TODO: the waiting should happen in the background. Right now this
// is blocking/delaying the actual generation and submission of WindowPoSts in
// this deadline!
-func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([]miner.RecoveryDeclaration, *types.SignedMessage, error) {
+func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([][]miner.RecoveryDeclaration, []*types.SignedMessage, error) {
    ctx, span := trace.StartSpan(ctx, "storage.declareRecoveries")
    defer span.End()

    faulty := uint64(0)
-   params := &miner.DeclareFaultsRecoveredParams{
-       Recoveries: []miner.RecoveryDeclaration{},
-   }
+   var batchedRecoveryDecls [][]miner.RecoveryDeclaration
+   batchedRecoveryDecls = append(batchedRecoveryDecls, []miner.RecoveryDeclaration{})
+   totalRecoveries := 0

    for partIdx, partition := range partitions {
        unrecovered, err := bitfield.SubtractBitField(partition.FaultySectors, partition.RecoveringSectors)
@@ -302,55 +303,72 @@ func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint6
            continue
        }

-       params.Recoveries = append(params.Recoveries, miner.RecoveryDeclaration{
+       // respect user config if set
+       if s.maxPartitionsPerRecoveryMessage > 0 &&
+           len(batchedRecoveryDecls[len(batchedRecoveryDecls)-1]) >= s.maxPartitionsPerRecoveryMessage {
+           batchedRecoveryDecls = append(batchedRecoveryDecls, []miner.RecoveryDeclaration{})
+       }
+       batchedRecoveryDecls[len(batchedRecoveryDecls)-1] = append(batchedRecoveryDecls[len(batchedRecoveryDecls)-1], miner.RecoveryDeclaration{
            Deadline:  dlIdx,
            Partition: uint64(partIdx),
            Sectors:   recovered,
        })
+       totalRecoveries++
    }

-   recoveries := params.Recoveries
-   if len(recoveries) == 0 {
+   if totalRecoveries == 0 {
        if faulty != 0 {
            log.Warnw("No recoveries to declare", "deadline", dlIdx, "faulty", faulty)
        }
-       return recoveries, nil, nil
+       return nil, nil, nil
    }

-   enc, aerr := actors.SerializeParams(params)
-   if aerr != nil {
-       return recoveries, nil, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr)
-   }
-   msg := &types.Message{
-       To:     s.actor,
-       Method: builtin.MethodsMiner.DeclareFaultsRecovered,
-       Params: enc,
-       Value:  types.NewInt(0),
-   }
-   spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}
-   if err := s.prepareMessage(ctx, msg, spec); err != nil {
-       return recoveries, nil, err
-   }
-   sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)})
-   if err != nil {
-       return recoveries, sm, xerrors.Errorf("pushing message to mpool: %w", err)
-   }
-   log.Warnw("declare faults recovered Message CID", "cid", sm.Cid())
-   rec, err := s.api.StateWaitMsg(context.TODO(), sm.Cid(), build.MessageConfidence, api.LookbackNoLimit, true)
-   if err != nil {
-       return recoveries, sm, xerrors.Errorf("declare faults recovered wait error: %w", err)
-   }
-   if rec.Receipt.ExitCode != 0 {
-       return recoveries, sm, xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode)
-   }
-   return recoveries, sm, nil
+   var msgs []*types.SignedMessage
+   for _, recovery := range batchedRecoveryDecls {
+       params := &miner.DeclareFaultsRecoveredParams{
+           Recoveries: recovery,
+       }
+       enc, aerr := actors.SerializeParams(params)
+       if aerr != nil {
+           return nil, nil, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr)
+       }
+       msg := &types.Message{
+           To:     s.actor,
+           Method: builtin.MethodsMiner.DeclareFaultsRecovered,
+           Params: enc,
+           Value:  types.NewInt(0),
+       }
+       spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}
+       if err := s.prepareMessage(ctx, msg, spec); err != nil {
+           return nil, nil, err
+       }
+       sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)})
+       if err != nil {
+           return nil, nil, xerrors.Errorf("pushing message to mpool: %w", err)
+       }
+       log.Warnw("declare faults recovered Message CID", "cid", sm.Cid())
+       msgs = append(msgs, sm)
+   }
+   for _, msg := range msgs {
+       rec, err := s.api.StateWaitMsg(context.TODO(), msg.Cid(), build.MessageConfidence, api.LookbackNoLimit, true)
+       if err != nil {
+           return batchedRecoveryDecls, msgs, xerrors.Errorf("declare faults recovered wait error: %w", err)
+       }
+       if rec.Receipt.ExitCode != 0 {
+           return batchedRecoveryDecls, msgs, xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode)
+       }
+   }
+   return batchedRecoveryDecls, msgs, nil
}
// declareFaults identifies the sectors on the specified proving deadline that // declareFaults identifies the sectors on the specified proving deadline that
@@ -464,9 +482,8 @@ func (s *WindowPoStScheduler) asyncFaultRecover(di dline.Info, ts *types.TipSet)
        }

        var (
-           sigmsg     *types.SignedMessage
-           recoveries []miner.RecoveryDeclaration
-           faults     []miner.FaultDeclaration
+           sigmsgs    []*types.SignedMessage
+           recoveries [][]miner.RecoveryDeclaration

            // optionalCid returns the CID of the message, or cid.Undef if the
            // message is nil. We don't need the argument (could capture the
@@ -479,37 +496,28 @@ func (s *WindowPoStScheduler) asyncFaultRecover(di dline.Info, ts *types.TipSet)
            }
        )

-       if recoveries, sigmsg, err = s.declareRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
+       if recoveries, sigmsgs, err = s.declareRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
            // TODO: This is potentially quite bad, but not even trying to post when this fails is objectively worse
            log.Errorf("checking sector recoveries: %v", err)
        }

-       s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} {
-           j := WdPoStRecoveriesProcessedEvt{
-               evtCommon:    s.getEvtCommon(err),
-               Declarations: recoveries,
-               MessageCID:   optionalCid(sigmsg),
-           }
-           j.Error = err
-           return j
-       })
-       if ts.Height() > build.UpgradeIgnitionHeight {
-           return // FORK: declaring faults after ignition upgrade makes no sense
-       }
-       if faults, sigmsg, err = s.declareFaults(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
-           // TODO: This is also potentially really bad, but we try to post anyways
-           log.Errorf("checking sector faults: %v", err)
-       }
-       s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStFaults], func() interface{} {
-           return WdPoStFaultsProcessedEvt{
-               evtCommon:    s.getEvtCommon(err),
-               Declarations: faults,
-               MessageCID:   optionalCid(sigmsg),
-           }
-       })
+       // should always be true, skip journaling if not for some reason
+       if len(recoveries) == len(sigmsgs) {
+           for i, recovery := range recoveries {
+               // clone for function literal
+               recovery := recovery
+               msgCID := optionalCid(sigmsgs[i])
+               s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} {
+                   j := WdPoStRecoveriesProcessedEvt{
+                       evtCommon:    s.getEvtCommon(err),
+                       Declarations: recovery,
+                       MessageCID:   msgCID,
+                   }
+                   j.Error = err
+                   return j
+               })
+           }
+       }
    }()
}

@@ -778,6 +786,13 @@ func (s *WindowPoStScheduler) batchPartitions(partitions []api.Partition, nv net
        partitionsPerMsg = declMax
    }

+   // respect user config if set
+   if s.maxPartitionsPerPostMessage > 0 {
+       if partitionsPerMsg > s.maxPartitionsPerPostMessage {
+           partitionsPerMsg = s.maxPartitionsPerPostMessage
+       }
+   }
+
    // The number of messages will be:
    // ceiling(number of partitions / partitions per message)
    batchCount := len(partitions) / partitionsPerMsg
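To make the batching behaviour above easier to follow, here is a small, self-contained Go sketch of the same idea: group items into message-sized batches, tightening the network limit with the optional user cap when one is set. The names `batchIndexes`, `networkLimit` and `userLimit` are illustrative only and are not part of the Lotus codebase.

    package main

    import "fmt"

    // batchIndexes splits n items into batches of at most perMsg items, where
    // perMsg is the network limit, optionally tightened by a user limit (> 0).
    func batchIndexes(n, networkLimit, userLimit int) [][]int {
        perMsg := networkLimit
        if userLimit > 0 && userLimit < perMsg {
            perMsg = userLimit
        }

        var batches [][]int
        for i := 0; i < n; i += perMsg {
            end := i + perMsg
            if end > n {
                end = n
            }
            batch := make([]int, 0, end-i)
            for j := i; j < end; j++ {
                batch = append(batch, j)
            }
            batches = append(batches, batch)
        }
        return batches
    }

    func main() {
        // 11 partitions with a user cap of 3 -> 4 batches: 3+3+3+2.
        for _, b := range batchIndexes(11, 10, 3) {
            fmt.Println(b)
        }
    }

In the scheduler itself the batched items are `miner.RecoveryDeclaration` values (in `declareRecoveries`) or the partitions being proven (in `batchPartitions`), but the grouping arithmetic is the same.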


@@ -275,6 +275,196 @@ func TestWDPostDoPost(t *testing.T) {
}
}
// TestWDPostDoPostPartLimitConfig verifies that doPost will send the correct number of window
// PoST messages for a given number of partitions based on user config
func TestWDPostDoPostPartLimitConfig(t *testing.T) {
//stm: @CHAIN_SYNCER_LOAD_GENESIS_001, @CHAIN_SYNCER_FETCH_TIPSET_001,
//stm: @CHAIN_SYNCER_START_001, @CHAIN_SYNCER_SYNC_001, @BLOCKCHAIN_BEACON_VALIDATE_BLOCK_VALUES_01
//stm: @CHAIN_SYNCER_COLLECT_CHAIN_001, @CHAIN_SYNCER_COLLECT_HEADERS_001, @CHAIN_SYNCER_VALIDATE_TIPSET_001
//stm: @CHAIN_SYNCER_NEW_PEER_HEAD_001, @CHAIN_SYNCER_VALIDATE_MESSAGE_META_001, @CHAIN_SYNCER_STOP_001
ctx := context.Background()
expectedMsgCount := 364
proofType := abi.RegisteredPoStProof_StackedDrgWindow2KiBV1
postAct := tutils.NewIDAddr(t, 100)
mockStgMinerAPI := newMockStorageMinerAPI()
// Get the number of sectors allowed in a partition for this proof type
sectorsPerPartition, err := builtin.PoStProofWindowPoStPartitionSectors(proofType)
require.NoError(t, err)
// Work out the number of partitions that can be included in a message
// without exceeding the message sector limit
//stm: @BLOCKCHAIN_POLICY_GET_MAX_POST_PARTITIONS_001
partitionsPerMsg, err := policy.GetMaxPoStPartitions(network.Version13, proofType)
require.NoError(t, err)
if partitionsPerMsg > minertypes.AddressedPartitionsMax {
partitionsPerMsg = minertypes.AddressedPartitionsMax
}
partitionCount := 4 * partitionsPerMsg
// Assert that user config is less than network limit
userPartLimit := 33
lastMsgParts := 21
require.Greater(t, partitionCount, userPartLimit)
// Assert that the constants above are consistent with each other
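// Spelled out: 363 full messages of userPartLimit (33) partitions plus one final
// message of lastMsgParts (21) partitions covers 363*33 + 21 = 12000 partitions,
// which the assertion below requires to equal 4*partitionsPerMsg (i.e.
// partitionsPerMsg is expected to be 3000 for this proof type).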
require.Equal(t, (expectedMsgCount-1)*userPartLimit+lastMsgParts, 4*partitionsPerMsg)
var partitions []api.Partition
for p := 0; p < partitionCount; p++ {
sectors := bitfield.New()
for s := uint64(0); s < sectorsPerPartition; s++ {
sectors.Set(s)
}
partitions = append(partitions, api.Partition{
AllSectors: sectors,
FaultySectors: bitfield.New(),
RecoveringSectors: bitfield.New(),
LiveSectors: sectors,
ActiveSectors: sectors,
})
}
mockStgMinerAPI.setPartitions(partitions)
// Run window PoST
scheduler := &WindowPoStScheduler{
api: mockStgMinerAPI,
prover: &mockProver{},
verifier: &mockVerif{},
faultTracker: &mockFaultTracker{},
proofType: proofType,
actor: postAct,
journal: journal.NilJournal(),
maxPartitionsPerPostMessage: userPartLimit,
}
di := &dline.Info{
WPoStPeriodDeadlines: minertypes.WPoStPeriodDeadlines,
WPoStProvingPeriod: minertypes.WPoStProvingPeriod,
WPoStChallengeWindow: minertypes.WPoStChallengeWindow,
WPoStChallengeLookback: minertypes.WPoStChallengeLookback,
FaultDeclarationCutoff: minertypes.FaultDeclarationCutoff,
}
ts := mockTipSet(t)
scheduler.startGeneratePoST(ctx, ts, di, func(posts []minertypes.SubmitWindowedPoStParams, err error) {
scheduler.startSubmitPoST(ctx, ts, di, posts, func(err error) {})
})
// Read the window PoST messages
for i := 0; i < expectedMsgCount; i++ {
msg := <-mockStgMinerAPI.pushedMessages
require.Equal(t, builtin.MethodsMiner.SubmitWindowedPoSt, msg.Method)
var params minertypes.SubmitWindowedPoStParams
err := params.UnmarshalCBOR(bytes.NewReader(msg.Params))
require.NoError(t, err)
if i == expectedMsgCount-1 {
// The last message includes only the remaining 21 partitions
require.Len(t, params.Partitions, lastMsgParts)
} else {
// All previous messages should include the full number of partitions
require.Len(t, params.Partitions, userPartLimit)
}
}
}
// TestWDPostDeclareRecoveriesPartLimitConfig verifies that declareRecoveries will send the correct number of
// DeclareFaultsRecovered messages for a given number of partitions based on user config
func TestWDPostDeclareRecoveriesPartLimitConfig(t *testing.T) {
//stm: @CHAIN_SYNCER_LOAD_GENESIS_001, @CHAIN_SYNCER_FETCH_TIPSET_001,
//stm: @CHAIN_SYNCER_START_001, @CHAIN_SYNCER_SYNC_001, @BLOCKCHAIN_BEACON_VALIDATE_BLOCK_VALUES_01
//stm: @CHAIN_SYNCER_COLLECT_CHAIN_001, @CHAIN_SYNCER_COLLECT_HEADERS_001, @CHAIN_SYNCER_VALIDATE_TIPSET_001
//stm: @CHAIN_SYNCER_NEW_PEER_HEAD_001, @CHAIN_SYNCER_VALIDATE_MESSAGE_META_001, @CHAIN_SYNCER_STOP_001
ctx := context.Background()
proofType := abi.RegisteredPoStProof_StackedDrgWindow2KiBV1
postAct := tutils.NewIDAddr(t, 100)
mockStgMinerAPI := newMockStorageMinerAPI()
// Get the number of sectors allowed in a partition for this proof type
sectorsPerPartition, err := builtin.PoStProofWindowPoStPartitionSectors(proofType)
require.NoError(t, err)
// Let's have 11/20 partitions with faulty sectors, and a config of 3 partitions per message
userPartLimit := 3
partitionCount := 20
faultyPartitionCount := 11
var partitions []api.Partition
for p := 0; p < partitionCount; p++ {
sectors := bitfield.New()
for s := uint64(0); s < sectorsPerPartition; s++ {
sectors.Set(s)
}
partition := api.Partition{
AllSectors: sectors,
FaultySectors: bitfield.New(),
RecoveringSectors: bitfield.New(),
LiveSectors: sectors,
ActiveSectors: sectors,
}
if p < faultyPartitionCount {
partition.FaultySectors = sectors
}
partitions = append(partitions, partition)
}
mockStgMinerAPI.setPartitions(partitions)
// Run declareRecoveries
scheduler := &WindowPoStScheduler{
api: mockStgMinerAPI,
prover: &mockProver{},
verifier: &mockVerif{},
faultTracker: &mockFaultTracker{},
proofType: proofType,
actor: postAct,
journal: journal.NilJournal(),
maxPartitionsPerRecoveryMessage: userPartLimit,
}
di := uint64(0)
ts := mockTipSet(t)
expectedMsgCount := faultyPartitionCount/userPartLimit + 1
lastMsgParts := faultyPartitionCount % userPartLimit
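// With 11 faulty partitions and a limit of 3 partitions per message, integer
// division gives 3 full messages plus one final message carrying the remaining
// 11 % 3 = 2 partitions, i.e. 4 messages in total.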
go func() {
batchedRecoveries, msgs, err := scheduler.declareRecoveries(ctx, di, partitions, ts.Key())
require.NoError(t, err, "failed to declare recoveries")
require.Equal(t, len(batchedRecoveries), len(msgs))
require.Equal(t, expectedMsgCount, len(msgs))
}()
// Read the DeclareFaultsRecovered messages
for i := 0; i < expectedMsgCount; i++ {
msg := <-mockStgMinerAPI.pushedMessages
require.Equal(t, builtin.MethodsMiner.DeclareFaultsRecovered, msg.Method)
var params minertypes.DeclareFaultsRecoveredParams
err := params.UnmarshalCBOR(bytes.NewReader(msg.Params))
require.NoError(t, err)
if i == expectedMsgCount-1 {
// The last message includes only the remaining 2 partitions (lastMsgParts)
require.Len(t, params.Recoveries, lastMsgParts)
} else {
// All previous messages should include the full number of partitions
require.Len(t, params.Recoveries, userPartLimit)
}
}
}
func mockTipSet(t *testing.T) *types.TipSet {
minerAct := tutils.NewActorAddr(t, "miner")
c, err := cid.Decode("QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH")


@@ -30,15 +30,17 @@ import (
// WindowPoStScheduler watches the chain through the changeHandler, which in
// turn calls the scheduler when the time arrives to do work.
type WindowPoStScheduler struct {
    api              fullNodeFilteredAPI
    feeCfg           config.MinerFeeConfig
    addrSel          *AddressSelector
    prover           storage.Prover
    verifier         ffiwrapper.Verifier
    faultTracker     sectorstorage.FaultTracker
    proofType        abi.RegisteredPoStProof
    partitionSectors uint64
-   ch               *changeHandler
+   maxPartitionsPerPostMessage     int
+   maxPartitionsPerRecoveryMessage int
+   ch                              *changeHandler

    actor address.Address
@@ -52,6 +54,7 @@ type WindowPoStScheduler struct {
// NewWindowedPoStScheduler creates a new WindowPoStScheduler scheduler.
func NewWindowedPoStScheduler(api fullNodeFilteredAPI,
    cfg config.MinerFeeConfig,
+   pcfg config.ProvingConfig,
    as *AddressSelector,
    sp storage.Prover,
    verif ffiwrapper.Verifier,
@@ -73,7 +76,9 @@ func NewWindowedPoStScheduler(api fullNodeFilteredAPI,
        proofType:        mi.WindowPoStProofType,
        partitionSectors: mi.WindowPoStPartitionSectors,
-       actor:            actor,
+       maxPartitionsPerPostMessage:     pcfg.MaxPartitionsPerPoStMessage,
+       maxPartitionsPerRecoveryMessage: pcfg.MaxPartitionsPerRecoveryMessage,
+       actor:                           actor,
        evtTypes: [...]journal.EventType{
            evtTypeWdPoStScheduler: j.RegisterEventType("wdpost", "scheduler"),
            evtTypeWdPoStProofs:    j.RegisterEventType("wdpost", "proofs_processed"),