From e088c71b9a3473996ddd6b52746d90b76b8f8523 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 19 May 2021 20:07:20 +0200 Subject: [PATCH] marketadapter: Handle batch sealing messages --- api/test/deals.go | 2 + api/test/window_post.go | 26 +++--- .../sector-storage/ffiwrapper/sealer_test.go | 4 +- .../storageadapter/ondealsectorcommitted.go | 79 ++++++++++++++----- 4 files changed, 79 insertions(+), 32 deletions(-) diff --git a/api/test/deals.go b/api/test/deals.go index 7a9454bae..e3432ff0d 100644 --- a/api/test/deals.go +++ b/api/test/deals.go @@ -435,6 +435,8 @@ func startSealingWaiting(t *testing.T, ctx context.Context, miner TestStorageNod require.NoError(t, miner.SectorStartSealing(ctx, snum)) } } + + flushSealingBatches(t, ctx, miner) } func testRetrieval(t *testing.T, ctx context.Context, client api.FullNode, fcid cid.Cid, piece *cid.Cid, carExport bool, data []byte) { diff --git a/api/test/window_post.go b/api/test/window_post.go index 767aff4d6..48fe3fd6c 100644 --- a/api/test/window_post.go +++ b/api/test/window_post.go @@ -201,6 +201,20 @@ func TestPledgeSector(t *testing.T, b APIBuilder, blocktime time.Duration, nSect <-done } +func flushSealingBatches(t *testing.T, ctx context.Context, miner TestStorageNode) { + pcb, err := miner.SectorPreCommitFlush(ctx) + require.NoError(t, err) + if pcb != nil { + fmt.Printf("PRECOMMIT BATCH: %s\n", *pcb) + } + + cb, err := miner.SectorCommitFlush(ctx) + require.NoError(t, err) + if cb != nil { + fmt.Printf("COMMIT BATCH: %s\n", *cb) + } +} + func pledgeSectors(t *testing.T, ctx context.Context, miner TestStorageNode, n, existing int, blockNotif <-chan struct{}) { for i := 0; i < n; i++ { if i%3 == 0 && blockNotif != nil { @@ -234,17 +248,7 @@ func pledgeSectors(t *testing.T, ctx context.Context, miner TestStorageNode, n, } for len(toCheck) > 0 { - pcb, err := miner.SectorPreCommitFlush(ctx) - require.NoError(t, err) - if pcb != nil { - fmt.Printf("PRECOMMIT BATCH: %s\n", *pcb) - } - - cb, err := miner.SectorCommitFlush(ctx) - require.NoError(t, err) - if cb != nil { - fmt.Printf("COMMIT BATCH: %s\n", *cb) - } + flushSealingBatches(t, ctx, miner) for n := range toCheck { st, err := miner.SectorsStatus(ctx, n, false) diff --git a/extern/sector-storage/ffiwrapper/sealer_test.go b/extern/sector-storage/ffiwrapper/sealer_test.go index 172641bf7..df657f097 100644 --- a/extern/sector-storage/ffiwrapper/sealer_test.go +++ b/extern/sector-storage/ffiwrapper/sealer_test.go @@ -542,12 +542,12 @@ func TestSealAndVerifyAggregate(t *testing.T) { aggStart := time.Now() - avi.Proof, err = ProofVerifier.AggregateSealProofs(avi, toAggregate) + avi.Proof, err = ProofProver.AggregateSealProofs(avi, toAggregate) require.NoError(t, err) aggDone := time.Now() - _, err = ProofVerifier.AggregateSealProofs(avi, toAggregate) + _, err = ProofProver.AggregateSealProofs(avi, toAggregate) require.NoError(t, err) aggHot := time.Now() diff --git a/markets/storageadapter/ondealsectorcommitted.go b/markets/storageadapter/ondealsectorcommitted.go index b5f9c7510..f9ae201b3 100644 --- a/markets/storageadapter/ondealsectorcommitted.go +++ b/markets/storageadapter/ondealsectorcommitted.go @@ -12,6 +12,7 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-state-types/abi" + miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors/builtin/market" @@ -109,7 +110,7 @@ func (mgr *SectorCommittedManager) OnDealSectorPreCommitted(ctx context.Context, // Watch for a pre-commit message to the provider. matchEvent := func(msg *types.Message) (bool, error) { - matched := msg.To == provider && msg.Method == miner.Methods.PreCommitSector + matched := msg.To == provider && (msg.Method == miner.Methods.PreCommitSector || msg.Method == miner.Methods.PreCommitSectorBatch) return matched, nil } @@ -137,12 +138,6 @@ func (mgr *SectorCommittedManager) OnDealSectorPreCommitted(ctx context.Context, return true, nil } - // Extract the message parameters - var params miner.SectorPreCommitInfo - if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil { - return false, xerrors.Errorf("unmarshal pre commit: %w", err) - } - // When there is a reorg, the deal ID may change, so get the // current deal ID from the publish message CID res, err := mgr.dealInfo.GetCurrentDealInfo(ctx, ts.Key().Bytes(), &proposal, publishCid) @@ -150,13 +145,40 @@ func (mgr *SectorCommittedManager) OnDealSectorPreCommitted(ctx context.Context, return false, err } - // Check through the deal IDs associated with this message - for _, did := range params.DealIDs { - if did == res.DealID { - // Found the deal ID in this message. Callback with the sector ID. - cb(params.SectorNumber, false, nil) - return false, nil + // Extract the message parameters + switch msg.Method { + case miner.Methods.PreCommitSector: + var params miner.SectorPreCommitInfo + if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil { + return false, xerrors.Errorf("unmarshal pre commit: %w", err) } + + // Check through the deal IDs associated with this message + for _, did := range params.DealIDs { + if did == res.DealID { + // Found the deal ID in this message. Callback with the sector ID. + cb(params.SectorNumber, false, nil) + return false, nil + } + } + case miner.Methods.PreCommitSectorBatch: + var params miner5.PreCommitSectorBatchParams + if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil { + return false, xerrors.Errorf("unmarshal pre commit: %w", err) + } + + for _, precommit := range params.Sectors { + // Check through the deal IDs associated with this message + for _, did := range precommit.DealIDs { + if did == res.DealID { + // Found the deal ID in this message. Callback with the sector ID. + cb(precommit.SectorNumber, false, nil) + return false, nil + } + } + } + default: + return false, xerrors.Errorf("unexpected method %d", msg.Method) } // Didn't find the deal ID in this message, so keep looking @@ -207,16 +229,35 @@ func (mgr *SectorCommittedManager) OnDealSectorCommitted(ctx context.Context, pr // Match a prove-commit sent to the provider with the given sector number matchEvent := func(msg *types.Message) (matched bool, err error) { - if msg.To != provider || msg.Method != miner.Methods.ProveCommitSector { + if msg.To != provider { return false, nil } - var params miner.ProveCommitSectorParams - if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil { - return false, xerrors.Errorf("failed to unmarshal prove commit sector params: %w", err) - } + switch msg.Method { + case miner.Methods.ProveCommitSector: + var params miner.ProveCommitSectorParams + if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil { + return false, xerrors.Errorf("failed to unmarshal prove commit sector params: %w", err) + } - return params.SectorNumber == sectorNumber, nil + return params.SectorNumber == sectorNumber, nil + + case miner.Methods.ProveCommitAggregate: + var params miner5.ProveCommitAggregateParams + if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil { + return false, xerrors.Errorf("failed to unmarshal prove commit sector params: %w", err) + } + + set, err := params.SectorNumbers.IsSet(uint64(sectorNumber)) + if err != nil { + return false, xerrors.Errorf("checking if sectorNumber is set in commit aggregate message: %w", err) + } + + return set, nil + + default: + return false, nil + } } // The deal must be accepted by the deal proposal start epoch, so timeout