From 770c32c3cbd8ab1abed59530c3f4e508591255cb Mon Sep 17 00:00:00 2001 From: Aayush Date: Thu, 7 Jul 2022 10:52:22 -0400 Subject: [PATCH] feat: recovery: Config for maximum partition count per message --- .../en/default-lotus-miner-config.toml | 14 +- node/config/doc_gen.go | 12 +- node/config/types.go | 11 +- .../paths/gomock_reflect_3544667724/prog.go | 64 ++++++++ storage/wdpost/wdpost_run.go | 6 +- storage/wdpost/wdpost_run_faults.go | 138 +++++++++--------- storage/wdpost/wdpost_run_test.go | 97 +++++++++++- storage/wdpost/wdpost_sched.go | 44 +++--- 8 files changed, 291 insertions(+), 95 deletions(-) create mode 100644 storage/paths/gomock_reflect_3544667724/prog.go diff --git a/documentation/en/default-lotus-miner-config.toml b/documentation/en/default-lotus-miner-config.toml index 5bee7b503..f44e71ad8 100644 --- a/documentation/en/default-lotus-miner-config.toml +++ b/documentation/en/default-lotus-miner-config.toml @@ -390,8 +390,18 @@ # Setting this value above the network limit has no effect # # type: int - # env var: LOTUS_PROVING_MAXPARTITIONSPERMESSAGE - #MaxPartitionsPerMessage = 0 + # env var: LOTUS_PROVING_MAXPARTITIONSPERPOSTMESSAGE + #MaxPartitionsPerPoStMessage = 0 + + # In some cases when submitting DeclareFaultsRecovered messages, + # there may be too many recoveries to fit in a BlockGasLimit. + # In those cases it may be necessary to set this value to something low (eg 1); + # Note that setting this value lower may result in less efficient gas use - more messages will be sent than needed, + # resulting in more total gas use (but each message will have lower gas limit) + # + # type: int + # env var: LOTUS_PROVING_MAXPARTITIONSPERRECOVERYMESSAGE + #MaxPartitionsPerRecoveryMessage = 0 [Sealing] diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 6422090b8..41aca47b9 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -690,7 +690,7 @@ After changing this option, confirm that the new value works in your setup by in 'lotus-miner proving compute window-post 0'`, }, { - Name: "MaxPartitionsPerMessage", + Name: "MaxPartitionsPerPoStMessage", Type: "int", Comment: `Maximum number of partitions to prove in a single SubmitWindowPoSt messace. 0 = network limit (10 in nv16) @@ -707,6 +707,16 @@ to prove each deadline, resulting in more total gas use (but each message will h Setting this value above the network limit has no effect`, }, + { + Name: "MaxPartitionsPerRecoveryMessage", + Type: "int", + + Comment: `In some cases when submitting DeclareFaultsRecovered messages, +there may be too many recoveries to fit in a BlockGasLimit. +In those cases it may be necessary to set this value to something low (eg 1); +Note that setting this value lower may result in less efficient gas use - more messages will be sent than needed, +resulting in more total gas use (but each message will have lower gas limit)`, + }, }, "Pubsub": []DocField{ { diff --git a/node/config/types.go b/node/config/types.go index 1f8b60e84..320026063 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -284,7 +284,16 @@ type ProvingConfig struct { // to prove each deadline, resulting in more total gas use (but each message will have lower gas limit) // // Setting this value above the network limit has no effect - MaxPartitionsPerMessage int + MaxPartitionsPerPoStMessage int + + // Maximum number of partitions to declare in a single DeclareFaultsRecovered message. 0 = no limit. 
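+	// Batches fill in declaration order, so within a deadline only the final
+	// DeclareFaultsRecovered message can carry fewer partitions than this limit.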
+ + // In some cases when submitting DeclareFaultsRecovered messages, + // there may be too many recoveries to fit in a BlockGasLimit. + // In those cases it may be necessary to set this value to something low (eg 1); + // Note that setting this value lower may result in less efficient gas use - more messages will be sent than needed, + // resulting in more total gas use (but each message will have lower gas limit) + MaxPartitionsPerRecoveryMessage int } type SealingConfig struct { diff --git a/storage/paths/gomock_reflect_3544667724/prog.go b/storage/paths/gomock_reflect_3544667724/prog.go new file mode 100644 index 000000000..4eb9389a5 --- /dev/null +++ b/storage/paths/gomock_reflect_3544667724/prog.go @@ -0,0 +1,64 @@ +package main + +import ( + "encoding/gob" + "flag" + "fmt" + "os" + "path" + "reflect" + + "github.com/golang/mock/mockgen/model" + + pkg_ "github.com/filecoin-project/lotus/storage/paths" +) + +var output = flag.String("output", "", "The output file name, or empty to use stdout.") + +func main() { + flag.Parse() + + its := []struct { + sym string + typ reflect.Type + }{ + + {"Store", reflect.TypeOf((*pkg_.Store)(nil)).Elem()}, + } + pkg := &model.Package{ + // NOTE: This behaves contrary to documented behaviour if the + // package name is not the final component of the import path. + // The reflect package doesn't expose the package name, though. + Name: path.Base("github.com/filecoin-project/lotus/storage/paths"), + } + + for _, it := range its { + intf, err := model.InterfaceFromInterfaceType(it.typ) + if err != nil { + fmt.Fprintf(os.Stderr, "Reflection: %v\n", err) + os.Exit(1) + } + intf.Name = it.sym + pkg.Interfaces = append(pkg.Interfaces, intf) + } + + outfile := os.Stdout + if len(*output) != 0 { + var err error + outfile, err = os.Create(*output) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to open output file %q", *output) + } + defer func() { + if err := outfile.Close(); err != nil { + fmt.Fprintf(os.Stderr, "failed to close output file %q", *output) + os.Exit(1) + } + }() + } + + if err := gob.NewEncoder(outfile).Encode(pkg); err != nil { + fmt.Fprintf(os.Stderr, "gob encode: %v\n", err) + os.Exit(1) + } +} diff --git a/storage/wdpost/wdpost_run.go b/storage/wdpost/wdpost_run.go index 1d1cff9fd..0e715f101 100644 --- a/storage/wdpost/wdpost_run.go +++ b/storage/wdpost/wdpost_run.go @@ -518,9 +518,9 @@ func (s *WindowPoStScheduler) batchPartitions(partitions []api.Partition, nv net } // respect user config if set - if s.maxPartitionsPerMessage > 0 { - if partitionsPerMsg > s.maxPartitionsPerMessage { - partitionsPerMsg = s.maxPartitionsPerMessage + if s.maxPartitionsPerPostMessage > 0 { + if partitionsPerMsg > s.maxPartitionsPerPostMessage { + partitionsPerMsg = s.maxPartitionsPerPostMessage } } diff --git a/storage/wdpost/wdpost_run_faults.go b/storage/wdpost/wdpost_run_faults.go index 0eda986fb..b63b17491 100644 --- a/storage/wdpost/wdpost_run_faults.go +++ b/storage/wdpost/wdpost_run_faults.go @@ -36,14 +36,15 @@ import ( // TODO: the waiting should happen in the background. Right now this // is blocking/delaying the actual generation and submission of WindowPoSts in // this deadline! 
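+//
+// When MaxPartitionsPerRecoveryMessage is set, recovery declarations are split into
+// batches of at most that many partitions, and one DeclareFaultsRecovered message is
+// pushed per batch; the returned declaration batches and signed messages correspond
+// index-for-index.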
-func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([]miner.RecoveryDeclaration, *types.SignedMessage, error) { +func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([][]miner.RecoveryDeclaration, []*types.SignedMessage, error) { ctx, span := trace.StartSpan(ctx, "storage.declareRecoveries") defer span.End() faulty := uint64(0) - params := &miner.DeclareFaultsRecoveredParams{ - Recoveries: []miner.RecoveryDeclaration{}, - } + + var batchedRecoveryDecls [][]miner.RecoveryDeclaration + batchedRecoveryDecls = append(batchedRecoveryDecls, []miner.RecoveryDeclaration{}) + totalRecoveries := 0 for partIdx, partition := range partitions { unrecovered, err := bitfield.SubtractBitField(partition.FaultySectors, partition.RecoveringSectors) @@ -77,55 +78,72 @@ func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint6 continue } - params.Recoveries = append(params.Recoveries, miner.RecoveryDeclaration{ + // respect user config if set + if s.maxPartitionsPerRecoveryMessage > 0 && + len(batchedRecoveryDecls[len(batchedRecoveryDecls)-1]) >= s.maxPartitionsPerRecoveryMessage { + batchedRecoveryDecls = append(batchedRecoveryDecls, []miner.RecoveryDeclaration{}) + } + + batchedRecoveryDecls[len(batchedRecoveryDecls)-1] = append(batchedRecoveryDecls[len(batchedRecoveryDecls)-1], miner.RecoveryDeclaration{ Deadline: dlIdx, Partition: uint64(partIdx), Sectors: recovered, }) + + totalRecoveries++ } - recoveries := params.Recoveries - if len(recoveries) == 0 { + if totalRecoveries == 0 { if faulty != 0 { log.Warnw("No recoveries to declare", "deadline", dlIdx, "faulty", faulty) } - return recoveries, nil, nil + return nil, nil, nil } - enc, aerr := actors.SerializeParams(params) - if aerr != nil { - return recoveries, nil, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr) + var msgs []*types.SignedMessage + for _, recovery := range batchedRecoveryDecls { + params := &miner.DeclareFaultsRecoveredParams{ + Recoveries: recovery, + } + + enc, aerr := actors.SerializeParams(params) + if aerr != nil { + return nil, nil, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr) + } + + msg := &types.Message{ + To: s.actor, + Method: builtin.MethodsMiner.DeclareFaultsRecovered, + Params: enc, + Value: types.NewInt(0), + } + spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)} + if err := s.prepareMessage(ctx, msg, spec); err != nil { + return nil, nil, err + } + + sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}) + if err != nil { + return nil, nil, xerrors.Errorf("pushing message to mpool: %w", err) + } + + log.Warnw("declare faults recovered Message CID", "cid", sm.Cid()) + msgs = append(msgs, sm) } - msg := &types.Message{ - To: s.actor, - Method: builtin.MethodsMiner.DeclareFaultsRecovered, - Params: enc, - Value: types.NewInt(0), - } - spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)} - if err := s.prepareMessage(ctx, msg, spec); err != nil { - return recoveries, nil, err + for _, msg := range msgs { + rec, err := s.api.StateWaitMsg(context.TODO(), msg.Cid(), build.MessageConfidence, api.LookbackNoLimit, true) + if err != nil { + return batchedRecoveryDecls, msgs, xerrors.Errorf("declare faults recovered wait error: %w", err) + } + + if 
rec.Receipt.ExitCode != 0 { + return batchedRecoveryDecls, msgs, xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode) + } } - sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}) - if err != nil { - return recoveries, sm, xerrors.Errorf("pushing message to mpool: %w", err) - } - - log.Warnw("declare faults recovered Message CID", "cid", sm.Cid()) - - rec, err := s.api.StateWaitMsg(context.TODO(), sm.Cid(), build.MessageConfidence, api.LookbackNoLimit, true) - if err != nil { - return recoveries, sm, xerrors.Errorf("declare faults recovered wait error: %w", err) - } - - if rec.Receipt.ExitCode != 0 { - return recoveries, sm, xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode) - } - - return recoveries, sm, nil + return batchedRecoveryDecls, msgs, nil } // declareFaults identifies the sectors on the specified proving deadline that @@ -241,9 +259,8 @@ func (s *WindowPoStScheduler) asyncFaultRecover(di dline.Info, ts *types.TipSet) } var ( - sigmsg *types.SignedMessage - recoveries []miner.RecoveryDeclaration - faults []miner.FaultDeclaration + sigmsgs []*types.SignedMessage + recoveries [][]miner.RecoveryDeclaration // optionalCid returns the CID of the message, or cid.Undef is the // message is nil. We don't need the argument (could capture the @@ -256,36 +273,27 @@ func (s *WindowPoStScheduler) asyncFaultRecover(di dline.Info, ts *types.TipSet) } ) - if recoveries, sigmsg, err = s.declareRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil { + if recoveries, sigmsgs, err = s.declareRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil { // TODO: This is potentially quite bad, but not even trying to post when this fails is objectively worse log.Errorf("checking sector recoveries: %v", err) } - s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} { - j := WdPoStRecoveriesProcessedEvt{ - evtCommon: s.getEvtCommon(err), - Declarations: recoveries, - MessageCID: optionalCid(sigmsg), + // should always be true, skip journaling if not for some reason + if len(recoveries) == len(sigmsgs) { + for i, recovery := range recoveries { + // clone for function literal + recovery := recovery + msgCID := optionalCid(sigmsgs[i]) + s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} { + j := WdPoStRecoveriesProcessedEvt{ + evtCommon: s.getEvtCommon(err), + Declarations: recovery, + MessageCID: msgCID, + } + j.Error = err + return j + }) } - j.Error = err - return j - }) - - if ts.Height() > build.UpgradeIgnitionHeight { - return // FORK: declaring faults after ignition upgrade makes no sense } - - if faults, sigmsg, err = s.declareFaults(context.TODO(), declDeadline, partitions, ts.Key()); err != nil { - // TODO: This is also potentially really bad, but we try to post anyways - log.Errorf("checking sector faults: %v", err) - } - - s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStFaults], func() interface{} { - return WdPoStFaultsProcessedEvt{ - evtCommon: s.getEvtCommon(err), - Declarations: faults, - MessageCID: optionalCid(sigmsg), - } - }) }() } diff --git a/storage/wdpost/wdpost_run_test.go b/storage/wdpost/wdpost_run_test.go index 4c108b812..233e40472 100644 --- a/storage/wdpost/wdpost_run_test.go +++ b/storage/wdpost/wdpost_run_test.go @@ -268,7 +268,7 @@ func TestWDPostDoPost(t *testing.T) { } } -// TestWDPostDoPost verifies that doPost will send the correct number of 
window +// TestWDPostDoPostPartLimitConfig verifies that doPost will send the correct number of window // PoST messages for a given number of partitions based on user config func TestWDPostDoPostPartLimitConfig(t *testing.T) { //stm: @CHAIN_SYNCER_LOAD_GENESIS_001, @CHAIN_SYNCER_FETCH_TIPSET_001, @@ -333,7 +333,7 @@ func TestWDPostDoPostPartLimitConfig(t *testing.T) { journal: journal.NilJournal(), addrSel: &ctladdr.AddressSelector{}, - maxPartitionsPerMessage: userPartLimit, + maxPartitionsPerPostMessage: userPartLimit, } di := &dline.Info{ @@ -367,6 +367,99 @@ func TestWDPostDoPostPartLimitConfig(t *testing.T) { } } +// TestWDPostDeclareRecoveriesPartLimitConfig verifies that declareRecoveries will send the correct number of +// DeclareFaultsRecovered messages for a given number of partitions based on user config +func TestWDPostDeclareRecoveriesPartLimitConfig(t *testing.T) { + //stm: @CHAIN_SYNCER_LOAD_GENESIS_001, @CHAIN_SYNCER_FETCH_TIPSET_001, + //stm: @CHAIN_SYNCER_START_001, @CHAIN_SYNCER_SYNC_001, @BLOCKCHAIN_BEACON_VALIDATE_BLOCK_VALUES_01 + //stm: @CHAIN_SYNCER_COLLECT_CHAIN_001, @CHAIN_SYNCER_COLLECT_HEADERS_001, @CHAIN_SYNCER_VALIDATE_TIPSET_001 + //stm: @CHAIN_SYNCER_NEW_PEER_HEAD_001, @CHAIN_SYNCER_VALIDATE_MESSAGE_META_001, @CHAIN_SYNCER_STOP_001 + ctx := context.Background() + + proofType := abi.RegisteredPoStProof_StackedDrgWindow2KiBV1 + postAct := tutils.NewIDAddr(t, 100) + + mockStgMinerAPI := newMockStorageMinerAPI() + + // Get the number of sectors allowed in a partition for this proof type + sectorsPerPartition, err := builtin.PoStProofWindowPoStPartitionSectors(proofType) + require.NoError(t, err) + + // Let's have 11/20 partitions with faulty sectors, and a config of 3 partitions per message + userPartLimit := 3 + partitionCount := 20 + faultyPartitionCount := 11 + + var partitions []api.Partition + for p := 0; p < partitionCount; p++ { + sectors := bitfield.New() + for s := uint64(0); s < sectorsPerPartition; s++ { + sectors.Set(s) + } + + partition := api.Partition{ + AllSectors: sectors, + FaultySectors: bitfield.New(), + RecoveringSectors: bitfield.New(), + LiveSectors: sectors, + ActiveSectors: sectors, + } + + if p < faultyPartitionCount { + partition.FaultySectors = sectors + } + + partitions = append(partitions, partition) + } + + mockStgMinerAPI.setPartitions(partitions) + + // Run declareRecoverios + scheduler := &WindowPoStScheduler{ + api: mockStgMinerAPI, + prover: &mockProver{}, + verifier: &mockVerif{}, + faultTracker: &mockFaultTracker{}, + proofType: proofType, + actor: postAct, + journal: journal.NilJournal(), + addrSel: &ctladdr.AddressSelector{}, + + maxPartitionsPerRecoveryMessage: userPartLimit, + } + + di := uint64(0) + ts := mockTipSet(t) + + expectedMsgCount := faultyPartitionCount/userPartLimit + 1 + lastMsgParts := faultyPartitionCount % userPartLimit + + go func() { + batchedRecoveries, msgs, err := scheduler.declareRecoveries(ctx, di, partitions, ts.Key()) + require.NoError(t, err, "failed to declare recoveries") + require.Equal(t, len(batchedRecoveries), len(msgs)) + require.Equal(t, expectedMsgCount, len(msgs)) + }() + + // Read the window PoST messages + for i := 0; i < expectedMsgCount; i++ { + msg := <-mockStgMinerAPI.pushedMessages + require.Equal(t, builtin.MethodsMiner.DeclareFaultsRecovered, msg.Method) + var params minertypes.DeclareFaultsRecoveredParams + err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)) + require.NoError(t, err) + + if i == expectedMsgCount-1 { + // In the last message we only included a 21 
partitions + require.Len(t, params.Recoveries, lastMsgParts) + } else { + // All previous messages should include the full number of partitions + require.Len(t, params.Recoveries, userPartLimit) + } + + } +} + func mockTipSet(t *testing.T) *types.TipSet { minerAct := tutils.NewActorAddr(t, "miner") c, err := cid.Decode("QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH") diff --git a/storage/wdpost/wdpost_sched.go b/storage/wdpost/wdpost_sched.go index 1687a59d8..0473273d9 100644 --- a/storage/wdpost/wdpost_sched.go +++ b/storage/wdpost/wdpost_sched.go @@ -62,17 +62,18 @@ type NodeAPI interface { // WindowPoStScheduler watches the chain though the changeHandler, which in turn // turn calls the scheduler when the time arrives to do work. type WindowPoStScheduler struct { - api NodeAPI - feeCfg config.MinerFeeConfig - addrSel *ctladdr.AddressSelector - prover storiface.ProverPoSt - verifier storiface.Verifier - faultTracker sealer.FaultTracker - proofType abi.RegisteredPoStProof - partitionSectors uint64 - disablePreChecks bool - maxPartitionsPerMessage int - ch *changeHandler + api NodeAPI + feeCfg config.MinerFeeConfig + addrSel *ctladdr.AddressSelector + prover storiface.ProverPoSt + verifier storiface.Verifier + faultTracker sealer.FaultTracker + proofType abi.RegisteredPoStProof + partitionSectors uint64 + disablePreChecks bool + maxPartitionsPerPostMessage int + maxPartitionsPerRecoveryMessage int + ch *changeHandler actor address.Address @@ -99,16 +100,17 @@ func NewWindowedPoStScheduler(api NodeAPI, } return &WindowPoStScheduler{ - api: api, - feeCfg: cfg, - addrSel: as, - prover: sp, - verifier: verif, - faultTracker: ft, - proofType: mi.WindowPoStProofType, - partitionSectors: mi.WindowPoStPartitionSectors, - disablePreChecks: pcfg.DisableWDPoStPreChecks, - maxPartitionsPerMessage: pcfg.MaxPartitionsPerMessage, + api: api, + feeCfg: cfg, + addrSel: as, + prover: sp, + verifier: verif, + faultTracker: ft, + proofType: mi.WindowPoStProofType, + partitionSectors: mi.WindowPoStPartitionSectors, + disablePreChecks: pcfg.DisableWDPoStPreChecks, + maxPartitionsPerPostMessage: pcfg.MaxPartitionsPerPoStMessage, + maxPartitionsPerRecoveryMessage: pcfg.MaxPartitionsPerRecoveryMessage, actor: actor, evtTypes: [...]journal.EventType{