From f0f15f899c2c450ee923e33f9d9b269384dc5d8c Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Wed, 9 Sep 2020 15:01:37 +0200 Subject: [PATCH] feat: split window PoST messages into batches --- storage/wdpost_run.go | 288 ++++++++++++++++++++-------------- storage/wdpost_run_test.go | 305 +++++++++++++++++++++++++++++++++++++ 2 files changed, 480 insertions(+), 113 deletions(-) create mode 100644 storage/wdpost_run_test.go diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go index 08a7437a9..1c5d5650c 100644 --- a/storage/wdpost_run.go +++ b/storage/wdpost_run.go @@ -3,7 +3,6 @@ package storage import ( "bytes" "context" - "errors" "time" "github.com/filecoin-project/go-state-types/dline" @@ -30,8 +29,6 @@ import ( "github.com/filecoin-project/lotus/journal" ) -var errNoPartitions = errors.New("no partitions") - func (s *WindowPoStScheduler) failPost(err error, deadline *dline.Info) { journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} { return WdPoStSchedulerEvt{ @@ -79,25 +76,28 @@ func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *dline.Info, }) } - proof, err := s.runPost(ctx, *deadline, ts) - switch err { - case errNoPartitions: - recordProofsEvent(nil, cid.Undef) - return - case nil: - sm, err := s.submitPost(ctx, proof) - if err != nil { - log.Errorf("submitPost failed: %+v", err) - s.failPost(err, deadline) - return - } - recordProofsEvent(proof.Partitions, sm.Cid()) - default: + posts, err := s.runPost(ctx, *deadline, ts) + if err != nil { log.Errorf("runPost failed: %+v", err) s.failPost(err, deadline) return } + if len(posts) == 0 { + recordProofsEvent(nil, cid.Undef) + return + } + + for i := range posts { + sm, err := s.submitPost(ctx, &posts[i]) + if err != nil { + log.Errorf("submitPost failed: %+v", err) + s.failPost(err, deadline) + continue // sm must not be used when submitPost fails; skip its event but keep submitting the remaining batches + } + recordProofsEvent(posts[i].Partitions, sm.Cid()) + } + journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} { return
WdPoStSchedulerEvt{ evtCommon: s.getEvtCommon(nil), @@ -327,7 +327,7 @@ func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64, return faults, sm, nil } -func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *types.TipSet) (*miner.SubmitWindowedPoStParams, error) { +func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *types.TipSet) ([]miner.SubmitWindowedPoStParams, error) { ctx, span := trace.StartSpan(ctx, "storage.runPost") defer span.End() @@ -399,136 +399,198 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty return nil, xerrors.Errorf("failed to get chain randomness for windowPost (ts=%d; deadline=%d): %w", ts.Height(), di, err) } + // Get the partitions for the given deadline partitions, err := s.api.StateMinerPartitions(ctx, s.actor, di.Index, ts.Key()) if err != nil { return nil, xerrors.Errorf("getting partitions: %w", err) } - params := &miner.SubmitWindowedPoStParams{ - Deadline: di.Index, - Partitions: make([]miner.PoStPartition, 0, len(partitions)), - Proofs: nil, + // Split partitions into batches, so as not to exceed the number of sectors + // allowed in a single message + partitionBatches, err := s.batchPartitions(partitions) + if err != nil { + return nil, err } - skipCount := uint64(0) - postSkipped := bitfield.New() - var postOut []proof.PoStProof + posts := make([]miner.SubmitWindowedPoStParams, 0, len(partitionBatches)) + for batchIdx, batch := range partitionBatches { + batchPartitionStartIdx := 0 + for _, batch := range partitionBatches[:batchIdx] { + batchPartitionStartIdx += len(batch) + } - for retries := 0; retries < 5; retries++ { + params := miner.SubmitWindowedPoStParams{ + Deadline: di.Index, + Partitions: make([]miner.PoStPartition, 0, len(batch)), + Proofs: nil, + } + + //var sinfos []proof.SectorInfo + //sidToPart := map[abi.SectorNumber]uint64{} + //skipCount := uint64(0) + + skipCount := uint64(0) + postSkipped := 
bitfield.New() + var postOut []proof.PoStProof var sinfos []proof.SectorInfo - sidToPart := map[abi.SectorNumber]int{} - for partIdx, partition := range partitions { - // TODO: Can do this in parallel - toProve, err := partition.ActiveSectors() + for retries := 0; retries < 5; retries++ { + sinfos = make([]proof.SectorInfo, 0) + params.Partitions = params.Partitions[:0] // each attempt rebuilds the list; a retry must not re-append partitions from the failed attempt + for partIdx, partition := range batch { + // TODO: Can do this in parallel + toProve, err := partition.ActiveSectors() + if err != nil { + return nil, xerrors.Errorf("getting active sectors: %w", err) + } + + toProve, err = bitfield.MergeBitFields(toProve, partition.Recoveries) + if err != nil { + return nil, xerrors.Errorf("adding recoveries to set of sectors to prove: %w", err) + } + + good, err := s.checkSectors(ctx, toProve) + if err != nil { + return nil, xerrors.Errorf("checking sectors to skip: %w", err) + } + + good, err = bitfield.SubtractBitField(good, postSkipped) + if err != nil { + return nil, xerrors.Errorf("toProve - postSkipped: %w", err) + } + + skipped, err := bitfield.SubtractBitField(toProve, good) + if err != nil { + return nil, xerrors.Errorf("toProve - good: %w", err) + } + + sc, err := skipped.Count() + if err != nil { + return nil, xerrors.Errorf("getting skipped sector count: %w", err) + } + + skipCount += sc + + ssi, err := s.sectorsForProof(ctx, good, partition.Sectors, ts) + if err != nil { + return nil, xerrors.Errorf("getting sorted sector info: %w", err) + } + + if len(ssi) == 0 { + continue + } + + sinfos = append(sinfos, ssi...) + params.Partitions = append(params.Partitions, miner.PoStPartition{ + Index: uint64(batchPartitionStartIdx + partIdx), + Skipped: skipped, + }) + } + + if len(sinfos) == 0 { + // nothing to prove..
+ //return nil, errNoPartitions + break + } + + log.Infow("running windowPost", + "chain-random", rand, + "deadline", di, + "height", ts.Height(), + "skipped", skipCount) + + tsStart := build.Clock.Now() + + mid, err := address.IDFromAddress(s.actor) if err != nil { - return nil, xerrors.Errorf("getting active sectors: %w", err) + return nil, err } - toProve, err = bitfield.MergeBitFields(toProve, partition.Recoveries) - if err != nil { - return nil, xerrors.Errorf("adding recoveries to set of sectors to prove: %w", err) + var ps []abi.SectorID + postOut, ps, err = s.prover.GenerateWindowPoSt(ctx, abi.ActorID(mid), sinfos, abi.PoStRandomness(rand)) + elapsed := time.Since(tsStart) + + log.Infow("computing window PoSt", "batch", batchIdx, "elapsed", elapsed) + + if err == nil { + break } - good, err := s.checkSectors(ctx, toProve) - if err != nil { - return nil, xerrors.Errorf("checking sectors to skip: %w", err) + if len(ps) == 0 { + return nil, xerrors.Errorf("running post failed: %w", err) } - good, err = bitfield.SubtractBitField(good, postSkipped) - if err != nil { - return nil, xerrors.Errorf("toProve - postSkipped: %w", err) + log.Warnw("generate window PoSt skipped sectors", "sectors", ps, "error", err, "try", retries) + + skipCount += uint64(len(ps)) + for _, sector := range ps { + postSkipped.Set(uint64(sector.Number)) } - - skipped, err := bitfield.SubtractBitField(toProve, good) - if err != nil { - return nil, xerrors.Errorf("toProve - good: %w", err) - } - - sc, err := skipped.Count() - if err != nil { - return nil, xerrors.Errorf("getting skipped sector count: %w", err) - } - - skipCount += sc - - ssi, err := s.sectorsForProof(ctx, good, partition.Sectors, ts) - if err != nil { - return nil, xerrors.Errorf("getting sorted sector info: %w", err) - } - - if len(ssi) == 0 { - continue - } - - sinfos = append(sinfos, ssi...) 
- for _, si := range ssi { - sidToPart[si.SectorNumber] = partIdx - } - - params.Partitions = append(params.Partitions, miner.PoStPartition{ - Index: uint64(partIdx), - Skipped: skipped, - }) } if len(sinfos) == 0 { - // nothing to prove.. - return nil, errNoPartitions + continue } - log.Infow("running windowPost", - "chain-random", rand, - "deadline", di, - "height", ts.Height(), - "skipped", skipCount) - - tsStart := build.Clock.Now() - - mid, err := address.IDFromAddress(s.actor) - if err != nil { - return nil, err + if len(postOut) == 0 { + return nil, xerrors.Errorf("received no proofs back from generate window post") } - var ps []abi.SectorID - postOut, ps, err = s.prover.GenerateWindowPoSt(ctx, abi.ActorID(mid), sinfos, abi.PoStRandomness(rand)) - elapsed := time.Since(tsStart) + params.Proofs = postOut - log.Infow("computing window PoSt", "elapsed", elapsed) - - if err == nil { - break - } - - if len(ps) == 0 { - return nil, xerrors.Errorf("running post failed: %w", err) - } - - log.Warnw("generate window PoSt skipped sectors", "sectors", ps, "error", err, "try", retries) - - skipCount += uint64(len(ps)) - for _, sector := range ps { - postSkipped.Set(uint64(sector.Number)) - } + posts = append(posts, params) } - if len(postOut) == 0 { - return nil, xerrors.Errorf("received no proofs back from generate window post") - } - - params.Proofs = postOut - commEpoch := di.Open commRand, err := s.api.ChainGetRandomnessFromTickets(ctx, ts.Key(), crypto.DomainSeparationTag_PoStChainCommit, commEpoch, nil) if err != nil { return nil, xerrors.Errorf("failed to get chain randomness for windowPost (ts=%d; deadline=%d): %w", ts.Height(), di, err) } - params.ChainCommitEpoch = commEpoch - params.ChainCommitRand = commRand - log.Infow("submitting window PoSt") + for i := range posts { + posts[i].ChainCommitEpoch = commEpoch + posts[i].ChainCommitRand = commRand + } - return params, nil + return posts, nil +} + +func (s *WindowPoStScheduler) batchPartitions(partitions 
[]*miner.Partition) ([][]*miner.Partition, error) { + // Get the number of sectors allowed in a partition, for this proof size + sectorsPerPartition, err := builtin.PoStProofWindowPoStPartitionSectors(s.proofType) + if err != nil { + return nil, xerrors.Errorf("getting sectors per partition: %w", err) + } + + // We don't want to exceed the number of sectors allowed in a message. + // So given the number of sectors in a partition, work out the number of + // partitions that can be in a message without exceeding sectors per + // message: + // floor(number of sectors allowed in a message / sectors per partition) + // eg: + // max sectors per message 7: ooooooo + // sectors per partition 3: ooo + // partitions per message 2: oooOOO + // <1><2> (3rd doesn't fit) + partitionsPerMsg := int(miner.AddressedSectorsMax / sectorsPerPartition) + if partitionsPerMsg == 0 { // a single partition is larger than a message allows: fail instead of dividing by zero below + return nil, xerrors.Errorf("sectors per partition (%d) exceeds sectors allowed per message (%d)", sectorsPerPartition, miner.AddressedSectorsMax) + } + + // The number of messages will be: + // ceiling(number of partitions / partitions per message) + batchCount := (len(partitions) + partitionsPerMsg - 1) / partitionsPerMsg + + // Split the partitions into batches + batches := make([][]*miner.Partition, 0, batchCount) + for i := 0; i < len(partitions); i += partitionsPerMsg { + end := i + partitionsPerMsg + if end > len(partitions) { + end = len(partitions) + } + batches = append(batches, partitions[i:end]) + } + return batches, nil } func (s *WindowPoStScheduler) sectorsForProof(ctx context.Context, goodSectors, allSectors bitfield.BitField, ts *types.TipSet) ([]proof.SectorInfo, error) { diff --git a/storage/wdpost_run_test.go b/storage/wdpost_run_test.go new file mode 100644 index 000000000..af529a75e --- /dev/null +++ b/storage/wdpost_run_test.go @@ -0,0 +1,305 @@ +package storage + +import ( + "bytes" + "context" + "testing" + + "github.com/filecoin-project/go-state-types/dline" + + "golang.org/x/xerrors" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-bitfield" +
"github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/specs-actors/actors/builtin" + "github.com/filecoin-project/specs-actors/actors/builtin/miner" + "github.com/filecoin-project/specs-actors/actors/runtime/proof" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/crypto" + tutils "github.com/filecoin-project/specs-actors/support/testing" +) + +type mockStorageMinerAPI struct { + partitions []*miner.Partition + pushedMessages chan *types.Message +} + +func newMockStorageMinerAPI() *mockStorageMinerAPI { + return &mockStorageMinerAPI{ + pushedMessages: make(chan *types.Message), + } +} + +func (m *mockStorageMinerAPI) ChainGetRandomnessFromTickets(ctx context.Context, tsk types.TipSetKey, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error) { + return abi.Randomness("ticket rand"), nil +} + +func (m *mockStorageMinerAPI) ChainGetRandomnessFromBeacon(ctx context.Context, tsk types.TipSetKey, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error) { + return abi.Randomness("beacon rand"), nil +} + +func (m *mockStorageMinerAPI) setPartitions(ps []*miner.Partition) { + m.partitions = append(m.partitions, ps...) 
+} + +func (m *mockStorageMinerAPI) StateMinerPartitions(ctx context.Context, address address.Address, u uint64, key types.TipSetKey) ([]*miner.Partition, error) { + return m.partitions, nil +} + +func (m *mockStorageMinerAPI) StateMinerSectors(ctx context.Context, address address.Address, field *bitfield.BitField, b bool, key types.TipSetKey) ([]*api.ChainSectorInfo, error) { + var sis []*api.ChainSectorInfo + _ = field.ForEach(func(i uint64) error { + sis = append(sis, &api.ChainSectorInfo{ + Info: miner.SectorOnChainInfo{ + SectorNumber: abi.SectorNumber(i), + }, + ID: abi.SectorNumber(i), + }) + return nil + }) + return sis, nil +} + +func (m *mockStorageMinerAPI) StateMinerInfo(ctx context.Context, address address.Address, key types.TipSetKey) (api.MinerInfo, error) { + return api.MinerInfo{}, xerrors.Errorf("err") +} + +func (m *mockStorageMinerAPI) MpoolPushMessage(ctx context.Context, message *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) { + m.pushedMessages <- message + return &types.SignedMessage{ + Message: *message, + }, nil +} + +func (m *mockStorageMinerAPI) StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64) (*api.MsgLookup, error) { + return &api.MsgLookup{ + Receipt: types.MessageReceipt{ + ExitCode: 0, + }, + }, nil +} + +type mockProver struct { +} + +func (m *mockProver) GenerateWinningPoSt(context.Context, abi.ActorID, []proof.SectorInfo, abi.PoStRandomness) ([]proof.PoStProof, error) { + panic("implement me") +} + +func (m *mockProver) GenerateWindowPoSt(ctx context.Context, aid abi.ActorID, sis []proof.SectorInfo, pr abi.PoStRandomness) ([]proof.PoStProof, []abi.SectorID, error) { + return []proof.PoStProof{ + { + PoStProof: abi.RegisteredPoStProof_StackedDrgWindow2KiBV1, + ProofBytes: []byte("post-proof"), + }, + }, nil, nil +} + +type mockFaultTracker struct { +} + +func (m mockFaultTracker) CheckProvable(ctx context.Context, spt abi.RegisteredSealProof, sectors []abi.SectorID) ([]abi.SectorID, 
error) { + // Returns "bad" sectors so just return nil meaning all sectors are good + return nil, nil +} + +// TestWDPostDoPost verifies that doPost will send the correct number of window +// PoST messages for a given number of partitions +func TestWDPostDoPost(t *testing.T) { + ctx := context.Background() + expectedMsgCount := 5 + + proofType := abi.RegisteredPoStProof_StackedDrgWindow2KiBV1 + postAct := tutils.NewIDAddr(t, 100) + workerAct := tutils.NewIDAddr(t, 101) + + mockStgMinerAPI := newMockStorageMinerAPI() + + // Get the number of sectors allowed in a partition for this proof type + sectorsPerPartition, err := builtin.PoStProofWindowPoStPartitionSectors(proofType) + require.NoError(t, err) + // Work out the number of partitions that can be included in a message + // without exceeding the message sector limit + partitionsPerMsg := int(miner.AddressedSectorsMax / sectorsPerPartition) + + // Enough partitions to fill expectedMsgCount-1 messages + partitionCount := (expectedMsgCount - 1) * partitionsPerMsg + // Add an extra partition that should be included in the last message + partitionCount++ + + var partitions []*miner.Partition + for p := 0; p < partitionCount; p++ { + sectors := bitfield.New() + for s := uint64(0); s < sectorsPerPartition; s++ { + sectors.Set(s) + } + partitions = append(partitions, &miner.Partition{ + Sectors: sectors, + }) + } + mockStgMinerAPI.setPartitions(partitions) + + // Run window PoST + scheduler := &WindowPoStScheduler{ + api: mockStgMinerAPI, + prover: &mockProver{}, + faultTracker: &mockFaultTracker{}, + proofType: proofType, + actor: postAct, + worker: workerAct, + } + + di := &dline.Info{} + ts := mockTipSet(t) + scheduler.doPost(ctx, di, ts) + + // Read the window PoST messages + for i := 0; i < expectedMsgCount; i++ { + msg := <-mockStgMinerAPI.pushedMessages + require.Equal(t, builtin.MethodsMiner.SubmitWindowedPoSt, msg.Method) + var params miner.SubmitWindowedPoStParams + err := 
params.UnmarshalCBOR(bytes.NewReader(msg.Params)) + require.NoError(t, err) + + if i == expectedMsgCount-1 { + // In the last message we only included a single partition (see above) + require.Len(t, params.Partitions, 1) + } else { + // All previous messages should include the full number of partitions + require.Len(t, params.Partitions, partitionsPerMsg) + } + } +} + +func mockTipSet(t *testing.T) *types.TipSet { + minerAct := tutils.NewActorAddr(t, "miner") + c, err := cid.Decode("QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH") + require.NoError(t, err) + blks := []*types.BlockHeader{ + { + Miner: minerAct, + Height: abi.ChainEpoch(1), + ParentStateRoot: c, + ParentMessageReceipts: c, + Messages: c, + }, + } + ts, err := types.NewTipSet(blks) + require.NoError(t, err) + return ts +} + +// +// All the mock methods below here are unused +// + +func (m *mockStorageMinerAPI) StateCall(ctx context.Context, message *types.Message, key types.TipSetKey) (*api.InvocResult, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) StateMinerDeadlines(ctx context.Context, maddr address.Address, tok types.TipSetKey) ([]*miner.Deadline, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) StateSectorPreCommitInfo(ctx context.Context, address address.Address, number abi.SectorNumber, key types.TipSetKey) (miner.SectorPreCommitOnChainInfo, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) StateSectorGetInfo(ctx context.Context, address address.Address, number abi.SectorNumber, key types.TipSetKey) (*miner.SectorOnChainInfo, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok types.TipSetKey) (*api.SectorLocation, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) StateMinerProvingDeadline(ctx context.Context, address address.Address, key types.TipSetKey) (*dline.Info, error) { + panic("implement 
me") +} + +func (m *mockStorageMinerAPI) StateMinerPreCommitDepositForPower(ctx context.Context, address address.Address, info miner.SectorPreCommitInfo, key types.TipSetKey) (types.BigInt, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) StateMinerInitialPledgeCollateral(ctx context.Context, address address.Address, info miner.SectorPreCommitInfo, key types.TipSetKey) (types.BigInt, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) StateSearchMsg(ctx context.Context, cid cid.Cid) (*api.MsgLookup, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) StateGetReceipt(ctx context.Context, cid cid.Cid, key types.TipSetKey) (*types.MessageReceipt, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) StateMarketStorageDeal(ctx context.Context, id abi.DealID, key types.TipSetKey) (*api.MarketDeal, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) StateMinerFaults(ctx context.Context, address address.Address, key types.TipSetKey) (bitfield.BitField, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) StateMinerRecoveries(ctx context.Context, address address.Address, key types.TipSetKey) (bitfield.BitField, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) StateAccountKey(ctx context.Context, address address.Address, key types.TipSetKey) (address.Address, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) GasEstimateMessageGas(ctx context.Context, message *types.Message, spec *api.MessageSendSpec, key types.TipSetKey) (*types.Message, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) ChainHead(ctx context.Context) (*types.TipSet, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) ChainNotify(ctx context.Context) (<-chan 
[]*api.HeadChange, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) ChainGetTipSetByHeight(ctx context.Context, epoch abi.ChainEpoch, key types.TipSetKey) (*types.TipSet, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) ChainGetBlockMessages(ctx context.Context, cid cid.Cid) (*api.BlockMessages, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) ChainReadObj(ctx context.Context, cid cid.Cid) ([]byte, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) ChainHasObj(ctx context.Context, cid cid.Cid) (bool, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) WalletSign(ctx context.Context, address address.Address, bytes []byte) (*crypto.Signature, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) WalletBalance(ctx context.Context, address address.Address) (types.BigInt, error) { + panic("implement me") +} + +func (m *mockStorageMinerAPI) WalletHas(ctx context.Context, address address.Address) (bool, error) { + panic("implement me") +}