diff --git a/markets/storageadapter/dealpublisher.go b/markets/storageadapter/dealpublisher.go index 9f7ba1629..7b1ca1793 100644 --- a/markets/storageadapter/dealpublisher.go +++ b/markets/storageadapter/dealpublisher.go @@ -17,6 +17,7 @@ import ( market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" @@ -203,9 +204,9 @@ func (p *DealPublisher) processNewDeal(pdeal *pendingDeal) { log.Infof("add deal with piece CID %s to publish deals queue - %d deals in queue (max queue size %d)", pdeal.deal.Proposal.PieceCID, len(p.pending), p.maxDealsPerPublishMsg) - // If the maximum number of deals per message has been reached, - // send a publish message - if uint64(len(p.pending)) >= p.maxDealsPerPublishMsg { + // If the maximum number of deals per message has been reached or we're not batching, send a + // publish message + if uint64(len(p.pending)) >= p.maxDealsPerPublishMsg || p.publishPeriod == 0 { log.Infof("publish deals queue has reached max size of %d, publishing deals", p.maxDealsPerPublishMsg) p.publishAllDeals() return @@ -218,7 +219,7 @@ func (p *DealPublisher) processNewDeal(pdeal *pendingDeal) { func (p *DealPublisher) waitForMoreDeals() { // Check if we're already waiting for deals if !p.publishPeriodStart.IsZero() { - elapsed := time.Since(p.publishPeriodStart) + elapsed := build.Clock.Since(p.publishPeriodStart) log.Infof("%s elapsed of / %s until publish deals queue is published", elapsed, p.publishPeriod) return @@ -227,11 +228,11 @@ func (p *DealPublisher) waitForMoreDeals() { // Set a timeout to wait for more deals to arrive log.Infof("waiting publish deals queue period of %s before publishing", p.publishPeriod) ctx, cancel := context.WithCancel(p.ctx) - p.publishPeriodStart = time.Now() + p.publishPeriodStart = build.Clock.Now() p.cancelWaitForMoreDeals = cancel go func() { - timer := time.NewTimer(p.publishPeriod) + timer := build.Clock.Timer(p.publishPeriod) select { case <-ctx.Done(): timer.Stop() diff --git a/markets/storageadapter/dealpublisher_test.go b/markets/storageadapter/dealpublisher_test.go index b2f107bf4..a4991396a 100644 --- a/markets/storageadapter/dealpublisher_test.go +++ b/markets/storageadapter/dealpublisher_test.go @@ -9,12 +9,14 @@ import ( "github.com/filecoin-project/go-state-types/crypto" market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" "github.com/ipfs/go-cid" + "github.com/raulk/clock" "github.com/stretchr/testify/require" tutils "github.com/filecoin-project/specs-actors/v2/support/testing" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/types" @@ -25,7 +27,11 @@ import ( ) func TestDealPublisher(t *testing.T) { - t.Skip("this test randomly fails in various subtests; see issue #6799") + oldClock := build.Clock + t.Cleanup(func() { build.Clock = oldClock }) + mc := clock.NewMock() + build.Clock = mc + testCases := []struct { name string publishPeriod time.Duration @@ -92,6 +98,7 @@ func TestDealPublisher(t *testing.T) { for _, tc := range testCases { tc := tc t.Run(tc.name, func(t *testing.T) { + mc.Set(time.Now()) dpapi := newDPAPI(t) // Create a deal publisher @@ -116,7 +123,31 @@ func TestDealPublisher(t *testing.T) { } // Wait until publish period has elapsed - time.Sleep(2 * tc.publishPeriod) + if tc.publishPeriod > 0 { + // If we expect deals to get stuck in the queue, wait until that happens + if tc.maxDealsPerMsg != 0 && tc.dealCountWithinPublishPeriod%int(tc.maxDealsPerMsg) != 0 { + require.Eventually(t, func() bool { + dp.lk.Lock() + defer dp.lk.Unlock() + return !dp.publishPeriodStart.IsZero() + }, time.Second, time.Millisecond, "failed to queue deals") + } + + // Then wait to send + require.Eventually(t, func() bool { + dp.lk.Lock() + defer dp.lk.Unlock() + + // Advance if necessary. + if mc.Since(dp.publishPeriodStart) <= tc.publishPeriod { + dp.lk.Unlock() + mc.Set(dp.publishPeriodStart.Add(tc.publishPeriod + 1)) + dp.lk.Lock() + } + + return len(dp.pending) == 0 + }, time.Second, time.Millisecond, "failed to send pending messages") + } // Publish deals after publish period for i := 0; i < tc.dealCountAfterPublishPeriod; i++ { @@ -124,6 +155,19 @@ func TestDealPublisher(t *testing.T) { dealsToPublish = append(dealsToPublish, deal) } + if tc.publishPeriod > 0 && tc.dealCountAfterPublishPeriod > 0 { + require.Eventually(t, func() bool { + dp.lk.Lock() + defer dp.lk.Unlock() + if mc.Since(dp.publishPeriodStart) <= tc.publishPeriod { + dp.lk.Unlock() + mc.Set(dp.publishPeriodStart.Add(tc.publishPeriod + 1)) + dp.lk.Lock() + } + return len(dp.pending) == 0 + }, time.Second, time.Millisecond, "failed to send pending messages") + } + checkPublishedDeals(t, dpapi, dealsToPublish, tc.expectedDealsPerMsg) }) } @@ -133,7 +177,7 @@ func TestForcePublish(t *testing.T) { dpapi := newDPAPI(t) // Create a deal publisher - start := time.Now() + start := build.Clock.Now() publishPeriod := time.Hour dp := newDealPublisher(dpapi, nil, PublishMsgConfig{ Period: publishPeriod, @@ -152,7 +196,7 @@ func TestForcePublish(t *testing.T) { dealsToPublish = append(dealsToPublish, deal) // Allow a moment for them to be queued - time.Sleep(10 * time.Millisecond) + build.Clock.Sleep(10 * time.Millisecond) // Should be two deals in the pending deals list // (deal with cancelled context is ignored) @@ -160,7 +204,7 @@ func TestForcePublish(t *testing.T) { require.Len(t, pendingInfo.Deals, 2) require.Equal(t, publishPeriod, pendingInfo.PublishPeriod) require.True(t, pendingInfo.PublishPeriodStart.After(start)) - require.True(t, pendingInfo.PublishPeriodStart.Before(time.Now())) + require.True(t, pendingInfo.PublishPeriodStart.Before(build.Clock.Now())) // Force publish all pending deals dp.ForcePublishPendingDeals() diff --git a/markets/storageadapter/provider.go b/markets/storageadapter/provider.go index b899c0810..9756b33d5 100644 --- a/markets/storageadapter/provider.go +++ b/markets/storageadapter/provider.go @@ -107,8 +107,8 @@ func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagema } p, offset, err := n.secb.AddPiece(ctx, pieceSize, pieceData, sdInfo) - curTime := time.Now() - for time.Since(curTime) < addPieceRetryTimeout { + curTime := build.Clock.Now() + for build.Clock.Since(curTime) < addPieceRetryTimeout { if !xerrors.Is(err, sealing.ErrTooManySectorsSealing) { if err != nil { log.Errorf("failed to addPiece for deal %d, err: %v", deal.DealID, err) @@ -116,7 +116,7 @@ func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagema break } select { - case <-time.After(addPieceRetryWait): + case <-build.Clock.After(addPieceRetryWait): p, offset, err = n.secb.AddPiece(ctx, pieceSize, pieceData, sdInfo) case <-ctx.Done(): return nil, xerrors.New("context expired while waiting to retry AddPiece")