Merge pull request #8986 from filecoin-project/asr/feat/dfr-maxpart-config
feat: recovery: Config for maximum partition count per message
This commit is contained in:
commit
fb0fe6bc51
@ -390,8 +390,18 @@
|
|||||||
# Setting this value above the network limit has no effect
|
# Setting this value above the network limit has no effect
|
||||||
#
|
#
|
||||||
# type: int
|
# type: int
|
||||||
# env var: LOTUS_PROVING_MAXPARTITIONSPERMESSAGE
|
# env var: LOTUS_PROVING_MAXPARTITIONSPERPOSTMESSAGE
|
||||||
#MaxPartitionsPerMessage = 0
|
#MaxPartitionsPerPoStMessage = 0
|
||||||
|
|
||||||
|
# In some cases when submitting DeclareFaultsRecovered messages,
|
||||||
|
# there may be too many recoveries to fit in a BlockGasLimit.
|
||||||
|
# In those cases it may be necessary to set this value to something low (eg 1);
|
||||||
|
# Note that setting this value lower may result in less efficient gas use - more messages will be sent than needed,
|
||||||
|
# resulting in more total gas use (but each message will have lower gas limit)
|
||||||
|
#
|
||||||
|
# type: int
|
||||||
|
# env var: LOTUS_PROVING_MAXPARTITIONSPERRECOVERYMESSAGE
|
||||||
|
#MaxPartitionsPerRecoveryMessage = 0
|
||||||
|
|
||||||
|
|
||||||
[Sealing]
|
[Sealing]
|
||||||
|
@ -690,7 +690,7 @@ After changing this option, confirm that the new value works in your setup by in
|
|||||||
'lotus-miner proving compute window-post 0'`,
|
'lotus-miner proving compute window-post 0'`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "MaxPartitionsPerMessage",
|
Name: "MaxPartitionsPerPoStMessage",
|
||||||
Type: "int",
|
Type: "int",
|
||||||
|
|
||||||
Comment: `Maximum number of partitions to prove in a single SubmitWindowPoSt messace. 0 = network limit (10 in nv16)
|
Comment: `Maximum number of partitions to prove in a single SubmitWindowPoSt messace. 0 = network limit (10 in nv16)
|
||||||
@ -707,6 +707,16 @@ to prove each deadline, resulting in more total gas use (but each message will h
|
|||||||
|
|
||||||
Setting this value above the network limit has no effect`,
|
Setting this value above the network limit has no effect`,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Name: "MaxPartitionsPerRecoveryMessage",
|
||||||
|
Type: "int",
|
||||||
|
|
||||||
|
Comment: `In some cases when submitting DeclareFaultsRecovered messages,
|
||||||
|
there may be too many recoveries to fit in a BlockGasLimit.
|
||||||
|
In those cases it may be necessary to set this value to something low (eg 1);
|
||||||
|
Note that setting this value lower may result in less efficient gas use - more messages will be sent than needed,
|
||||||
|
resulting in more total gas use (but each message will have lower gas limit)`,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
"Pubsub": []DocField{
|
"Pubsub": []DocField{
|
||||||
{
|
{
|
||||||
|
@ -284,7 +284,16 @@ type ProvingConfig struct {
|
|||||||
// to prove each deadline, resulting in more total gas use (but each message will have lower gas limit)
|
// to prove each deadline, resulting in more total gas use (but each message will have lower gas limit)
|
||||||
//
|
//
|
||||||
// Setting this value above the network limit has no effect
|
// Setting this value above the network limit has no effect
|
||||||
MaxPartitionsPerMessage int
|
MaxPartitionsPerPoStMessage int
|
||||||
|
|
||||||
|
// Maximum number of partitions to declare in a single DeclareFaultsRecovered message. 0 = no limit.
|
||||||
|
|
||||||
|
// In some cases when submitting DeclareFaultsRecovered messages,
|
||||||
|
// there may be too many recoveries to fit in a BlockGasLimit.
|
||||||
|
// In those cases it may be necessary to set this value to something low (eg 1);
|
||||||
|
// Note that setting this value lower may result in less efficient gas use - more messages will be sent than needed,
|
||||||
|
// resulting in more total gas use (but each message will have lower gas limit)
|
||||||
|
MaxPartitionsPerRecoveryMessage int
|
||||||
}
|
}
|
||||||
|
|
||||||
type SealingConfig struct {
|
type SealingConfig struct {
|
||||||
|
64
storage/paths/gomock_reflect_3544667724/prog.go
Normal file
64
storage/paths/gomock_reflect_3544667724/prog.go
Normal file
@ -0,0 +1,64 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/gob"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"reflect"
|
||||||
|
|
||||||
|
"github.com/golang/mock/mockgen/model"
|
||||||
|
|
||||||
|
pkg_ "github.com/filecoin-project/lotus/storage/paths"
|
||||||
|
)
|
||||||
|
|
||||||
|
var output = flag.String("output", "", "The output file name, or empty to use stdout.")
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
its := []struct {
|
||||||
|
sym string
|
||||||
|
typ reflect.Type
|
||||||
|
}{
|
||||||
|
|
||||||
|
{"Store", reflect.TypeOf((*pkg_.Store)(nil)).Elem()},
|
||||||
|
}
|
||||||
|
pkg := &model.Package{
|
||||||
|
// NOTE: This behaves contrary to documented behaviour if the
|
||||||
|
// package name is not the final component of the import path.
|
||||||
|
// The reflect package doesn't expose the package name, though.
|
||||||
|
Name: path.Base("github.com/filecoin-project/lotus/storage/paths"),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, it := range its {
|
||||||
|
intf, err := model.InterfaceFromInterfaceType(it.typ)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "Reflection: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
intf.Name = it.sym
|
||||||
|
pkg.Interfaces = append(pkg.Interfaces, intf)
|
||||||
|
}
|
||||||
|
|
||||||
|
outfile := os.Stdout
|
||||||
|
if len(*output) != 0 {
|
||||||
|
var err error
|
||||||
|
outfile, err = os.Create(*output)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "failed to open output file %q", *output)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err := outfile.Close(); err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "failed to close output file %q", *output)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := gob.NewEncoder(outfile).Encode(pkg); err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "gob encode: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
}
|
@ -518,9 +518,9 @@ func (s *WindowPoStScheduler) batchPartitions(partitions []api.Partition, nv net
|
|||||||
}
|
}
|
||||||
|
|
||||||
// respect user config if set
|
// respect user config if set
|
||||||
if s.maxPartitionsPerMessage > 0 {
|
if s.maxPartitionsPerPostMessage > 0 {
|
||||||
if partitionsPerMsg > s.maxPartitionsPerMessage {
|
if partitionsPerMsg > s.maxPartitionsPerPostMessage {
|
||||||
partitionsPerMsg = s.maxPartitionsPerMessage
|
partitionsPerMsg = s.maxPartitionsPerPostMessage
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,14 +36,15 @@ import (
|
|||||||
// TODO: the waiting should happen in the background. Right now this
|
// TODO: the waiting should happen in the background. Right now this
|
||||||
// is blocking/delaying the actual generation and submission of WindowPoSts in
|
// is blocking/delaying the actual generation and submission of WindowPoSts in
|
||||||
// this deadline!
|
// this deadline!
|
||||||
func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([]miner.RecoveryDeclaration, *types.SignedMessage, error) {
|
func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([][]miner.RecoveryDeclaration, []*types.SignedMessage, error) {
|
||||||
ctx, span := trace.StartSpan(ctx, "storage.declareRecoveries")
|
ctx, span := trace.StartSpan(ctx, "storage.declareRecoveries")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
faulty := uint64(0)
|
faulty := uint64(0)
|
||||||
params := &miner.DeclareFaultsRecoveredParams{
|
|
||||||
Recoveries: []miner.RecoveryDeclaration{},
|
var batchedRecoveryDecls [][]miner.RecoveryDeclaration
|
||||||
}
|
batchedRecoveryDecls = append(batchedRecoveryDecls, []miner.RecoveryDeclaration{})
|
||||||
|
totalRecoveries := 0
|
||||||
|
|
||||||
for partIdx, partition := range partitions {
|
for partIdx, partition := range partitions {
|
||||||
unrecovered, err := bitfield.SubtractBitField(partition.FaultySectors, partition.RecoveringSectors)
|
unrecovered, err := bitfield.SubtractBitField(partition.FaultySectors, partition.RecoveringSectors)
|
||||||
@ -77,25 +78,38 @@ func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint6
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
params.Recoveries = append(params.Recoveries, miner.RecoveryDeclaration{
|
// respect user config if set
|
||||||
|
if s.maxPartitionsPerRecoveryMessage > 0 &&
|
||||||
|
len(batchedRecoveryDecls[len(batchedRecoveryDecls)-1]) >= s.maxPartitionsPerRecoveryMessage {
|
||||||
|
batchedRecoveryDecls = append(batchedRecoveryDecls, []miner.RecoveryDeclaration{})
|
||||||
|
}
|
||||||
|
|
||||||
|
batchedRecoveryDecls[len(batchedRecoveryDecls)-1] = append(batchedRecoveryDecls[len(batchedRecoveryDecls)-1], miner.RecoveryDeclaration{
|
||||||
Deadline: dlIdx,
|
Deadline: dlIdx,
|
||||||
Partition: uint64(partIdx),
|
Partition: uint64(partIdx),
|
||||||
Sectors: recovered,
|
Sectors: recovered,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
totalRecoveries++
|
||||||
}
|
}
|
||||||
|
|
||||||
recoveries := params.Recoveries
|
if totalRecoveries == 0 {
|
||||||
if len(recoveries) == 0 {
|
|
||||||
if faulty != 0 {
|
if faulty != 0 {
|
||||||
log.Warnw("No recoveries to declare", "deadline", dlIdx, "faulty", faulty)
|
log.Warnw("No recoveries to declare", "deadline", dlIdx, "faulty", faulty)
|
||||||
}
|
}
|
||||||
|
|
||||||
return recoveries, nil, nil
|
return nil, nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var msgs []*types.SignedMessage
|
||||||
|
for _, recovery := range batchedRecoveryDecls {
|
||||||
|
params := &miner.DeclareFaultsRecoveredParams{
|
||||||
|
Recoveries: recovery,
|
||||||
}
|
}
|
||||||
|
|
||||||
enc, aerr := actors.SerializeParams(params)
|
enc, aerr := actors.SerializeParams(params)
|
||||||
if aerr != nil {
|
if aerr != nil {
|
||||||
return recoveries, nil, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr)
|
return nil, nil, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr)
|
||||||
}
|
}
|
||||||
|
|
||||||
msg := &types.Message{
|
msg := &types.Message{
|
||||||
@ -106,26 +120,30 @@ func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint6
|
|||||||
}
|
}
|
||||||
spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}
|
spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}
|
||||||
if err := s.prepareMessage(ctx, msg, spec); err != nil {
|
if err := s.prepareMessage(ctx, msg, spec); err != nil {
|
||||||
return recoveries, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)})
|
sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return recoveries, sm, xerrors.Errorf("pushing message to mpool: %w", err)
|
return nil, nil, xerrors.Errorf("pushing message to mpool: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Warnw("declare faults recovered Message CID", "cid", sm.Cid())
|
log.Warnw("declare faults recovered Message CID", "cid", sm.Cid())
|
||||||
|
msgs = append(msgs, sm)
|
||||||
|
}
|
||||||
|
|
||||||
rec, err := s.api.StateWaitMsg(context.TODO(), sm.Cid(), build.MessageConfidence, api.LookbackNoLimit, true)
|
for _, msg := range msgs {
|
||||||
|
rec, err := s.api.StateWaitMsg(context.TODO(), msg.Cid(), build.MessageConfidence, api.LookbackNoLimit, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return recoveries, sm, xerrors.Errorf("declare faults recovered wait error: %w", err)
|
return batchedRecoveryDecls, msgs, xerrors.Errorf("declare faults recovered wait error: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if rec.Receipt.ExitCode != 0 {
|
if rec.Receipt.ExitCode != 0 {
|
||||||
return recoveries, sm, xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode)
|
return batchedRecoveryDecls, msgs, xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return recoveries, sm, nil
|
return batchedRecoveryDecls, msgs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// declareFaults identifies the sectors on the specified proving deadline that
|
// declareFaults identifies the sectors on the specified proving deadline that
|
||||||
@ -241,9 +259,8 @@ func (s *WindowPoStScheduler) asyncFaultRecover(di dline.Info, ts *types.TipSet)
|
|||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
sigmsg *types.SignedMessage
|
sigmsgs []*types.SignedMessage
|
||||||
recoveries []miner.RecoveryDeclaration
|
recoveries [][]miner.RecoveryDeclaration
|
||||||
faults []miner.FaultDeclaration
|
|
||||||
|
|
||||||
// optionalCid returns the CID of the message, or cid.Undef is the
|
// optionalCid returns the CID of the message, or cid.Undef is the
|
||||||
// message is nil. We don't need the argument (could capture the
|
// message is nil. We don't need the argument (could capture the
|
||||||
@ -256,36 +273,27 @@ func (s *WindowPoStScheduler) asyncFaultRecover(di dline.Info, ts *types.TipSet)
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
if recoveries, sigmsg, err = s.declareRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
|
if recoveries, sigmsgs, err = s.declareRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
|
||||||
// TODO: This is potentially quite bad, but not even trying to post when this fails is objectively worse
|
// TODO: This is potentially quite bad, but not even trying to post when this fails is objectively worse
|
||||||
log.Errorf("checking sector recoveries: %v", err)
|
log.Errorf("checking sector recoveries: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// should always be true, skip journaling if not for some reason
|
||||||
|
if len(recoveries) == len(sigmsgs) {
|
||||||
|
for i, recovery := range recoveries {
|
||||||
|
// clone for function literal
|
||||||
|
recovery := recovery
|
||||||
|
msgCID := optionalCid(sigmsgs[i])
|
||||||
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} {
|
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} {
|
||||||
j := WdPoStRecoveriesProcessedEvt{
|
j := WdPoStRecoveriesProcessedEvt{
|
||||||
evtCommon: s.getEvtCommon(err),
|
evtCommon: s.getEvtCommon(err),
|
||||||
Declarations: recoveries,
|
Declarations: recovery,
|
||||||
MessageCID: optionalCid(sigmsg),
|
MessageCID: msgCID,
|
||||||
}
|
}
|
||||||
j.Error = err
|
j.Error = err
|
||||||
return j
|
return j
|
||||||
})
|
})
|
||||||
|
|
||||||
if ts.Height() > build.UpgradeIgnitionHeight {
|
|
||||||
return // FORK: declaring faults after ignition upgrade makes no sense
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if faults, sigmsg, err = s.declareFaults(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
|
|
||||||
// TODO: This is also potentially really bad, but we try to post anyways
|
|
||||||
log.Errorf("checking sector faults: %v", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStFaults], func() interface{} {
|
|
||||||
return WdPoStFaultsProcessedEvt{
|
|
||||||
evtCommon: s.getEvtCommon(err),
|
|
||||||
Declarations: faults,
|
|
||||||
MessageCID: optionalCid(sigmsg),
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
@ -268,7 +268,7 @@ func TestWDPostDoPost(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestWDPostDoPost verifies that doPost will send the correct number of window
|
// TestWDPostDoPostPartLimitConfig verifies that doPost will send the correct number of window
|
||||||
// PoST messages for a given number of partitions based on user config
|
// PoST messages for a given number of partitions based on user config
|
||||||
func TestWDPostDoPostPartLimitConfig(t *testing.T) {
|
func TestWDPostDoPostPartLimitConfig(t *testing.T) {
|
||||||
//stm: @CHAIN_SYNCER_LOAD_GENESIS_001, @CHAIN_SYNCER_FETCH_TIPSET_001,
|
//stm: @CHAIN_SYNCER_LOAD_GENESIS_001, @CHAIN_SYNCER_FETCH_TIPSET_001,
|
||||||
@ -333,7 +333,7 @@ func TestWDPostDoPostPartLimitConfig(t *testing.T) {
|
|||||||
journal: journal.NilJournal(),
|
journal: journal.NilJournal(),
|
||||||
addrSel: &ctladdr.AddressSelector{},
|
addrSel: &ctladdr.AddressSelector{},
|
||||||
|
|
||||||
maxPartitionsPerMessage: userPartLimit,
|
maxPartitionsPerPostMessage: userPartLimit,
|
||||||
}
|
}
|
||||||
|
|
||||||
di := &dline.Info{
|
di := &dline.Info{
|
||||||
@ -367,6 +367,99 @@ func TestWDPostDoPostPartLimitConfig(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestWDPostDeclareRecoveriesPartLimitConfig verifies that declareRecoveries will send the correct number of
|
||||||
|
// DeclareFaultsRecovered messages for a given number of partitions based on user config
|
||||||
|
func TestWDPostDeclareRecoveriesPartLimitConfig(t *testing.T) {
|
||||||
|
//stm: @CHAIN_SYNCER_LOAD_GENESIS_001, @CHAIN_SYNCER_FETCH_TIPSET_001,
|
||||||
|
//stm: @CHAIN_SYNCER_START_001, @CHAIN_SYNCER_SYNC_001, @BLOCKCHAIN_BEACON_VALIDATE_BLOCK_VALUES_01
|
||||||
|
//stm: @CHAIN_SYNCER_COLLECT_CHAIN_001, @CHAIN_SYNCER_COLLECT_HEADERS_001, @CHAIN_SYNCER_VALIDATE_TIPSET_001
|
||||||
|
//stm: @CHAIN_SYNCER_NEW_PEER_HEAD_001, @CHAIN_SYNCER_VALIDATE_MESSAGE_META_001, @CHAIN_SYNCER_STOP_001
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
proofType := abi.RegisteredPoStProof_StackedDrgWindow2KiBV1
|
||||||
|
postAct := tutils.NewIDAddr(t, 100)
|
||||||
|
|
||||||
|
mockStgMinerAPI := newMockStorageMinerAPI()
|
||||||
|
|
||||||
|
// Get the number of sectors allowed in a partition for this proof type
|
||||||
|
sectorsPerPartition, err := builtin.PoStProofWindowPoStPartitionSectors(proofType)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Let's have 11/20 partitions with faulty sectors, and a config of 3 partitions per message
|
||||||
|
userPartLimit := 3
|
||||||
|
partitionCount := 20
|
||||||
|
faultyPartitionCount := 11
|
||||||
|
|
||||||
|
var partitions []api.Partition
|
||||||
|
for p := 0; p < partitionCount; p++ {
|
||||||
|
sectors := bitfield.New()
|
||||||
|
for s := uint64(0); s < sectorsPerPartition; s++ {
|
||||||
|
sectors.Set(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
partition := api.Partition{
|
||||||
|
AllSectors: sectors,
|
||||||
|
FaultySectors: bitfield.New(),
|
||||||
|
RecoveringSectors: bitfield.New(),
|
||||||
|
LiveSectors: sectors,
|
||||||
|
ActiveSectors: sectors,
|
||||||
|
}
|
||||||
|
|
||||||
|
if p < faultyPartitionCount {
|
||||||
|
partition.FaultySectors = sectors
|
||||||
|
}
|
||||||
|
|
||||||
|
partitions = append(partitions, partition)
|
||||||
|
}
|
||||||
|
|
||||||
|
mockStgMinerAPI.setPartitions(partitions)
|
||||||
|
|
||||||
|
// Run declareRecoverios
|
||||||
|
scheduler := &WindowPoStScheduler{
|
||||||
|
api: mockStgMinerAPI,
|
||||||
|
prover: &mockProver{},
|
||||||
|
verifier: &mockVerif{},
|
||||||
|
faultTracker: &mockFaultTracker{},
|
||||||
|
proofType: proofType,
|
||||||
|
actor: postAct,
|
||||||
|
journal: journal.NilJournal(),
|
||||||
|
addrSel: &ctladdr.AddressSelector{},
|
||||||
|
|
||||||
|
maxPartitionsPerRecoveryMessage: userPartLimit,
|
||||||
|
}
|
||||||
|
|
||||||
|
di := uint64(0)
|
||||||
|
ts := mockTipSet(t)
|
||||||
|
|
||||||
|
expectedMsgCount := faultyPartitionCount/userPartLimit + 1
|
||||||
|
lastMsgParts := faultyPartitionCount % userPartLimit
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
batchedRecoveries, msgs, err := scheduler.declareRecoveries(ctx, di, partitions, ts.Key())
|
||||||
|
require.NoError(t, err, "failed to declare recoveries")
|
||||||
|
require.Equal(t, len(batchedRecoveries), len(msgs))
|
||||||
|
require.Equal(t, expectedMsgCount, len(msgs))
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Read the window PoST messages
|
||||||
|
for i := 0; i < expectedMsgCount; i++ {
|
||||||
|
msg := <-mockStgMinerAPI.pushedMessages
|
||||||
|
require.Equal(t, builtin.MethodsMiner.DeclareFaultsRecovered, msg.Method)
|
||||||
|
var params minertypes.DeclareFaultsRecoveredParams
|
||||||
|
err := params.UnmarshalCBOR(bytes.NewReader(msg.Params))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
if i == expectedMsgCount-1 {
|
||||||
|
// In the last message we only included a 21 partitions
|
||||||
|
require.Len(t, params.Recoveries, lastMsgParts)
|
||||||
|
} else {
|
||||||
|
// All previous messages should include the full number of partitions
|
||||||
|
require.Len(t, params.Recoveries, userPartLimit)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func mockTipSet(t *testing.T) *types.TipSet {
|
func mockTipSet(t *testing.T) *types.TipSet {
|
||||||
minerAct := tutils.NewActorAddr(t, "miner")
|
minerAct := tutils.NewActorAddr(t, "miner")
|
||||||
c, err := cid.Decode("QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH")
|
c, err := cid.Decode("QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH")
|
||||||
|
@ -71,7 +71,8 @@ type WindowPoStScheduler struct {
|
|||||||
proofType abi.RegisteredPoStProof
|
proofType abi.RegisteredPoStProof
|
||||||
partitionSectors uint64
|
partitionSectors uint64
|
||||||
disablePreChecks bool
|
disablePreChecks bool
|
||||||
maxPartitionsPerMessage int
|
maxPartitionsPerPostMessage int
|
||||||
|
maxPartitionsPerRecoveryMessage int
|
||||||
ch *changeHandler
|
ch *changeHandler
|
||||||
|
|
||||||
actor address.Address
|
actor address.Address
|
||||||
@ -108,7 +109,8 @@ func NewWindowedPoStScheduler(api NodeAPI,
|
|||||||
proofType: mi.WindowPoStProofType,
|
proofType: mi.WindowPoStProofType,
|
||||||
partitionSectors: mi.WindowPoStPartitionSectors,
|
partitionSectors: mi.WindowPoStPartitionSectors,
|
||||||
disablePreChecks: pcfg.DisableWDPoStPreChecks,
|
disablePreChecks: pcfg.DisableWDPoStPreChecks,
|
||||||
maxPartitionsPerMessage: pcfg.MaxPartitionsPerMessage,
|
maxPartitionsPerPostMessage: pcfg.MaxPartitionsPerPoStMessage,
|
||||||
|
maxPartitionsPerRecoveryMessage: pcfg.MaxPartitionsPerRecoveryMessage,
|
||||||
|
|
||||||
actor: actor,
|
actor: actor,
|
||||||
evtTypes: [...]journal.EventType{
|
evtTypes: [...]journal.EventType{
|
||||||
|
Loading…
Reference in New Issue
Block a user