feat: recovery: Config for maximum partition count per message
This commit is contained in:
parent
673f9238be
commit
d68a60e8d9
@ -318,8 +318,18 @@
|
||||
# Setting this value above the network limit has no effect
|
||||
#
|
||||
# type: int
|
||||
# env var: LOTUS_PROVING_MAXPARTITIONSPERMESSAGE
|
||||
#MaxPartitionsPerMessage = 0
|
||||
# env var: LOTUS_PROVING_MAXPARTITIONSPERPOSTMESSAGE
|
||||
#MaxPartitionsPerPoStMessage = 0
|
||||
|
||||
# In some cases when submitting DeclareFaultsRecovered messages,
|
||||
# there may be too many recoveries to fit in a BlockGasLimit.
|
||||
# In those cases it may be necessary to set this value to something low (eg 1);
|
||||
# Note that setting this value lower may result in less efficient gas use - more messages will be sent than needed,
|
||||
# resulting in more total gas use (but each message will have lower gas limit)
|
||||
#
|
||||
# type: int
|
||||
# env var: LOTUS_PROVING_MAXPARTITIONSPERRECOVERYMESSAGE
|
||||
#MaxPartitionsPerRecoveryMessage = 0
|
||||
|
||||
|
||||
[Sealing]
|
||||
|
@ -630,11 +630,21 @@ over the worker address if this flag is set.`,
|
||||
Comment: `Maximum number of sector checks to run in parallel. (0 = unlimited)`,
|
||||
},
|
||||
{
|
||||
Name: "MaxPartitionsPerMessage",
|
||||
Name: "MaxPartitionsPerPoStMessage",
|
||||
Type: "int",
|
||||
|
||||
Comment: `Setting this value above the network limit has no effect`,
|
||||
},
|
||||
{
|
||||
Name: "MaxPartitionsPerRecoveryMessage",
|
||||
Type: "int",
|
||||
|
||||
Comment: `In some cases when submitting DeclareFaultsRecovered messages,
|
||||
there may be too many recoveries to fit in a BlockGasLimit.
|
||||
In those cases it may be necessary to set this value to something low (eg 1);
|
||||
Note that setting this value lower may result in less efficient gas use - more messages will be sent than needed,
|
||||
resulting in more total gas use (but each message will have lower gas limit)`,
|
||||
},
|
||||
},
|
||||
"Pubsub": []DocField{
|
||||
{
|
||||
|
@ -222,7 +222,6 @@ type ProvingConfig struct {
|
||||
// Maximum number of sector checks to run in parallel. (0 = unlimited)
|
||||
ParallelCheckLimit int
|
||||
|
||||
// todo disable builtin post
|
||||
// Maximum number of partitions to prove in a single SubmitWindowPoSt message. 0 = network limit (10 in nv16)
|
||||
//
|
||||
// A single partition may contain up to 2349 32GiB sectors, or 2300 64GiB sectors.
|
||||
@ -236,7 +235,17 @@ type ProvingConfig struct {
|
||||
// to prove each deadline, resulting in more total gas use (but each message will have lower gas limit)
|
||||
//
|
||||
// Setting this value above the network limit has no effect
|
||||
MaxPartitionsPerMessage int
|
||||
MaxPartitionsPerPoStMessage int
|
||||
|
||||
// Maximum number of partitions to declare in a single DeclareFaultsRecovered message. 0 = no limit.
|
||||
|
||||
// In some cases when submitting DeclareFaultsRecovered messages,
|
||||
// there may be too many recoveries to fit in a BlockGasLimit.
|
||||
// In those cases it may be necessary to set this value to something low (eg 1);
|
||||
// Note that setting this value lower may result in less efficient gas use - more messages will be sent than needed,
|
||||
// resulting in more total gas use (but each message will have lower gas limit)
|
||||
MaxPartitionsPerRecoveryMessage int
|
||||
// todo disable builtin post
|
||||
}
|
||||
|
||||
type SealingConfig struct {
|
||||
|
@ -261,14 +261,15 @@ func (s *WindowPoStScheduler) checkSectors(ctx context.Context, check bitfield.B
|
||||
// TODO: the waiting should happen in the background. Right now this
|
||||
// is blocking/delaying the actual generation and submission of WindowPoSts in
|
||||
// this deadline!
|
||||
func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([]miner.RecoveryDeclaration, *types.SignedMessage, error) {
|
||||
func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([][]miner.RecoveryDeclaration, []*types.SignedMessage, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "storage.declareRecoveries")
|
||||
defer span.End()
|
||||
|
||||
faulty := uint64(0)
|
||||
params := &miner.DeclareFaultsRecoveredParams{
|
||||
Recoveries: []miner.RecoveryDeclaration{},
|
||||
}
|
||||
|
||||
var batchedRecoveryDecls [][]miner.RecoveryDeclaration
|
||||
batchedRecoveryDecls = append(batchedRecoveryDecls, []miner.RecoveryDeclaration{})
|
||||
totalRecoveries := 0
|
||||
|
||||
for partIdx, partition := range partitions {
|
||||
unrecovered, err := bitfield.SubtractBitField(partition.FaultySectors, partition.RecoveringSectors)
|
||||
@ -302,55 +303,72 @@ func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint6
|
||||
continue
|
||||
}
|
||||
|
||||
params.Recoveries = append(params.Recoveries, miner.RecoveryDeclaration{
|
||||
// respect user config if set
|
||||
if s.maxPartitionsPerRecoveryMessage > 0 &&
|
||||
len(batchedRecoveryDecls[len(batchedRecoveryDecls)-1]) >= s.maxPartitionsPerRecoveryMessage {
|
||||
batchedRecoveryDecls = append(batchedRecoveryDecls, []miner.RecoveryDeclaration{})
|
||||
}
|
||||
|
||||
batchedRecoveryDecls[len(batchedRecoveryDecls)-1] = append(batchedRecoveryDecls[len(batchedRecoveryDecls)-1], miner.RecoveryDeclaration{
|
||||
Deadline: dlIdx,
|
||||
Partition: uint64(partIdx),
|
||||
Sectors: recovered,
|
||||
})
|
||||
|
||||
totalRecoveries++
|
||||
}
|
||||
|
||||
recoveries := params.Recoveries
|
||||
if len(recoveries) == 0 {
|
||||
if totalRecoveries == 0 {
|
||||
if faulty != 0 {
|
||||
log.Warnw("No recoveries to declare", "deadline", dlIdx, "faulty", faulty)
|
||||
}
|
||||
|
||||
return recoveries, nil, nil
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
enc, aerr := actors.SerializeParams(params)
|
||||
if aerr != nil {
|
||||
return recoveries, nil, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr)
|
||||
var msgs []*types.SignedMessage
|
||||
for _, recovery := range batchedRecoveryDecls {
|
||||
params := &miner.DeclareFaultsRecoveredParams{
|
||||
Recoveries: recovery,
|
||||
}
|
||||
|
||||
enc, aerr := actors.SerializeParams(params)
|
||||
if aerr != nil {
|
||||
return nil, nil, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr)
|
||||
}
|
||||
|
||||
msg := &types.Message{
|
||||
To: s.actor,
|
||||
Method: builtin.MethodsMiner.DeclareFaultsRecovered,
|
||||
Params: enc,
|
||||
Value: types.NewInt(0),
|
||||
}
|
||||
spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}
|
||||
if err := s.prepareMessage(ctx, msg, spec); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)})
|
||||
if err != nil {
|
||||
return nil, nil, xerrors.Errorf("pushing message to mpool: %w", err)
|
||||
}
|
||||
|
||||
log.Warnw("declare faults recovered Message CID", "cid", sm.Cid())
|
||||
msgs = append(msgs, sm)
|
||||
}
|
||||
|
||||
msg := &types.Message{
|
||||
To: s.actor,
|
||||
Method: builtin.MethodsMiner.DeclareFaultsRecovered,
|
||||
Params: enc,
|
||||
Value: types.NewInt(0),
|
||||
}
|
||||
spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}
|
||||
if err := s.prepareMessage(ctx, msg, spec); err != nil {
|
||||
return recoveries, nil, err
|
||||
for _, msg := range msgs {
|
||||
rec, err := s.api.StateWaitMsg(context.TODO(), msg.Cid(), build.MessageConfidence, api.LookbackNoLimit, true)
|
||||
if err != nil {
|
||||
return batchedRecoveryDecls, msgs, xerrors.Errorf("declare faults recovered wait error: %w", err)
|
||||
}
|
||||
|
||||
if rec.Receipt.ExitCode != 0 {
|
||||
return batchedRecoveryDecls, msgs, xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode)
|
||||
}
|
||||
}
|
||||
|
||||
sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)})
|
||||
if err != nil {
|
||||
return recoveries, sm, xerrors.Errorf("pushing message to mpool: %w", err)
|
||||
}
|
||||
|
||||
log.Warnw("declare faults recovered Message CID", "cid", sm.Cid())
|
||||
|
||||
rec, err := s.api.StateWaitMsg(context.TODO(), sm.Cid(), build.MessageConfidence, api.LookbackNoLimit, true)
|
||||
if err != nil {
|
||||
return recoveries, sm, xerrors.Errorf("declare faults recovered wait error: %w", err)
|
||||
}
|
||||
|
||||
if rec.Receipt.ExitCode != 0 {
|
||||
return recoveries, sm, xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode)
|
||||
}
|
||||
|
||||
return recoveries, sm, nil
|
||||
return batchedRecoveryDecls, msgs, nil
|
||||
}
|
||||
|
||||
// declareFaults identifies the sectors on the specified proving deadline that
|
||||
@ -464,9 +482,8 @@ func (s *WindowPoStScheduler) asyncFaultRecover(di dline.Info, ts *types.TipSet)
|
||||
}
|
||||
|
||||
var (
|
||||
sigmsg *types.SignedMessage
|
||||
recoveries []miner.RecoveryDeclaration
|
||||
faults []miner.FaultDeclaration
|
||||
sigmsgs []*types.SignedMessage
|
||||
recoveries [][]miner.RecoveryDeclaration
|
||||
|
||||
// optionalCid returns the CID of the message, or cid.Undef if the
|
||||
// message is nil. We don't need the argument (could capture the
|
||||
@ -479,37 +496,28 @@ func (s *WindowPoStScheduler) asyncFaultRecover(di dline.Info, ts *types.TipSet)
|
||||
}
|
||||
)
|
||||
|
||||
if recoveries, sigmsg, err = s.declareRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
|
||||
if recoveries, sigmsgs, err = s.declareRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
|
||||
// TODO: This is potentially quite bad, but not even trying to post when this fails is objectively worse
|
||||
log.Errorf("checking sector recoveries: %v", err)
|
||||
}
|
||||
|
||||
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} {
|
||||
j := WdPoStRecoveriesProcessedEvt{
|
||||
evtCommon: s.getEvtCommon(err),
|
||||
Declarations: recoveries,
|
||||
MessageCID: optionalCid(sigmsg),
|
||||
// should always be true, skip journaling if not for some reason
|
||||
if len(recoveries) == len(sigmsgs) {
|
||||
for i, recovery := range recoveries {
|
||||
// clone for function literal
|
||||
recovery := recovery
|
||||
msgCID := optionalCid(sigmsgs[i])
|
||||
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} {
|
||||
j := WdPoStRecoveriesProcessedEvt{
|
||||
evtCommon: s.getEvtCommon(err),
|
||||
Declarations: recovery,
|
||||
MessageCID: msgCID,
|
||||
}
|
||||
j.Error = err
|
||||
return j
|
||||
})
|
||||
}
|
||||
j.Error = err
|
||||
return j
|
||||
})
|
||||
|
||||
if ts.Height() > build.UpgradeIgnitionHeight {
|
||||
return // FORK: declaring faults after ignition upgrade makes no sense
|
||||
}
|
||||
|
||||
if faults, sigmsg, err = s.declareFaults(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
|
||||
// TODO: This is also potentially really bad, but we try to post anyways
|
||||
log.Errorf("checking sector faults: %v", err)
|
||||
}
|
||||
|
||||
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStFaults], func() interface{} {
|
||||
return WdPoStFaultsProcessedEvt{
|
||||
evtCommon: s.getEvtCommon(err),
|
||||
Declarations: faults,
|
||||
MessageCID: optionalCid(sigmsg),
|
||||
}
|
||||
})
|
||||
}()
|
||||
}
|
||||
|
||||
@ -779,9 +787,9 @@ func (s *WindowPoStScheduler) batchPartitions(partitions []api.Partition, nv net
|
||||
}
|
||||
|
||||
// respect user config if set
|
||||
if s.maxPartitionsPerMessage > 0 {
|
||||
if partitionsPerMsg > s.maxPartitionsPerMessage {
|
||||
partitionsPerMsg = s.maxPartitionsPerMessage
|
||||
if s.maxPartitionsPerPostMessage > 0 {
|
||||
if partitionsPerMsg > s.maxPartitionsPerPostMessage {
|
||||
partitionsPerMsg = s.maxPartitionsPerPostMessage
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -275,7 +275,7 @@ func TestWDPostDoPost(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestWDPostDoPost verifies that doPost will send the correct number of window
|
||||
// TestWDPostDoPostPartLimitConfig verifies that doPost will send the correct number of window
|
||||
// PoST messages for a given number of partitions based on user config
|
||||
func TestWDPostDoPostPartLimitConfig(t *testing.T) {
|
||||
//stm: @CHAIN_SYNCER_LOAD_GENESIS_001, @CHAIN_SYNCER_FETCH_TIPSET_001,
|
||||
@ -338,9 +338,8 @@ func TestWDPostDoPostPartLimitConfig(t *testing.T) {
|
||||
proofType: proofType,
|
||||
actor: postAct,
|
||||
journal: journal.NilJournal(),
|
||||
addrSel: &ctladdr.AddressSelector{},
|
||||
|
||||
maxPartitionsPerMessage: userPartLimit,
|
||||
maxPartitionsPerPostMessage: userPartLimit,
|
||||
}
|
||||
|
||||
di := &dline.Info{
|
||||
@ -374,6 +373,98 @@ func TestWDPostDoPostPartLimitConfig(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestWDPostDeclareRecoveriesPartLimitConfig verifies that declareRecoveries will send the correct number of
|
||||
// DeclareFaultsRecovered messages for a given number of partitions based on user config
|
||||
func TestWDPostDeclareRecoveriesPartLimitConfig(t *testing.T) {
|
||||
//stm: @CHAIN_SYNCER_LOAD_GENESIS_001, @CHAIN_SYNCER_FETCH_TIPSET_001,
|
||||
//stm: @CHAIN_SYNCER_START_001, @CHAIN_SYNCER_SYNC_001, @BLOCKCHAIN_BEACON_VALIDATE_BLOCK_VALUES_01
|
||||
//stm: @CHAIN_SYNCER_COLLECT_CHAIN_001, @CHAIN_SYNCER_COLLECT_HEADERS_001, @CHAIN_SYNCER_VALIDATE_TIPSET_001
|
||||
//stm: @CHAIN_SYNCER_NEW_PEER_HEAD_001, @CHAIN_SYNCER_VALIDATE_MESSAGE_META_001, @CHAIN_SYNCER_STOP_001
|
||||
ctx := context.Background()
|
||||
|
||||
proofType := abi.RegisteredPoStProof_StackedDrgWindow2KiBV1
|
||||
postAct := tutils.NewIDAddr(t, 100)
|
||||
|
||||
mockStgMinerAPI := newMockStorageMinerAPI()
|
||||
|
||||
// Get the number of sectors allowed in a partition for this proof type
|
||||
sectorsPerPartition, err := builtin.PoStProofWindowPoStPartitionSectors(proofType)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Let's have 11/20 partitions with faulty sectors, and a config of 3 partitions per message
|
||||
userPartLimit := 3
|
||||
partitionCount := 20
|
||||
faultyPartitionCount := 11
|
||||
|
||||
var partitions []api.Partition
|
||||
for p := 0; p < partitionCount; p++ {
|
||||
sectors := bitfield.New()
|
||||
for s := uint64(0); s < sectorsPerPartition; s++ {
|
||||
sectors.Set(s)
|
||||
}
|
||||
|
||||
partition := api.Partition{
|
||||
AllSectors: sectors,
|
||||
FaultySectors: bitfield.New(),
|
||||
RecoveringSectors: bitfield.New(),
|
||||
LiveSectors: sectors,
|
||||
ActiveSectors: sectors,
|
||||
}
|
||||
|
||||
if p < faultyPartitionCount {
|
||||
partition.FaultySectors = sectors
|
||||
}
|
||||
|
||||
partitions = append(partitions, partition)
|
||||
}
|
||||
|
||||
mockStgMinerAPI.setPartitions(partitions)
|
||||
|
||||
// Run declareRecoverios
|
||||
scheduler := &WindowPoStScheduler{
|
||||
api: mockStgMinerAPI,
|
||||
prover: &mockProver{},
|
||||
verifier: &mockVerif{},
|
||||
faultTracker: &mockFaultTracker{},
|
||||
proofType: proofType,
|
||||
actor: postAct,
|
||||
journal: journal.NilJournal(),
|
||||
|
||||
maxPartitionsPerRecoveryMessage: userPartLimit,
|
||||
}
|
||||
|
||||
di := uint64(0)
|
||||
ts := mockTipSet(t)
|
||||
|
||||
expectedMsgCount := faultyPartitionCount/userPartLimit + 1
|
||||
lastMsgParts := faultyPartitionCount % userPartLimit
|
||||
|
||||
go func() {
|
||||
batchedRecoveries, msgs, err := scheduler.declareRecoveries(ctx, di, partitions, ts.Key())
|
||||
require.NoError(t, err, "failed to declare recoveries")
|
||||
require.Equal(t, len(batchedRecoveries), len(msgs))
|
||||
require.Equal(t, expectedMsgCount, len(msgs))
|
||||
}()
|
||||
|
||||
// Read the window PoST messages
|
||||
for i := 0; i < expectedMsgCount; i++ {
|
||||
msg := <-mockStgMinerAPI.pushedMessages
|
||||
require.Equal(t, builtin.MethodsMiner.DeclareFaultsRecovered, msg.Method)
|
||||
var params minertypes.DeclareFaultsRecoveredParams
|
||||
err := params.UnmarshalCBOR(bytes.NewReader(msg.Params))
|
||||
require.NoError(t, err)
|
||||
|
||||
if i == expectedMsgCount-1 {
|
||||
// In the last message we only included a 21 partitions
|
||||
require.Len(t, params.Recoveries, lastMsgParts)
|
||||
} else {
|
||||
// All previous messages should include the full number of partitions
|
||||
require.Len(t, params.Recoveries, userPartLimit)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func mockTipSet(t *testing.T) *types.TipSet {
|
||||
minerAct := tutils.NewActorAddr(t, "miner")
|
||||
c, err := cid.Decode("QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH")
|
||||
|
@ -30,16 +30,17 @@ import (
|
||||
// WindowPoStScheduler watches the chain through the changeHandler, which in
|
||||
// turn calls the scheduler when the time arrives to do work.
|
||||
type WindowPoStScheduler struct {
|
||||
api fullNodeFilteredAPI
|
||||
feeCfg config.MinerFeeConfig
|
||||
addrSel *AddressSelector
|
||||
prover storage.Prover
|
||||
verifier ffiwrapper.Verifier
|
||||
faultTracker sectorstorage.FaultTracker
|
||||
proofType abi.RegisteredPoStProof
|
||||
partitionSectors uint64
|
||||
maxPartitionsPerMessage int
|
||||
ch *changeHandler
|
||||
api fullNodeFilteredAPI
|
||||
feeCfg config.MinerFeeConfig
|
||||
addrSel *AddressSelector
|
||||
prover storage.Prover
|
||||
verifier ffiwrapper.Verifier
|
||||
faultTracker sectorstorage.FaultTracker
|
||||
proofType abi.RegisteredPoStProof
|
||||
partitionSectors uint64
|
||||
maxPartitionsPerPostMessage int
|
||||
maxPartitionsPerRecoveryMessage int
|
||||
ch *changeHandler
|
||||
|
||||
actor address.Address
|
||||
|
||||
@ -66,17 +67,18 @@ func NewWindowedPoStScheduler(api fullNodeFilteredAPI,
|
||||
}
|
||||
|
||||
return &WindowPoStScheduler{
|
||||
api: api,
|
||||
feeCfg: cfg,
|
||||
addrSel: as,
|
||||
prover: sp,
|
||||
verifier: verif,
|
||||
faultTracker: ft,
|
||||
proofType: mi.WindowPoStProofType,
|
||||
partitionSectors: mi.WindowPoStPartitionSectors,
|
||||
maxPartitionsPerMessage: pcfg.MaxPartitionsPerMessage,
|
||||
api: api,
|
||||
feeCfg: cfg,
|
||||
addrSel: as,
|
||||
prover: sp,
|
||||
verifier: verif,
|
||||
faultTracker: ft,
|
||||
proofType: mi.WindowPoStProofType,
|
||||
partitionSectors: mi.WindowPoStPartitionSectors,
|
||||
|
||||
actor: actor,
|
||||
maxPartitionsPerPostMessage: pcfg.MaxPartitionsPerPoStMessage,
|
||||
maxPartitionsPerRecoveryMessage: pcfg.MaxPartitionsPerRecoveryMessage,
|
||||
actor: actor,
|
||||
evtTypes: [...]journal.EventType{
|
||||
evtTypeWdPoStScheduler: j.RegisterEventType("wdpost", "scheduler"),
|
||||
evtTypeWdPoStProofs: j.RegisterEventType("wdpost", "proofs_processed"),
|
||||
|
Loading…
Reference in New Issue
Block a user