diff --git a/markets/storageadapter/dealpublisher_test.go b/markets/storageadapter/dealpublisher_test.go index 7aa331162..d834c423a 100644 --- a/markets/storageadapter/dealpublisher_test.go +++ b/markets/storageadapter/dealpublisher_test.go @@ -91,11 +91,7 @@ func TestDealPublisher(t *testing.T) { for _, tc := range testCases { tc := tc t.Run(tc.name, func(t *testing.T) { - ctx := context.Background() - client := tutils.NewActorAddr(t, "client") - provider := tutils.NewActorAddr(t, "provider") - worker := tutils.NewActorAddr(t, "worker") - dpapi := newDPAPI(t, worker) + dpapi := newDPAPI(t) // Create a deal publisher dp := newDealPublisher(dpapi, PublishMsgConfig{ @@ -105,53 +101,17 @@ func TestDealPublisher(t *testing.T) { // Keep a record of the deals that were submitted to be published var dealsToPublish []market.ClientDealProposal - publishDeal := func(ctxCancelled bool, expired bool) { - pctx := ctx - var cancel context.CancelFunc - if ctxCancelled { - pctx, cancel = context.WithCancel(ctx) - cancel() - } - - startEpoch := abi.ChainEpoch(20) - if expired { - startEpoch = abi.ChainEpoch(5) - } - deal := market.ClientDealProposal{ - Proposal: market0.DealProposal{ - PieceCID: generateCids(1)[0], - Client: client, - Provider: provider, - StartEpoch: startEpoch, - EndEpoch: abi.ChainEpoch(120), - }, - ClientSignature: crypto.Signature{ - Type: crypto.SigTypeSecp256k1, - Data: []byte("signature data"), - }, - } - if !ctxCancelled && !expired { - dealsToPublish = append(dealsToPublish, deal) - } - go func() { - _, err := dp.Publish(pctx, deal) - if ctxCancelled || expired { - require.Error(t, err) - } else { - require.NoError(t, err) - } - }() - } // Publish deals within publish period for i := 0; i < tc.dealCountWithinPublishPeriod; i++ { - publishDeal(false, false) + deal := publishDeal(t, dp, false, false) + dealsToPublish = append(dealsToPublish, deal) } for i := 0; i < tc.ctxCancelledWithinPublishPeriod; i++ { - publishDeal(true, false) + publishDeal(t, dp, true, false) } for i := 0; i < tc.expiredDeals; i++ { - publishDeal(false, true) + publishDeal(t, dp, false, true) } // Wait until publish period has elapsed @@ -159,41 +119,130 @@ func TestDealPublisher(t *testing.T) { // Publish deals after publish period for i := 0; i < tc.dealCountAfterPublishPeriod; i++ { - publishDeal(false, false) + deal := publishDeal(t, dp, false, false) + dealsToPublish = append(dealsToPublish, deal) } - // For each message that was expected to be sent - var publishedDeals []market.ClientDealProposal - for _, expectedDealsInMsg := range tc.expectedDealsPerMsg { - // Should have called StateMinerInfo with the provider address - stateMinerInfoAddr := <-dpapi.stateMinerInfoCalls - require.Equal(t, provider, stateMinerInfoAddr) - - // Check the fields of the message that was sent - msg := <-dpapi.pushedMsgs - require.Equal(t, worker, msg.From) - require.Equal(t, market.Address, msg.To) - require.Equal(t, market.Methods.PublishStorageDeals, msg.Method) - - // Check that the expected number of deals was included in the message - var params market2.PublishStorageDealsParams - err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)) - require.NoError(t, err) - require.Len(t, params.Deals, expectedDealsInMsg) - - // Keep track of the deals that were sent - for _, d := range params.Deals { - publishedDeals = append(publishedDeals, d) - } - } - - // Verify that all deals that were submitted to be published were - // sent out (we do this by ensuring all the piece CIDs are present) - require.True(t, matchPieceCids(publishedDeals, dealsToPublish)) + checkPublishedDeals(t, dpapi, dealsToPublish, tc.expectedDealsPerMsg) }) } } +func TestForcePublish(t *testing.T) { + dpapi := newDPAPI(t) + + // Create a deal publisher + start := time.Now() + publishPeriod := time.Hour + dp := newDealPublisher(dpapi, PublishMsgConfig{ + Period: publishPeriod, + MaxDealsPerMsg: 10, + }, &api.MessageSendSpec{MaxFee: abi.NewTokenAmount(1)}) + + // Queue three deals for publishing, one with a cancelled context + var dealsToPublish []market.ClientDealProposal + // 1. Regular deal + deal := publishDeal(t, dp, false, false) + dealsToPublish = append(dealsToPublish, deal) + // 2. Deal with cancelled context + publishDeal(t, dp, true, false) + // 3. Regular deal + deal = publishDeal(t, dp, false, false) + dealsToPublish = append(dealsToPublish, deal) + + // Allow a moment for them to be queued + time.Sleep(10 * time.Millisecond) + + // Should be two deals in the pending deals list + // (deal with cancelled context is ignored) + pendingInfo := dp.PendingDeals() + require.Len(t, pendingInfo.deals, 2) + require.Equal(t, publishPeriod, pendingInfo.publishPeriod) + require.True(t, pendingInfo.publishPeriodStart.After(start)) + require.True(t, pendingInfo.publishPeriodStart.Before(time.Now())) + + // Force publish all pending deals + dp.ForcePublishPendingDeals() + + // Should be no pending deals + pendingInfo = dp.PendingDeals() + require.Len(t, pendingInfo.deals, 0) + + // Make sure the expected deals were published + checkPublishedDeals(t, dpapi, dealsToPublish, []int{2}) +} + +func publishDeal(t *testing.T, dp *DealPublisher, ctxCancelled bool, expired bool) market.ClientDealProposal { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + pctx := ctx + if ctxCancelled { + pctx, cancel = context.WithCancel(ctx) + cancel() + } + + startEpoch := abi.ChainEpoch(20) + if expired { + startEpoch = abi.ChainEpoch(5) + } + deal := market.ClientDealProposal{ + Proposal: market0.DealProposal{ + PieceCID: generateCids(1)[0], + Client: getClientActor(t), + Provider: getProviderActor(t), + StartEpoch: startEpoch, + EndEpoch: abi.ChainEpoch(120), + }, + ClientSignature: crypto.Signature{ + Type: crypto.SigTypeSecp256k1, + Data: []byte("signature data"), + }, + } + + go func() { + _, err := dp.Publish(pctx, deal) + if ctxCancelled || expired { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }() + + return deal +} + +func checkPublishedDeals(t *testing.T, dpapi *dpAPI, dealsToPublish []market.ClientDealProposal, expectedDealsPerMsg []int) { + // For each message that was expected to be sent + var publishedDeals []market.ClientDealProposal + for _, expectedDealsInMsg := range expectedDealsPerMsg { + // Should have called StateMinerInfo with the provider address + stateMinerInfoAddr := <-dpapi.stateMinerInfoCalls + require.Equal(t, getProviderActor(t), stateMinerInfoAddr) + + // Check the fields of the message that was sent + msg := <-dpapi.pushedMsgs + require.Equal(t, getWorkerActor(t), msg.From) + require.Equal(t, market.Address, msg.To) + require.Equal(t, market.Methods.PublishStorageDeals, msg.Method) + + // Check that the expected number of deals was included in the message + var params market2.PublishStorageDealsParams + err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)) + require.NoError(t, err) + require.Len(t, params.Deals, expectedDealsInMsg) + + // Keep track of the deals that were sent + for _, d := range params.Deals { + publishedDeals = append(publishedDeals, d) + } + } + + // Verify that all deals that were submitted to be published were + // sent out (we do this by ensuring all the piece CIDs are present) + require.True(t, matchPieceCids(publishedDeals, dealsToPublish)) +} + func matchPieceCids(sent []market.ClientDealProposal, exp []market.ClientDealProposal) bool { cidsA := dealPieceCids(sent) cidsB := dealPieceCids(exp) @@ -232,10 +281,10 @@ type dpAPI struct { pushedMsgs chan *types.Message } -func newDPAPI(t *testing.T, worker address.Address) *dpAPI { +func newDPAPI(t *testing.T) *dpAPI { return &dpAPI{ t: t, - worker: worker, + worker: getWorkerActor(t), stateMinerInfoCalls: make(chan address.Address, 128), pushedMsgs: make(chan *types.Message, 128), } @@ -264,3 +313,15 @@ func (d *dpAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec * d.pushedMsgs <- msg return &types.SignedMessage{Message: *msg}, nil } + +func getClientActor(t *testing.T) address.Address { + return tutils.NewActorAddr(t, "client") +} + +func getWorkerActor(t *testing.T) address.Address { + return tutils.NewActorAddr(t, "worker") +} + +func getProviderActor(t *testing.T) address.Address { + return tutils.NewActorAddr(t, "provider") +}