diff --git a/api/api_storage.go b/api/api_storage.go index cebb6d5f0..00cf3400d 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -73,6 +73,7 @@ type StorageMiner interface { MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error MarketListDeals(ctx context.Context) ([]storagemarket.StorageDeal, error) MarketListRetrievalDeals(ctx context.Context) ([]retrievalmarket.ProviderDealState, error) + MarketGetDealUpdates(ctx context.Context, d cid.Cid) (<-chan storagemarket.MinerDeal, error) MarketListIncompleteDeals(ctx context.Context) ([]storagemarket.MinerDeal, error) MarketSetAsk(ctx context.Context, price types.BigInt, verifiedPrice types.BigInt, duration abi.ChainEpoch, minPieceSize abi.PaddedPieceSize, maxPieceSize abi.PaddedPieceSize) error MarketGetAsk(ctx context.Context) (*storagemarket.SignedStorageAsk, error) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index a441453ad..6481223da 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -218,6 +218,7 @@ type StorageMinerStruct struct { MarketImportDealData func(context.Context, cid.Cid, string) error `perm:"write"` MarketListDeals func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"` MarketListRetrievalDeals func(ctx context.Context) ([]retrievalmarket.ProviderDealState, error) `perm:"read"` + MarketGetDealUpdates func(ctx context.Context, d cid.Cid) (<-chan storagemarket.MinerDeal, error) `perm:"read"` MarketListIncompleteDeals func(ctx context.Context) ([]storagemarket.MinerDeal, error) `perm:"read"` MarketSetAsk func(ctx context.Context, price types.BigInt, verifiedPrice types.BigInt, duration abi.ChainEpoch, minPieceSize abi.PaddedPieceSize, maxPieceSize abi.PaddedPieceSize) error `perm:"admin"` MarketGetAsk func(ctx context.Context) (*storagemarket.SignedStorageAsk, error) `perm:"read"` @@ -1005,6 +1006,10 @@ func (c *StorageMinerStruct) MarketListRetrievalDeals(ctx context.Context) ([]re return c.Internal.MarketListRetrievalDeals(ctx) } +func (c *StorageMinerStruct) MarketGetDealUpdates(ctx context.Context, d cid.Cid) (<-chan storagemarket.MinerDeal, error) { + return c.Internal.MarketGetDealUpdates(ctx, d) +} + func (c *StorageMinerStruct) MarketListIncompleteDeals(ctx context.Context) ([]storagemarket.MinerDeal, error) { return c.Internal.MarketListIncompleteDeals(ctx) } diff --git a/api/test/deals.go b/api/test/deals.go index 1deb424ce..37ff780f7 100644 --- a/api/test/deals.go +++ b/api/test/deals.go @@ -144,6 +144,61 @@ func makeDeal(t *testing.T, ctx context.Context, rseed int, client *impl.FullNod testRetrieval(t, ctx, err, client, fcid, &info.PieceCID, carExport, data) } +func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) { + _ = os.Setenv("BELLMAN_NO_GPU", "1") + + ctx := context.Background() + n, sn := b(t, 1, oneMiner) + client := n[0].FullNode.(*impl.FullNodeAPI) + miner := sn[0] + + addrinfo, err := client.NetAddrsListen(ctx) + if err != nil { + t.Fatal(err) + } + + if err := miner.NetConnect(ctx, addrinfo); err != nil { + t.Fatal(err) + } + time.Sleep(time.Second) + + mine := int64(1) + done := make(chan struct{}) + go func() { + defer close(done) + for atomic.LoadInt64(&mine) == 1 { + time.Sleep(blocktime) + if err := sn[0].MineOne(ctx, MineNext); err != nil { + t.Error(err) + } + } + }() + + data := make([]byte, 1600) + rand.New(rand.NewSource(int64(8))).Read(data) + + r := bytes.NewReader(data) + fcid, err := client.ClientImportLocal(ctx, r) + if err != nil { + t.Fatal(err) + } + + fmt.Println("FILE CID: ", fcid) + + deal := startDeal(t, ctx, miner, client, fcid, true) + + waitDealPublished(t, ctx, miner, deal) + fmt.Println("deal published, retrieving") + // Retrieval + info, err := client.ClientGetDealInfo(ctx, *deal) + require.NoError(t, err) + + testRetrieval(t, ctx, err, client, fcid, &info.PieceCID, false, data) + atomic.AddInt64(&mine, -1) + fmt.Println("shutting down mining") + <-done +} + func TestSenondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration) { _ = os.Setenv("BELLMAN_NO_GPU", "1") @@ -275,6 +330,34 @@ loop: } } +func waitDealPublished(t *testing.T, ctx context.Context, miner TestStorageNode, deal *cid.Cid) { + subCtx, cancel := context.WithCancel(ctx) + defer cancel() + updates, err := miner.MarketGetDealUpdates(subCtx, *deal) + if err != nil { + t.Fatal(err) + } + for { + select { + case <-ctx.Done(): + t.Fatal("context timeout") + case di := <-updates: + switch di.State { + case storagemarket.StorageDealProposalRejected: + t.Fatal("deal rejected") + case storagemarket.StorageDealFailing: + t.Fatal("deal failed") + case storagemarket.StorageDealError: + t.Fatal("deal errored", di.Message) + case storagemarket.StorageDealFinalizing, storagemarket.StorageDealSealing, storagemarket.StorageDealActive: + fmt.Println("COMPLETE", di) + return + } + fmt.Println("Deal state: ", storagemarket.DealStates[di.State]) + } + } +} + func startSealingWaiting(t *testing.T, ctx context.Context, miner TestStorageNode) { snums, err := miner.SectorsList(ctx) require.NoError(t, err) diff --git a/markets/retrievaladapter/provider.go b/markets/retrievaladapter/provider.go index 6b2383c13..211f10500 100644 --- a/markets/retrievaladapter/provider.go +++ b/markets/retrievaladapter/provider.go @@ -11,6 +11,7 @@ import ( "github.com/filecoin-project/sector-storage/storiface" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin/paych" + "github.com/ipfs/go-cid" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" @@ -57,7 +58,11 @@ func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID abi r, w := io.Pipe() go func() { - err := rpn.sealer.ReadPiece(ctx, w, sid, storiface.UnpaddedByteIndex(offset), length, si.TicketValue, *si.CommD) + var commD cid.Cid + if si.CommD != nil { + commD = *si.CommD + } + err := rpn.sealer.ReadPiece(ctx, w, sid, storiface.UnpaddedByteIndex(offset), length, si.TicketValue, commD) _ = w.CloseWithError(err) }() diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 362a5988f..be1f4e0fd 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -298,6 +298,24 @@ func (sm *StorageMinerAPI) MarketListRetrievalDeals(ctx context.Context) ([]retr return out, nil } +func (sm *StorageMinerAPI) MarketGetDealUpdates(ctx context.Context, d cid.Cid) (<-chan storagemarket.MinerDeal, error) { + results := make(chan storagemarket.MinerDeal) + unsub := sm.StorageProvider.SubscribeToEvents(func(evt storagemarket.ProviderEvent, deal storagemarket.MinerDeal) { + if deal.ProposalCid.Equals(d) { + select { + case results <- deal: + case <-ctx.Done(): + } + } + }) + go func() { + <-ctx.Done() + unsub() + close(results) + }() + return results, nil +} + func (sm *StorageMinerAPI) MarketListIncompleteDeals(ctx context.Context) ([]storagemarket.MinerDeal, error) { return sm.StorageProvider.ListLocalDeals() } diff --git a/node/node_test.go b/node/node_test.go index 76c6546ca..5d50c73dd 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -511,6 +511,9 @@ func TestAPIDealFlow(t *testing.T) { t.Run("TestDoubleDealFlow", func(t *testing.T) { test.TestDoubleDealFlow(t, mockSbBuilder, 10*time.Millisecond) }) + t.Run("TestFastRetrievalDealFlow", func(t *testing.T) { + test.TestFastRetrievalDealFlow(t, mockSbBuilder, 10*time.Millisecond) + }) } func TestAPIDealFlowReal(t *testing.T) {