fix flaky TestDealPublisher and re-enable

fixes #6799
This commit is contained in:
Steven Allen 2021-08-05 19:23:29 -07:00
parent 9dca65634d
commit be2ecf6236
3 changed files with 59 additions and 14 deletions

View File

@ -17,6 +17,7 @@ import (
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
@ -203,9 +204,9 @@ func (p *DealPublisher) processNewDeal(pdeal *pendingDeal) {
log.Infof("add deal with piece CID %s to publish deals queue - %d deals in queue (max queue size %d)",
pdeal.deal.Proposal.PieceCID, len(p.pending), p.maxDealsPerPublishMsg)
// If the maximum number of deals per message has been reached,
// send a publish message
if uint64(len(p.pending)) >= p.maxDealsPerPublishMsg {
// If the maximum number of deals per message has been reached or we're not batching, send a
// publish message
if uint64(len(p.pending)) >= p.maxDealsPerPublishMsg || p.publishPeriod == 0 {
log.Infof("publish deals queue has reached max size of %d, publishing deals", p.maxDealsPerPublishMsg)
p.publishAllDeals()
return
@ -218,7 +219,7 @@ func (p *DealPublisher) processNewDeal(pdeal *pendingDeal) {
func (p *DealPublisher) waitForMoreDeals() {
// Check if we're already waiting for deals
if !p.publishPeriodStart.IsZero() {
elapsed := time.Since(p.publishPeriodStart)
elapsed := build.Clock.Since(p.publishPeriodStart)
log.Infof("%s elapsed of / %s until publish deals queue is published",
elapsed, p.publishPeriod)
return
@ -227,11 +228,11 @@ func (p *DealPublisher) waitForMoreDeals() {
// Set a timeout to wait for more deals to arrive
log.Infof("waiting publish deals queue period of %s before publishing", p.publishPeriod)
ctx, cancel := context.WithCancel(p.ctx)
p.publishPeriodStart = time.Now()
p.publishPeriodStart = build.Clock.Now()
p.cancelWaitForMoreDeals = cancel
go func() {
timer := time.NewTimer(p.publishPeriod)
timer := build.Clock.Timer(p.publishPeriod)
select {
case <-ctx.Done():
timer.Stop()

View File

@ -9,12 +9,14 @@ import (
"github.com/filecoin-project/go-state-types/crypto"
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
"github.com/ipfs/go-cid"
"github.com/raulk/clock"
"github.com/stretchr/testify/require"
tutils "github.com/filecoin-project/specs-actors/v2/support/testing"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
@ -25,7 +27,11 @@ import (
)
func TestDealPublisher(t *testing.T) {
t.Skip("this test randomly fails in various subtests; see issue #6799")
oldClock := build.Clock
t.Cleanup(func() { build.Clock = oldClock })
mc := clock.NewMock()
build.Clock = mc
testCases := []struct {
name string
publishPeriod time.Duration
@ -92,6 +98,7 @@ func TestDealPublisher(t *testing.T) {
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
mc.Set(time.Now())
dpapi := newDPAPI(t)
// Create a deal publisher
@ -116,7 +123,31 @@ func TestDealPublisher(t *testing.T) {
}
// Wait until publish period has elapsed
time.Sleep(2 * tc.publishPeriod)
if tc.publishPeriod > 0 {
// If we expect deals to get stuck in the queue, wait until that happens
if tc.maxDealsPerMsg != 0 && tc.dealCountWithinPublishPeriod%int(tc.maxDealsPerMsg) != 0 {
require.Eventually(t, func() bool {
dp.lk.Lock()
defer dp.lk.Unlock()
return !dp.publishPeriodStart.IsZero()
}, time.Second, time.Millisecond, "failed to queue deals")
}
// Then wait to send
require.Eventually(t, func() bool {
dp.lk.Lock()
defer dp.lk.Unlock()
// Advance if necessary.
if mc.Since(dp.publishPeriodStart) <= tc.publishPeriod {
dp.lk.Unlock()
mc.Set(dp.publishPeriodStart.Add(tc.publishPeriod + 1))
dp.lk.Lock()
}
return len(dp.pending) == 0
}, time.Second, time.Millisecond, "failed to send pending messages")
}
// Publish deals after publish period
for i := 0; i < tc.dealCountAfterPublishPeriod; i++ {
@ -124,6 +155,19 @@ func TestDealPublisher(t *testing.T) {
dealsToPublish = append(dealsToPublish, deal)
}
if tc.publishPeriod > 0 && tc.dealCountAfterPublishPeriod > 0 {
require.Eventually(t, func() bool {
dp.lk.Lock()
defer dp.lk.Unlock()
if mc.Since(dp.publishPeriodStart) <= tc.publishPeriod {
dp.lk.Unlock()
mc.Set(dp.publishPeriodStart.Add(tc.publishPeriod + 1))
dp.lk.Lock()
}
return len(dp.pending) == 0
}, time.Second, time.Millisecond, "failed to send pending messages")
}
checkPublishedDeals(t, dpapi, dealsToPublish, tc.expectedDealsPerMsg)
})
}
@ -133,7 +177,7 @@ func TestForcePublish(t *testing.T) {
dpapi := newDPAPI(t)
// Create a deal publisher
start := time.Now()
start := build.Clock.Now()
publishPeriod := time.Hour
dp := newDealPublisher(dpapi, nil, PublishMsgConfig{
Period: publishPeriod,
@ -152,7 +196,7 @@ func TestForcePublish(t *testing.T) {
dealsToPublish = append(dealsToPublish, deal)
// Allow a moment for them to be queued
time.Sleep(10 * time.Millisecond)
build.Clock.Sleep(10 * time.Millisecond)
// Should be two deals in the pending deals list
// (deal with cancelled context is ignored)
@ -160,7 +204,7 @@ func TestForcePublish(t *testing.T) {
require.Len(t, pendingInfo.Deals, 2)
require.Equal(t, publishPeriod, pendingInfo.PublishPeriod)
require.True(t, pendingInfo.PublishPeriodStart.After(start))
require.True(t, pendingInfo.PublishPeriodStart.Before(time.Now()))
require.True(t, pendingInfo.PublishPeriodStart.Before(build.Clock.Now()))
// Force publish all pending deals
dp.ForcePublishPendingDeals()

View File

@ -107,8 +107,8 @@ func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagema
}
p, offset, err := n.secb.AddPiece(ctx, pieceSize, pieceData, sdInfo)
curTime := time.Now()
for time.Since(curTime) < addPieceRetryTimeout {
curTime := build.Clock.Now()
for build.Clock.Since(curTime) < addPieceRetryTimeout {
if !xerrors.Is(err, sealing.ErrTooManySectorsSealing) {
if err != nil {
log.Errorf("failed to addPiece for deal %d, err: %v", deal.DealID, err)
@ -116,7 +116,7 @@ func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagema
break
}
select {
case <-time.After(addPieceRetryWait):
case <-build.Clock.After(addPieceRetryWait):
p, offset, err = n.secb.AddPiece(ctx, pieceSize, pieceData, sdInfo)
case <-ctx.Done():
return nil, xerrors.New("context expired while waiting to retry AddPiece")