From d68a60e8d9f53fee6a0251ba004fe47618f436e9 Mon Sep 17 00:00:00 2001 From: Aayush Date: Thu, 7 Jul 2022 10:52:22 -0400 Subject: [PATCH] feat: recovery: Config for maximum partition count per message --- .../en/default-lotus-miner-config.toml | 14 +- node/config/doc_gen.go | 12 +- node/config/types.go | 13 +- storage/wdpost_run.go | 144 +++++++++--------- storage/wdpost_run_test.go | 97 +++++++++++- storage/wdpost_sched.go | 42 ++--- 6 files changed, 226 insertions(+), 96 deletions(-) diff --git a/documentation/en/default-lotus-miner-config.toml b/documentation/en/default-lotus-miner-config.toml index 276e1dd5f..c10a4f476 100644 --- a/documentation/en/default-lotus-miner-config.toml +++ b/documentation/en/default-lotus-miner-config.toml @@ -318,8 +318,18 @@ # Setting this value above the network limit has no effect # # type: int - # env var: LOTUS_PROVING_MAXPARTITIONSPERMESSAGE - #MaxPartitionsPerMessage = 0 + # env var: LOTUS_PROVING_MAXPARTITIONSPERPOSTMESSAGE + #MaxPartitionsPerPoStMessage = 0 + + # In some cases when submitting DeclareFaultsRecovered messages, + # there may be too many recoveries to fit in a BlockGasLimit. + # In those cases it may be necessary to set this value to something low (eg 1); + # Note that setting this value lower may result in less efficient gas use - more messages will be sent than needed, + # resulting in more total gas use (but each message will have lower gas limit) + # + # type: int + # env var: LOTUS_PROVING_MAXPARTITIONSPERRECOVERYMESSAGE + #MaxPartitionsPerRecoveryMessage = 0 [Sealing] diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 4a90cfcc6..8b8b14999 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -630,11 +630,21 @@ over the worker address if this flag is set.`, Comment: `Maximum number of sector checks to run in parallel. (0 = unlimited)`, }, { - Name: "MaxPartitionsPerMessage", + Name: "MaxPartitionsPerPoStMessage", Type: "int", Comment: `Setting this value above the network limit has no effect`, }, + { + Name: "MaxPartitionsPerRecoveryMessage", + Type: "int", + + Comment: `In some cases when submitting DeclareFaultsRecovered messages, +there may be too many recoveries to fit in a BlockGasLimit. +In those cases it may be necessary to set this value to something low (eg 1); +Note that setting this value lower may result in less efficient gas use - more messages will be sent than needed, +resulting in more total gas use (but each message will have lower gas limit)`, + }, }, "Pubsub": []DocField{ { diff --git a/node/config/types.go b/node/config/types.go index bc1098b74..ecced5514 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -222,7 +222,6 @@ type ProvingConfig struct { // Maximum number of sector checks to run in parallel. (0 = unlimited) ParallelCheckLimit int - // todo disable builtin post // Maximum number of partitions to prove in a single SubmitWindowPoSt messace. 0 = network limit (10 in nv16) // // A single partition may contain up to 2349 32GiB sectors, or 2300 64GiB sectors. @@ -236,7 +235,17 @@ type ProvingConfig struct { // to prove each deadline, resulting in more total gas use (but each message will have lower gas limit) // // Setting this value above the network limit has no effect - MaxPartitionsPerMessage int + MaxPartitionsPerPoStMessage int + + // Maximum number of partitions to declare in a single DeclareFaultsRecovered message. 0 = no limit. 
+ + // In some cases when submitting DeclareFaultsRecovered messages, + // there may be too many recoveries to fit in a BlockGasLimit. + // In those cases it may be necessary to set this value to something low (eg 1); + // Note that setting this value lower may result in less efficient gas use - more messages will be sent than needed, + // resulting in more total gas use (but each message will have lower gas limit) + MaxPartitionsPerRecoveryMessage int + // todo disable builtin post } type SealingConfig struct { diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go index d8c324865..2c0a89d6c 100644 --- a/storage/wdpost_run.go +++ b/storage/wdpost_run.go @@ -261,14 +261,15 @@ func (s *WindowPoStScheduler) checkSectors(ctx context.Context, check bitfield.B // TODO: the waiting should happen in the background. Right now this // is blocking/delaying the actual generation and submission of WindowPoSts in // this deadline! -func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([]miner.RecoveryDeclaration, *types.SignedMessage, error) { +func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([][]miner.RecoveryDeclaration, []*types.SignedMessage, error) { ctx, span := trace.StartSpan(ctx, "storage.declareRecoveries") defer span.End() faulty := uint64(0) - params := &miner.DeclareFaultsRecoveredParams{ - Recoveries: []miner.RecoveryDeclaration{}, - } + + var batchedRecoveryDecls [][]miner.RecoveryDeclaration + batchedRecoveryDecls = append(batchedRecoveryDecls, []miner.RecoveryDeclaration{}) + totalRecoveries := 0 for partIdx, partition := range partitions { unrecovered, err := bitfield.SubtractBitField(partition.FaultySectors, partition.RecoveringSectors) @@ -302,55 +303,72 @@ func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint6 continue } - params.Recoveries = append(params.Recoveries, miner.RecoveryDeclaration{ + // respect user config if set + if s.maxPartitionsPerRecoveryMessage > 0 && + len(batchedRecoveryDecls[len(batchedRecoveryDecls)-1]) >= s.maxPartitionsPerRecoveryMessage { + batchedRecoveryDecls = append(batchedRecoveryDecls, []miner.RecoveryDeclaration{}) + } + + batchedRecoveryDecls[len(batchedRecoveryDecls)-1] = append(batchedRecoveryDecls[len(batchedRecoveryDecls)-1], miner.RecoveryDeclaration{ Deadline: dlIdx, Partition: uint64(partIdx), Sectors: recovered, }) + + totalRecoveries++ } - recoveries := params.Recoveries - if len(recoveries) == 0 { + if totalRecoveries == 0 { if faulty != 0 { log.Warnw("No recoveries to declare", "deadline", dlIdx, "faulty", faulty) } - return recoveries, nil, nil + return nil, nil, nil } - enc, aerr := actors.SerializeParams(params) - if aerr != nil { - return recoveries, nil, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr) + var msgs []*types.SignedMessage + for _, recovery := range batchedRecoveryDecls { + params := &miner.DeclareFaultsRecoveredParams{ + Recoveries: recovery, + } + + enc, aerr := actors.SerializeParams(params) + if aerr != nil { + return nil, nil, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr) + } + + msg := &types.Message{ + To: s.actor, + Method: builtin.MethodsMiner.DeclareFaultsRecovered, + Params: enc, + Value: types.NewInt(0), + } + spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)} + if err := s.prepareMessage(ctx, msg, spec); err != nil { + 
return nil, nil, err + } + + sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}) + if err != nil { + return nil, nil, xerrors.Errorf("pushing message to mpool: %w", err) + } + + log.Warnw("declare faults recovered Message CID", "cid", sm.Cid()) + msgs = append(msgs, sm) } - msg := &types.Message{ - To: s.actor, - Method: builtin.MethodsMiner.DeclareFaultsRecovered, - Params: enc, - Value: types.NewInt(0), - } - spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)} - if err := s.prepareMessage(ctx, msg, spec); err != nil { - return recoveries, nil, err + for _, msg := range msgs { + rec, err := s.api.StateWaitMsg(context.TODO(), msg.Cid(), build.MessageConfidence, api.LookbackNoLimit, true) + if err != nil { + return batchedRecoveryDecls, msgs, xerrors.Errorf("declare faults recovered wait error: %w", err) + } + + if rec.Receipt.ExitCode != 0 { + return batchedRecoveryDecls, msgs, xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode) + } } - sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}) - if err != nil { - return recoveries, sm, xerrors.Errorf("pushing message to mpool: %w", err) - } - - log.Warnw("declare faults recovered Message CID", "cid", sm.Cid()) - - rec, err := s.api.StateWaitMsg(context.TODO(), sm.Cid(), build.MessageConfidence, api.LookbackNoLimit, true) - if err != nil { - return recoveries, sm, xerrors.Errorf("declare faults recovered wait error: %w", err) - } - - if rec.Receipt.ExitCode != 0 { - return recoveries, sm, xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode) - } - - return recoveries, sm, nil + return batchedRecoveryDecls, msgs, nil } // declareFaults identifies the sectors on the specified proving deadline that @@ -464,9 +482,8 @@ func (s *WindowPoStScheduler) asyncFaultRecover(di dline.Info, ts *types.TipSet) } var ( - sigmsg *types.SignedMessage - recoveries []miner.RecoveryDeclaration - faults []miner.FaultDeclaration + sigmsgs []*types.SignedMessage + recoveries [][]miner.RecoveryDeclaration // optionalCid returns the CID of the message, or cid.Undef is the // message is nil. 
We don't need the argument (could capture the @@ -479,37 +496,28 @@ func (s *WindowPoStScheduler) asyncFaultRecover(di dline.Info, ts *types.TipSet) } ) - if recoveries, sigmsg, err = s.declareRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil { + if recoveries, sigmsgs, err = s.declareRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil { // TODO: This is potentially quite bad, but not even trying to post when this fails is objectively worse log.Errorf("checking sector recoveries: %v", err) } - s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} { - j := WdPoStRecoveriesProcessedEvt{ - evtCommon: s.getEvtCommon(err), - Declarations: recoveries, - MessageCID: optionalCid(sigmsg), + // should always be true, skip journaling if not for some reason + if len(recoveries) == len(sigmsgs) { + for i, recovery := range recoveries { + // clone for function literal + recovery := recovery + msgCID := optionalCid(sigmsgs[i]) + s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} { + j := WdPoStRecoveriesProcessedEvt{ + evtCommon: s.getEvtCommon(err), + Declarations: recovery, + MessageCID: msgCID, + } + j.Error = err + return j + }) } - j.Error = err - return j - }) - - if ts.Height() > build.UpgradeIgnitionHeight { - return // FORK: declaring faults after ignition upgrade makes no sense } - - if faults, sigmsg, err = s.declareFaults(context.TODO(), declDeadline, partitions, ts.Key()); err != nil { - // TODO: This is also potentially really bad, but we try to post anyways - log.Errorf("checking sector faults: %v", err) - } - - s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStFaults], func() interface{} { - return WdPoStFaultsProcessedEvt{ - evtCommon: s.getEvtCommon(err), - Declarations: faults, - MessageCID: optionalCid(sigmsg), - } - }) }() } @@ -779,9 +787,9 @@ func (s *WindowPoStScheduler) batchPartitions(partitions []api.Partition, nv net } // respect user config if set - if s.maxPartitionsPerMessage > 0 { - if partitionsPerMsg > s.maxPartitionsPerMessage { - partitionsPerMsg = s.maxPartitionsPerMessage + if s.maxPartitionsPerPostMessage > 0 { + if partitionsPerMsg > s.maxPartitionsPerPostMessage { + partitionsPerMsg = s.maxPartitionsPerPostMessage } } diff --git a/storage/wdpost_run_test.go b/storage/wdpost_run_test.go index dcb5e9525..51dc5782f 100644 --- a/storage/wdpost_run_test.go +++ b/storage/wdpost_run_test.go @@ -275,7 +275,7 @@ func TestWDPostDoPost(t *testing.T) { } } -// TestWDPostDoPost verifies that doPost will send the correct number of window +// TestWDPostDoPostPartLimitConfig verifies that doPost will send the correct number of window // PoST messages for a given number of partitions based on user config func TestWDPostDoPostPartLimitConfig(t *testing.T) { //stm: @CHAIN_SYNCER_LOAD_GENESIS_001, @CHAIN_SYNCER_FETCH_TIPSET_001, @@ -338,9 +338,8 @@ func TestWDPostDoPostPartLimitConfig(t *testing.T) { proofType: proofType, actor: postAct, journal: journal.NilJournal(), - addrSel: &ctladdr.AddressSelector{}, - maxPartitionsPerMessage: userPartLimit, + maxPartitionsPerPostMessage: userPartLimit, } di := &dline.Info{ @@ -374,6 +373,98 @@ func TestWDPostDoPostPartLimitConfig(t *testing.T) { } } +// TestWDPostDeclareRecoveriesPartLimitConfig verifies that declareRecoveries will send the correct number of +// DeclareFaultsRecovered messages for a given number of partitions based on user config +func TestWDPostDeclareRecoveriesPartLimitConfig(t *testing.T) { + //stm: @CHAIN_SYNCER_LOAD_GENESIS_001, 
@CHAIN_SYNCER_FETCH_TIPSET_001, + //stm: @CHAIN_SYNCER_START_001, @CHAIN_SYNCER_SYNC_001, @BLOCKCHAIN_BEACON_VALIDATE_BLOCK_VALUES_01 + //stm: @CHAIN_SYNCER_COLLECT_CHAIN_001, @CHAIN_SYNCER_COLLECT_HEADERS_001, @CHAIN_SYNCER_VALIDATE_TIPSET_001 + //stm: @CHAIN_SYNCER_NEW_PEER_HEAD_001, @CHAIN_SYNCER_VALIDATE_MESSAGE_META_001, @CHAIN_SYNCER_STOP_001 + ctx := context.Background() + + proofType := abi.RegisteredPoStProof_StackedDrgWindow2KiBV1 + postAct := tutils.NewIDAddr(t, 100) + + mockStgMinerAPI := newMockStorageMinerAPI() + + // Get the number of sectors allowed in a partition for this proof type + sectorsPerPartition, err := builtin.PoStProofWindowPoStPartitionSectors(proofType) + require.NoError(t, err) + + // Let's have 11/20 partitions with faulty sectors, and a config of 3 partitions per message + userPartLimit := 3 + partitionCount := 20 + faultyPartitionCount := 11 + + var partitions []api.Partition + for p := 0; p < partitionCount; p++ { + sectors := bitfield.New() + for s := uint64(0); s < sectorsPerPartition; s++ { + sectors.Set(s) + } + + partition := api.Partition{ + AllSectors: sectors, + FaultySectors: bitfield.New(), + RecoveringSectors: bitfield.New(), + LiveSectors: sectors, + ActiveSectors: sectors, + } + + if p < faultyPartitionCount { + partition.FaultySectors = sectors + } + + partitions = append(partitions, partition) + } + + mockStgMinerAPI.setPartitions(partitions) + + // Run declareRecoverios + scheduler := &WindowPoStScheduler{ + api: mockStgMinerAPI, + prover: &mockProver{}, + verifier: &mockVerif{}, + faultTracker: &mockFaultTracker{}, + proofType: proofType, + actor: postAct, + journal: journal.NilJournal(), + + maxPartitionsPerRecoveryMessage: userPartLimit, + } + + di := uint64(0) + ts := mockTipSet(t) + + expectedMsgCount := faultyPartitionCount/userPartLimit + 1 + lastMsgParts := faultyPartitionCount % userPartLimit + + go func() { + batchedRecoveries, msgs, err := scheduler.declareRecoveries(ctx, di, partitions, ts.Key()) + require.NoError(t, err, "failed to declare recoveries") + require.Equal(t, len(batchedRecoveries), len(msgs)) + require.Equal(t, expectedMsgCount, len(msgs)) + }() + + // Read the window PoST messages + for i := 0; i < expectedMsgCount; i++ { + msg := <-mockStgMinerAPI.pushedMessages + require.Equal(t, builtin.MethodsMiner.DeclareFaultsRecovered, msg.Method) + var params minertypes.DeclareFaultsRecoveredParams + err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)) + require.NoError(t, err) + + if i == expectedMsgCount-1 { + // In the last message we only included a 21 partitions + require.Len(t, params.Recoveries, lastMsgParts) + } else { + // All previous messages should include the full number of partitions + require.Len(t, params.Recoveries, userPartLimit) + } + + } +} + func mockTipSet(t *testing.T) *types.TipSet { minerAct := tutils.NewActorAddr(t, "miner") c, err := cid.Decode("QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH") diff --git a/storage/wdpost_sched.go b/storage/wdpost_sched.go index 897705646..c9a791b4f 100644 --- a/storage/wdpost_sched.go +++ b/storage/wdpost_sched.go @@ -30,16 +30,17 @@ import ( // WindowPoStScheduler watches the chain though the changeHandler, which in turn // turn calls the scheduler when the time arrives to do work. 
type WindowPoStScheduler struct { - api fullNodeFilteredAPI - feeCfg config.MinerFeeConfig - addrSel *AddressSelector - prover storage.Prover - verifier ffiwrapper.Verifier - faultTracker sectorstorage.FaultTracker - proofType abi.RegisteredPoStProof - partitionSectors uint64 - maxPartitionsPerMessage int - ch *changeHandler + api fullNodeFilteredAPI + feeCfg config.MinerFeeConfig + addrSel *AddressSelector + prover storage.Prover + verifier ffiwrapper.Verifier + faultTracker sectorstorage.FaultTracker + proofType abi.RegisteredPoStProof + partitionSectors uint64 + maxPartitionsPerPostMessage int + maxPartitionsPerRecoveryMessage int + ch *changeHandler actor address.Address @@ -66,17 +67,18 @@ func NewWindowedPoStScheduler(api fullNodeFilteredAPI, } return &WindowPoStScheduler{ - api: api, - feeCfg: cfg, - addrSel: as, - prover: sp, - verifier: verif, - faultTracker: ft, - proofType: mi.WindowPoStProofType, - partitionSectors: mi.WindowPoStPartitionSectors, - maxPartitionsPerMessage: pcfg.MaxPartitionsPerMessage, + api: api, + feeCfg: cfg, + addrSel: as, + prover: sp, + verifier: verif, + faultTracker: ft, + proofType: mi.WindowPoStProofType, + partitionSectors: mi.WindowPoStPartitionSectors, - actor: actor, + maxPartitionsPerPostMessage: pcfg.MaxPartitionsPerPoStMessage, + maxPartitionsPerRecoveryMessage: pcfg.MaxPartitionsPerRecoveryMessage, + actor: actor, evtTypes: [...]journal.EventType{ evtTypeWdPoStScheduler: j.RegisterEventType("wdpost", "scheduler"), evtTypeWdPoStProofs: j.RegisterEventType("wdpost", "proofs_processed"),
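
The patch surfaces the two limits through ProvingConfig in node/config/types.go. As a minimal sketch (not part of the patch), assuming lotus is available as a module dependency and using purely illustrative values, the new fields sit alongside the existing ones like this:

    package main

    import (
    	"fmt"

    	"github.com/filecoin-project/lotus/node/config"
    )

    func main() {
    	cfg := config.ProvingConfig{
    		// Illustrative values only; 0 keeps the defaults
    		// (network limit for PoSt messages, unbatched recoveries).
    		ParallelCheckLimit:              128, // existing field, unrelated to batching
    		MaxPartitionsPerPoStMessage:     3,   // cap each SubmitWindowPoSt at 3 partitions
    		MaxPartitionsPerRecoveryMessage: 1,   // one partition per DeclareFaultsRecovered
    	}
    	fmt.Printf("%+v\n", cfg)
    }

Operators would normally set the same values in the [Proving] section of the miner config, or via the LOTUS_PROVING_MAXPARTITIONSPERPOSTMESSAGE and LOTUS_PROVING_MAXPARTITIONSPERRECOVERYMESSAGE environment variables documented above.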
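
The heart of the declareRecoveries change is the batching loop: declarations are appended to the last batch until it holds maxPartitionsPerRecoveryMessage entries, then a fresh batch is started, and one DeclareFaultsRecovered message is sent per batch. A self-contained sketch of that pattern, with plain ints standing in for miner.RecoveryDeclaration values:

    package main

    import "fmt"

    // batchDecls mirrors the batching loop in declareRecoveries: items are appended
    // to the last batch until it holds max entries, then a new batch is started.
    // max <= 0 means "no limit", so everything ends up in a single batch.
    func batchDecls(total, max int) [][]int {
    	batches := [][]int{{}}
    	for i := 0; i < total; i++ {
    		if max > 0 && len(batches[len(batches)-1]) >= max {
    			batches = append(batches, []int{})
    		}
    		batches[len(batches)-1] = append(batches[len(batches)-1], i)
    	}
    	return batches
    }

    func main() {
    	// 11 recovering partitions with MaxPartitionsPerRecoveryMessage = 3
    	// produce 4 messages carrying 3, 3, 3 and 2 partitions respectively.
    	for i, b := range batchDecls(11, 3) {
    		fmt.Printf("message %d: %d partition(s)\n", i, len(b))
    	}
    }

A new batch is only opened when another declaration actually needs a slot, so an exact multiple of the limit never yields a trailing empty message.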
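
TestWDPostDeclareRecoveriesPartLimitConfig pins the arithmetic down: 11 faulty partitions with a limit of 3 should produce 4 messages, the last carrying the 2 leftover partitions. Its expectedMsgCount = faultyPartitionCount/userPartLimit + 1 only holds because 11 is not an exact multiple of 3; a small sketch of the general form, using a helper name that is not part of the patch:

    package main

    import "fmt"

    // expectedMessages reproduces the arithmetic checked by
    // TestWDPostDeclareRecoveriesPartLimitConfig, generalised to exact multiples.
    func expectedMessages(faultyPartitions, limit int) (msgs, lastMsgParts int) {
    	msgs = (faultyPartitions + limit - 1) / limit // ceiling division
    	lastMsgParts = faultyPartitions % limit
    	if lastMsgParts == 0 {
    		lastMsgParts = limit
    	}
    	return msgs, lastMsgParts
    }

    func main() {
    	msgs, last := expectedMessages(11, 3)
    	fmt.Println(msgs, last) // 4 2
    }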
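
In the journaling loop, the recovery := recovery line clones the range variable before it is captured by the function literal handed to RecordEvent. This appears to guard against the classic pre-Go-1.22 loop-variable capture pitfall; a standalone illustration, with names invented for the example:

    package main

    import "fmt"

    func main() {
    	batches := [][]string{{"p0"}, {"p1", "p2"}, {"p3", "p4"}}

    	var record []func()
    	for i, batch := range batches {
    		// Without these per-iteration copies, closures built before Go 1.22
    		// would all observe the final values of i and batch when they run.
    		i, batch := i, batch
    		record = append(record, func() {
    			fmt.Printf("journal entry %d: %d declaration(s)\n", i, len(batch))
    		})
    	}
    	for _, f := range record {
    		f()
    	}
    }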