feat: recovery: Config for maximum partition count per message

This commit is contained in:
Aayush 2022-07-07 10:52:22 -04:00
parent b499ef0c3a
commit 770c32c3cb
8 changed files with 291 additions and 95 deletions

View File

@ -390,8 +390,18 @@
# Setting this value above the network limit has no effect
#
# type: int
# env var: LOTUS_PROVING_MAXPARTITIONSPERMESSAGE
#MaxPartitionsPerMessage = 0
# env var: LOTUS_PROVING_MAXPARTITIONSPERPOSTMESSAGE
#MaxPartitionsPerPoStMessage = 0
# In some cases when submitting DeclareFaultsRecovered messages,
# there may be too many recoveries to fit in a BlockGasLimit.
# In those cases it may be necessary to set this value to something low (eg 1);
# Note that setting this value lower may result in less efficient gas use - more messages will be sent than needed,
# resulting in more total gas use (but each message will have lower gas limit)
#
# type: int
# env var: LOTUS_PROVING_MAXPARTITIONSPERRECOVERYMESSAGE
#MaxPartitionsPerRecoveryMessage = 0
[Sealing]

View File

@ -690,7 +690,7 @@ After changing this option, confirm that the new value works in your setup by in
'lotus-miner proving compute window-post 0'`,
},
{
Name: "MaxPartitionsPerMessage",
Name: "MaxPartitionsPerPoStMessage",
Type: "int",
Comment: `Maximum number of partitions to prove in a single SubmitWindowPoSt messace. 0 = network limit (10 in nv16)
@ -707,6 +707,16 @@ to prove each deadline, resulting in more total gas use (but each message will h
Setting this value above the network limit has no effect`,
},
{
Name: "MaxPartitionsPerRecoveryMessage",
Type: "int",
Comment: `In some cases when submitting DeclareFaultsRecovered messages,
there may be too many recoveries to fit in a BlockGasLimit.
In those cases it may be necessary to set this value to something low (eg 1);
Note that setting this value lower may result in less efficient gas use - more messages will be sent than needed,
resulting in more total gas use (but each message will have lower gas limit)`,
},
},
"Pubsub": []DocField{
{

View File

@ -284,7 +284,16 @@ type ProvingConfig struct {
// to prove each deadline, resulting in more total gas use (but each message will have lower gas limit)
//
// Setting this value above the network limit has no effect
MaxPartitionsPerMessage int
MaxPartitionsPerPoStMessage int
// Maximum number of partitions to declare in a single DeclareFaultsRecovered message. 0 = no limit.
// In some cases when submitting DeclareFaultsRecovered messages,
// there may be too many recoveries to fit in a BlockGasLimit.
// In those cases it may be necessary to set this value to something low (eg 1);
// Note that setting this value lower may result in less efficient gas use - more messages will be sent than needed,
// resulting in more total gas use (but each message will have lower gas limit)
MaxPartitionsPerRecoveryMessage int
}
type SealingConfig struct {

View File

@ -0,0 +1,64 @@
package main
import (
"encoding/gob"
"flag"
"fmt"
"os"
"path"
"reflect"
"github.com/golang/mock/mockgen/model"
pkg_ "github.com/filecoin-project/lotus/storage/paths"
)
var output = flag.String("output", "", "The output file name, or empty to use stdout.")
func main() {
flag.Parse()
its := []struct {
sym string
typ reflect.Type
}{
{"Store", reflect.TypeOf((*pkg_.Store)(nil)).Elem()},
}
pkg := &model.Package{
// NOTE: This behaves contrary to documented behaviour if the
// package name is not the final component of the import path.
// The reflect package doesn't expose the package name, though.
Name: path.Base("github.com/filecoin-project/lotus/storage/paths"),
}
for _, it := range its {
intf, err := model.InterfaceFromInterfaceType(it.typ)
if err != nil {
fmt.Fprintf(os.Stderr, "Reflection: %v\n", err)
os.Exit(1)
}
intf.Name = it.sym
pkg.Interfaces = append(pkg.Interfaces, intf)
}
outfile := os.Stdout
if len(*output) != 0 {
var err error
outfile, err = os.Create(*output)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to open output file %q", *output)
}
defer func() {
if err := outfile.Close(); err != nil {
fmt.Fprintf(os.Stderr, "failed to close output file %q", *output)
os.Exit(1)
}
}()
}
if err := gob.NewEncoder(outfile).Encode(pkg); err != nil {
fmt.Fprintf(os.Stderr, "gob encode: %v\n", err)
os.Exit(1)
}
}

View File

@ -518,9 +518,9 @@ func (s *WindowPoStScheduler) batchPartitions(partitions []api.Partition, nv net
}
// respect user config if set
if s.maxPartitionsPerMessage > 0 {
if partitionsPerMsg > s.maxPartitionsPerMessage {
partitionsPerMsg = s.maxPartitionsPerMessage
if s.maxPartitionsPerPostMessage > 0 {
if partitionsPerMsg > s.maxPartitionsPerPostMessage {
partitionsPerMsg = s.maxPartitionsPerPostMessage
}
}

View File

@ -36,14 +36,15 @@ import (
// TODO: the waiting should happen in the background. Right now this
// is blocking/delaying the actual generation and submission of WindowPoSts in
// this deadline!
func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([]miner.RecoveryDeclaration, *types.SignedMessage, error) {
func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([][]miner.RecoveryDeclaration, []*types.SignedMessage, error) {
ctx, span := trace.StartSpan(ctx, "storage.declareRecoveries")
defer span.End()
faulty := uint64(0)
params := &miner.DeclareFaultsRecoveredParams{
Recoveries: []miner.RecoveryDeclaration{},
}
var batchedRecoveryDecls [][]miner.RecoveryDeclaration
batchedRecoveryDecls = append(batchedRecoveryDecls, []miner.RecoveryDeclaration{})
totalRecoveries := 0
for partIdx, partition := range partitions {
unrecovered, err := bitfield.SubtractBitField(partition.FaultySectors, partition.RecoveringSectors)
@ -77,55 +78,72 @@ func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint6
continue
}
params.Recoveries = append(params.Recoveries, miner.RecoveryDeclaration{
// respect user config if set
if s.maxPartitionsPerRecoveryMessage > 0 &&
len(batchedRecoveryDecls[len(batchedRecoveryDecls)-1]) >= s.maxPartitionsPerRecoveryMessage {
batchedRecoveryDecls = append(batchedRecoveryDecls, []miner.RecoveryDeclaration{})
}
batchedRecoveryDecls[len(batchedRecoveryDecls)-1] = append(batchedRecoveryDecls[len(batchedRecoveryDecls)-1], miner.RecoveryDeclaration{
Deadline: dlIdx,
Partition: uint64(partIdx),
Sectors: recovered,
})
totalRecoveries++
}
recoveries := params.Recoveries
if len(recoveries) == 0 {
if totalRecoveries == 0 {
if faulty != 0 {
log.Warnw("No recoveries to declare", "deadline", dlIdx, "faulty", faulty)
}
return recoveries, nil, nil
return nil, nil, nil
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return recoveries, nil, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr)
var msgs []*types.SignedMessage
for _, recovery := range batchedRecoveryDecls {
params := &miner.DeclareFaultsRecoveredParams{
Recoveries: recovery,
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return nil, nil, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr)
}
msg := &types.Message{
To: s.actor,
Method: builtin.MethodsMiner.DeclareFaultsRecovered,
Params: enc,
Value: types.NewInt(0),
}
spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}
if err := s.prepareMessage(ctx, msg, spec); err != nil {
return nil, nil, err
}
sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)})
if err != nil {
return nil, nil, xerrors.Errorf("pushing message to mpool: %w", err)
}
log.Warnw("declare faults recovered Message CID", "cid", sm.Cid())
msgs = append(msgs, sm)
}
msg := &types.Message{
To: s.actor,
Method: builtin.MethodsMiner.DeclareFaultsRecovered,
Params: enc,
Value: types.NewInt(0),
}
spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}
if err := s.prepareMessage(ctx, msg, spec); err != nil {
return recoveries, nil, err
for _, msg := range msgs {
rec, err := s.api.StateWaitMsg(context.TODO(), msg.Cid(), build.MessageConfidence, api.LookbackNoLimit, true)
if err != nil {
return batchedRecoveryDecls, msgs, xerrors.Errorf("declare faults recovered wait error: %w", err)
}
if rec.Receipt.ExitCode != 0 {
return batchedRecoveryDecls, msgs, xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode)
}
}
sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)})
if err != nil {
return recoveries, sm, xerrors.Errorf("pushing message to mpool: %w", err)
}
log.Warnw("declare faults recovered Message CID", "cid", sm.Cid())
rec, err := s.api.StateWaitMsg(context.TODO(), sm.Cid(), build.MessageConfidence, api.LookbackNoLimit, true)
if err != nil {
return recoveries, sm, xerrors.Errorf("declare faults recovered wait error: %w", err)
}
if rec.Receipt.ExitCode != 0 {
return recoveries, sm, xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode)
}
return recoveries, sm, nil
return batchedRecoveryDecls, msgs, nil
}
// declareFaults identifies the sectors on the specified proving deadline that
@ -241,9 +259,8 @@ func (s *WindowPoStScheduler) asyncFaultRecover(di dline.Info, ts *types.TipSet)
}
var (
sigmsg *types.SignedMessage
recoveries []miner.RecoveryDeclaration
faults []miner.FaultDeclaration
sigmsgs []*types.SignedMessage
recoveries [][]miner.RecoveryDeclaration
// optionalCid returns the CID of the message, or cid.Undef is the
// message is nil. We don't need the argument (could capture the
@ -256,36 +273,27 @@ func (s *WindowPoStScheduler) asyncFaultRecover(di dline.Info, ts *types.TipSet)
}
)
if recoveries, sigmsg, err = s.declareRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
if recoveries, sigmsgs, err = s.declareRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
// TODO: This is potentially quite bad, but not even trying to post when this fails is objectively worse
log.Errorf("checking sector recoveries: %v", err)
}
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} {
j := WdPoStRecoveriesProcessedEvt{
evtCommon: s.getEvtCommon(err),
Declarations: recoveries,
MessageCID: optionalCid(sigmsg),
// should always be true, skip journaling if not for some reason
if len(recoveries) == len(sigmsgs) {
for i, recovery := range recoveries {
// clone for function literal
recovery := recovery
msgCID := optionalCid(sigmsgs[i])
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} {
j := WdPoStRecoveriesProcessedEvt{
evtCommon: s.getEvtCommon(err),
Declarations: recovery,
MessageCID: msgCID,
}
j.Error = err
return j
})
}
j.Error = err
return j
})
if ts.Height() > build.UpgradeIgnitionHeight {
return // FORK: declaring faults after ignition upgrade makes no sense
}
if faults, sigmsg, err = s.declareFaults(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
// TODO: This is also potentially really bad, but we try to post anyways
log.Errorf("checking sector faults: %v", err)
}
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStFaults], func() interface{} {
return WdPoStFaultsProcessedEvt{
evtCommon: s.getEvtCommon(err),
Declarations: faults,
MessageCID: optionalCid(sigmsg),
}
})
}()
}

View File

@ -268,7 +268,7 @@ func TestWDPostDoPost(t *testing.T) {
}
}
// TestWDPostDoPost verifies that doPost will send the correct number of window
// TestWDPostDoPostPartLimitConfig verifies that doPost will send the correct number of window
// PoST messages for a given number of partitions based on user config
func TestWDPostDoPostPartLimitConfig(t *testing.T) {
//stm: @CHAIN_SYNCER_LOAD_GENESIS_001, @CHAIN_SYNCER_FETCH_TIPSET_001,
@ -333,7 +333,7 @@ func TestWDPostDoPostPartLimitConfig(t *testing.T) {
journal: journal.NilJournal(),
addrSel: &ctladdr.AddressSelector{},
maxPartitionsPerMessage: userPartLimit,
maxPartitionsPerPostMessage: userPartLimit,
}
di := &dline.Info{
@ -367,6 +367,99 @@ func TestWDPostDoPostPartLimitConfig(t *testing.T) {
}
}
// TestWDPostDeclareRecoveriesPartLimitConfig verifies that declareRecoveries will send the correct number of
// DeclareFaultsRecovered messages for a given number of partitions based on user config
func TestWDPostDeclareRecoveriesPartLimitConfig(t *testing.T) {
//stm: @CHAIN_SYNCER_LOAD_GENESIS_001, @CHAIN_SYNCER_FETCH_TIPSET_001,
//stm: @CHAIN_SYNCER_START_001, @CHAIN_SYNCER_SYNC_001, @BLOCKCHAIN_BEACON_VALIDATE_BLOCK_VALUES_01
//stm: @CHAIN_SYNCER_COLLECT_CHAIN_001, @CHAIN_SYNCER_COLLECT_HEADERS_001, @CHAIN_SYNCER_VALIDATE_TIPSET_001
//stm: @CHAIN_SYNCER_NEW_PEER_HEAD_001, @CHAIN_SYNCER_VALIDATE_MESSAGE_META_001, @CHAIN_SYNCER_STOP_001
ctx := context.Background()
proofType := abi.RegisteredPoStProof_StackedDrgWindow2KiBV1
postAct := tutils.NewIDAddr(t, 100)
mockStgMinerAPI := newMockStorageMinerAPI()
// Get the number of sectors allowed in a partition for this proof type
sectorsPerPartition, err := builtin.PoStProofWindowPoStPartitionSectors(proofType)
require.NoError(t, err)
// Let's have 11/20 partitions with faulty sectors, and a config of 3 partitions per message
userPartLimit := 3
partitionCount := 20
faultyPartitionCount := 11
var partitions []api.Partition
for p := 0; p < partitionCount; p++ {
sectors := bitfield.New()
for s := uint64(0); s < sectorsPerPartition; s++ {
sectors.Set(s)
}
partition := api.Partition{
AllSectors: sectors,
FaultySectors: bitfield.New(),
RecoveringSectors: bitfield.New(),
LiveSectors: sectors,
ActiveSectors: sectors,
}
if p < faultyPartitionCount {
partition.FaultySectors = sectors
}
partitions = append(partitions, partition)
}
mockStgMinerAPI.setPartitions(partitions)
// Run declareRecoverios
scheduler := &WindowPoStScheduler{
api: mockStgMinerAPI,
prover: &mockProver{},
verifier: &mockVerif{},
faultTracker: &mockFaultTracker{},
proofType: proofType,
actor: postAct,
journal: journal.NilJournal(),
addrSel: &ctladdr.AddressSelector{},
maxPartitionsPerRecoveryMessage: userPartLimit,
}
di := uint64(0)
ts := mockTipSet(t)
expectedMsgCount := faultyPartitionCount/userPartLimit + 1
lastMsgParts := faultyPartitionCount % userPartLimit
go func() {
batchedRecoveries, msgs, err := scheduler.declareRecoveries(ctx, di, partitions, ts.Key())
require.NoError(t, err, "failed to declare recoveries")
require.Equal(t, len(batchedRecoveries), len(msgs))
require.Equal(t, expectedMsgCount, len(msgs))
}()
// Read the window PoST messages
for i := 0; i < expectedMsgCount; i++ {
msg := <-mockStgMinerAPI.pushedMessages
require.Equal(t, builtin.MethodsMiner.DeclareFaultsRecovered, msg.Method)
var params minertypes.DeclareFaultsRecoveredParams
err := params.UnmarshalCBOR(bytes.NewReader(msg.Params))
require.NoError(t, err)
if i == expectedMsgCount-1 {
// In the last message we only included a 21 partitions
require.Len(t, params.Recoveries, lastMsgParts)
} else {
// All previous messages should include the full number of partitions
require.Len(t, params.Recoveries, userPartLimit)
}
}
}
func mockTipSet(t *testing.T) *types.TipSet {
minerAct := tutils.NewActorAddr(t, "miner")
c, err := cid.Decode("QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH")

View File

@ -62,17 +62,18 @@ type NodeAPI interface {
// WindowPoStScheduler watches the chain though the changeHandler, which in turn
// turn calls the scheduler when the time arrives to do work.
type WindowPoStScheduler struct {
api NodeAPI
feeCfg config.MinerFeeConfig
addrSel *ctladdr.AddressSelector
prover storiface.ProverPoSt
verifier storiface.Verifier
faultTracker sealer.FaultTracker
proofType abi.RegisteredPoStProof
partitionSectors uint64
disablePreChecks bool
maxPartitionsPerMessage int
ch *changeHandler
api NodeAPI
feeCfg config.MinerFeeConfig
addrSel *ctladdr.AddressSelector
prover storiface.ProverPoSt
verifier storiface.Verifier
faultTracker sealer.FaultTracker
proofType abi.RegisteredPoStProof
partitionSectors uint64
disablePreChecks bool
maxPartitionsPerPostMessage int
maxPartitionsPerRecoveryMessage int
ch *changeHandler
actor address.Address
@ -99,16 +100,17 @@ func NewWindowedPoStScheduler(api NodeAPI,
}
return &WindowPoStScheduler{
api: api,
feeCfg: cfg,
addrSel: as,
prover: sp,
verifier: verif,
faultTracker: ft,
proofType: mi.WindowPoStProofType,
partitionSectors: mi.WindowPoStPartitionSectors,
disablePreChecks: pcfg.DisableWDPoStPreChecks,
maxPartitionsPerMessage: pcfg.MaxPartitionsPerMessage,
api: api,
feeCfg: cfg,
addrSel: as,
prover: sp,
verifier: verif,
faultTracker: ft,
proofType: mi.WindowPoStProofType,
partitionSectors: mi.WindowPoStPartitionSectors,
disablePreChecks: pcfg.DisableWDPoStPreChecks,
maxPartitionsPerPostMessage: pcfg.MaxPartitionsPerPoStMessage,
maxPartitionsPerRecoveryMessage: pcfg.MaxPartitionsPerRecoveryMessage,
actor: actor,
evtTypes: [...]journal.EventType{