// stm: #integration package dagstore import ( "context" "io" "testing" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/require" "github.com/filecoin-project/dagstore" "github.com/filecoin-project/dagstore/mount" "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/testnodes" tut "github.com/filecoin-project/go-fil-markets/shared_testutil" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-state-types/abi" markettypes "github.com/filecoin-project/go-state-types/builtin/v8/market" "github.com/filecoin-project/lotus/node/config" ) func TestShardRegistration(t *testing.T) { ps := tut.NewTestPieceStore() sa := testnodes.NewTestSectorAccessor() ctx := context.Background() cids := tut.GenerateCids(4) pieceCidUnsealed := cids[0] pieceCidSealed := cids[1] pieceCidUnsealed2 := cids[2] pieceCidUnsealed3 := cids[3] sealedSector := abi.SectorNumber(1) unsealedSector1 := abi.SectorNumber(2) unsealedSector2 := abi.SectorNumber(3) unsealedSector3 := abi.SectorNumber(4) // ps.ExpectPiece(pieceCidUnsealed, piecestore.PieceInfo{ // PieceCID: pieceCidUnsealed, // Deals: []piecestore.DealInfo{ // { // SectorID: unsealedSector1, // }, // }, // }) // // ps.ExpectPiece(pieceCidSealed, piecestore.PieceInfo{ // PieceCID: pieceCidSealed, // Deals: []piecestore.DealInfo{ // { // SectorID: sealedSector, // }, // }, // }) deals := []storagemarket.MinerDeal{{ // Should be registered //stm: @MARKET_DAGSTORE_MIGRATE_DEALS_001 State: storagemarket.StorageDealSealing, SectorNumber: unsealedSector1, ClientDealProposal: markettypes.ClientDealProposal{ Proposal: markettypes.DealProposal{ PieceCID: pieceCidUnsealed, }, }, }, { // Should be registered with lazy registration (because sector is sealed) State: storagemarket.StorageDealSealing, SectorNumber: sealedSector, ClientDealProposal: markettypes.ClientDealProposal{ Proposal: markettypes.DealProposal{ PieceCID: pieceCidSealed, }, }, }, { // Should be ignored because deal is no longer active //stm: @MARKET_DAGSTORE_MIGRATE_DEALS_003 State: storagemarket.StorageDealError, SectorNumber: unsealedSector2, ClientDealProposal: markettypes.ClientDealProposal{ Proposal: markettypes.DealProposal{ PieceCID: pieceCidUnsealed2, }, }, }, { // Should be ignored because deal is not yet sealing State: storagemarket.StorageDealFundsReserved, SectorNumber: unsealedSector3, ClientDealProposal: markettypes.ClientDealProposal{ Proposal: markettypes.DealProposal{ PieceCID: pieceCidUnsealed3, }, }, }} cfg := config.DefaultStorageMiner().DAGStore cfg.RootDir = t.TempDir() h, err := mocknet.New().GenPeer() require.NoError(t, err) mapi := NewMinerAPI(ps, &wrappedSA{sa}, 10, 5) dagst, w, err := NewDAGStore(cfg, mapi, h) require.NoError(t, err) require.NotNil(t, dagst) require.NotNil(t, w) err = dagst.Start(context.Background()) require.NoError(t, err) migrated, err := w.MigrateDeals(ctx, deals) require.True(t, migrated) require.NoError(t, err) //stm: @MARKET_DAGSTORE_GET_ALL_SHARDS_001 info := dagst.AllShardsInfo() require.Len(t, info, 2) for _, i := range info { require.Equal(t, dagstore.ShardStateNew, i.ShardState) } // Run register shard migration again //stm: @MARKET_DAGSTORE_MIGRATE_DEALS_002 migrated, err = w.MigrateDeals(ctx, deals) require.False(t, migrated) require.NoError(t, err) // ps.VerifyExpectations(t) } type wrappedSA struct { retrievalmarket.SectorAccessor } func (w *wrappedSA) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error) { r, err := w.UnsealSector(ctx, sectorID, pieceOffset, length) if err != nil { return nil, err } return struct { io.ReadCloser io.Seeker io.ReaderAt }{ ReadCloser: r, Seeker: nil, ReaderAt: nil, }, err } var _ SectorAccessor = &wrappedSA{}