test: add test for pending publish deals / force publish

This commit is contained in:
Dirk McCormick 2021-02-05 17:09:57 +01:00
parent 4676fd6753
commit 01e30e0665

View File

@ -91,11 +91,7 @@ func TestDealPublisher(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
tc := tc tc := tc
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
ctx := context.Background() dpapi := newDPAPI(t)
client := tutils.NewActorAddr(t, "client")
provider := tutils.NewActorAddr(t, "provider")
worker := tutils.NewActorAddr(t, "worker")
dpapi := newDPAPI(t, worker)
// Create a deal publisher // Create a deal publisher
dp := newDealPublisher(dpapi, PublishMsgConfig{ dp := newDealPublisher(dpapi, PublishMsgConfig{
@ -105,53 +101,17 @@ func TestDealPublisher(t *testing.T) {
// Keep a record of the deals that were submitted to be published // Keep a record of the deals that were submitted to be published
var dealsToPublish []market.ClientDealProposal var dealsToPublish []market.ClientDealProposal
publishDeal := func(ctxCancelled bool, expired bool) {
pctx := ctx
var cancel context.CancelFunc
if ctxCancelled {
pctx, cancel = context.WithCancel(ctx)
cancel()
}
startEpoch := abi.ChainEpoch(20)
if expired {
startEpoch = abi.ChainEpoch(5)
}
deal := market.ClientDealProposal{
Proposal: market0.DealProposal{
PieceCID: generateCids(1)[0],
Client: client,
Provider: provider,
StartEpoch: startEpoch,
EndEpoch: abi.ChainEpoch(120),
},
ClientSignature: crypto.Signature{
Type: crypto.SigTypeSecp256k1,
Data: []byte("signature data"),
},
}
if !ctxCancelled && !expired {
dealsToPublish = append(dealsToPublish, deal)
}
go func() {
_, err := dp.Publish(pctx, deal)
if ctxCancelled || expired {
require.Error(t, err)
} else {
require.NoError(t, err)
}
}()
}
// Publish deals within publish period // Publish deals within publish period
for i := 0; i < tc.dealCountWithinPublishPeriod; i++ { for i := 0; i < tc.dealCountWithinPublishPeriod; i++ {
publishDeal(false, false) deal := publishDeal(t, dp, false, false)
dealsToPublish = append(dealsToPublish, deal)
} }
for i := 0; i < tc.ctxCancelledWithinPublishPeriod; i++ { for i := 0; i < tc.ctxCancelledWithinPublishPeriod; i++ {
publishDeal(true, false) publishDeal(t, dp, true, false)
} }
for i := 0; i < tc.expiredDeals; i++ { for i := 0; i < tc.expiredDeals; i++ {
publishDeal(false, true) publishDeal(t, dp, false, true)
} }
// Wait until publish period has elapsed // Wait until publish period has elapsed
@ -159,41 +119,130 @@ func TestDealPublisher(t *testing.T) {
// Publish deals after publish period // Publish deals after publish period
for i := 0; i < tc.dealCountAfterPublishPeriod; i++ { for i := 0; i < tc.dealCountAfterPublishPeriod; i++ {
publishDeal(false, false) deal := publishDeal(t, dp, false, false)
dealsToPublish = append(dealsToPublish, deal)
} }
// For each message that was expected to be sent checkPublishedDeals(t, dpapi, dealsToPublish, tc.expectedDealsPerMsg)
var publishedDeals []market.ClientDealProposal
for _, expectedDealsInMsg := range tc.expectedDealsPerMsg {
// Should have called StateMinerInfo with the provider address
stateMinerInfoAddr := <-dpapi.stateMinerInfoCalls
require.Equal(t, provider, stateMinerInfoAddr)
// Check the fields of the message that was sent
msg := <-dpapi.pushedMsgs
require.Equal(t, worker, msg.From)
require.Equal(t, market.Address, msg.To)
require.Equal(t, market.Methods.PublishStorageDeals, msg.Method)
// Check that the expected number of deals was included in the message
var params market2.PublishStorageDealsParams
err := params.UnmarshalCBOR(bytes.NewReader(msg.Params))
require.NoError(t, err)
require.Len(t, params.Deals, expectedDealsInMsg)
// Keep track of the deals that were sent
for _, d := range params.Deals {
publishedDeals = append(publishedDeals, d)
}
}
// Verify that all deals that were submitted to be published were
// sent out (we do this by ensuring all the piece CIDs are present)
require.True(t, matchPieceCids(publishedDeals, dealsToPublish))
}) })
} }
} }
func TestForcePublish(t *testing.T) {
dpapi := newDPAPI(t)
// Create a deal publisher
start := time.Now()
publishPeriod := time.Hour
dp := newDealPublisher(dpapi, PublishMsgConfig{
Period: publishPeriod,
MaxDealsPerMsg: 10,
}, &api.MessageSendSpec{MaxFee: abi.NewTokenAmount(1)})
// Queue three deals for publishing, one with a cancelled context
var dealsToPublish []market.ClientDealProposal
// 1. Regular deal
deal := publishDeal(t, dp, false, false)
dealsToPublish = append(dealsToPublish, deal)
// 2. Deal with cancelled context
publishDeal(t, dp, true, false)
// 3. Regular deal
deal = publishDeal(t, dp, false, false)
dealsToPublish = append(dealsToPublish, deal)
// Allow a moment for them to be queued
time.Sleep(10 * time.Millisecond)
// Should be two deals in the pending deals list
// (deal with cancelled context is ignored)
pendingInfo := dp.PendingDeals()
require.Len(t, pendingInfo.deals, 2)
require.Equal(t, publishPeriod, pendingInfo.publishPeriod)
require.True(t, pendingInfo.publishPeriodStart.After(start))
require.True(t, pendingInfo.publishPeriodStart.Before(time.Now()))
// Force publish all pending deals
dp.ForcePublishPendingDeals()
// Should be no pending deals
pendingInfo = dp.PendingDeals()
require.Len(t, pendingInfo.deals, 0)
// Make sure the expected deals were published
checkPublishedDeals(t, dpapi, dealsToPublish, []int{2})
}
func publishDeal(t *testing.T, dp *DealPublisher, ctxCancelled bool, expired bool) market.ClientDealProposal {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
pctx := ctx
if ctxCancelled {
pctx, cancel = context.WithCancel(ctx)
cancel()
}
startEpoch := abi.ChainEpoch(20)
if expired {
startEpoch = abi.ChainEpoch(5)
}
deal := market.ClientDealProposal{
Proposal: market0.DealProposal{
PieceCID: generateCids(1)[0],
Client: getClientActor(t),
Provider: getProviderActor(t),
StartEpoch: startEpoch,
EndEpoch: abi.ChainEpoch(120),
},
ClientSignature: crypto.Signature{
Type: crypto.SigTypeSecp256k1,
Data: []byte("signature data"),
},
}
go func() {
_, err := dp.Publish(pctx, deal)
if ctxCancelled || expired {
require.Error(t, err)
} else {
require.NoError(t, err)
}
}()
return deal
}
func checkPublishedDeals(t *testing.T, dpapi *dpAPI, dealsToPublish []market.ClientDealProposal, expectedDealsPerMsg []int) {
// For each message that was expected to be sent
var publishedDeals []market.ClientDealProposal
for _, expectedDealsInMsg := range expectedDealsPerMsg {
// Should have called StateMinerInfo with the provider address
stateMinerInfoAddr := <-dpapi.stateMinerInfoCalls
require.Equal(t, getProviderActor(t), stateMinerInfoAddr)
// Check the fields of the message that was sent
msg := <-dpapi.pushedMsgs
require.Equal(t, getWorkerActor(t), msg.From)
require.Equal(t, market.Address, msg.To)
require.Equal(t, market.Methods.PublishStorageDeals, msg.Method)
// Check that the expected number of deals was included in the message
var params market2.PublishStorageDealsParams
err := params.UnmarshalCBOR(bytes.NewReader(msg.Params))
require.NoError(t, err)
require.Len(t, params.Deals, expectedDealsInMsg)
// Keep track of the deals that were sent
for _, d := range params.Deals {
publishedDeals = append(publishedDeals, d)
}
}
// Verify that all deals that were submitted to be published were
// sent out (we do this by ensuring all the piece CIDs are present)
require.True(t, matchPieceCids(publishedDeals, dealsToPublish))
}
func matchPieceCids(sent []market.ClientDealProposal, exp []market.ClientDealProposal) bool { func matchPieceCids(sent []market.ClientDealProposal, exp []market.ClientDealProposal) bool {
cidsA := dealPieceCids(sent) cidsA := dealPieceCids(sent)
cidsB := dealPieceCids(exp) cidsB := dealPieceCids(exp)
@ -232,10 +281,10 @@ type dpAPI struct {
pushedMsgs chan *types.Message pushedMsgs chan *types.Message
} }
func newDPAPI(t *testing.T, worker address.Address) *dpAPI { func newDPAPI(t *testing.T) *dpAPI {
return &dpAPI{ return &dpAPI{
t: t, t: t,
worker: worker, worker: getWorkerActor(t),
stateMinerInfoCalls: make(chan address.Address, 128), stateMinerInfoCalls: make(chan address.Address, 128),
pushedMsgs: make(chan *types.Message, 128), pushedMsgs: make(chan *types.Message, 128),
} }
@ -264,3 +313,15 @@ func (d *dpAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *
d.pushedMsgs <- msg d.pushedMsgs <- msg
return &types.SignedMessage{Message: *msg}, nil return &types.SignedMessage{Message: *msg}, nil
} }
func getClientActor(t *testing.T) address.Address {
return tutils.NewActorAddr(t, "client")
}
func getWorkerActor(t *testing.T) address.Address {
return tutils.NewActorAddr(t, "worker")
}
func getProviderActor(t *testing.T) address.Address {
return tutils.NewActorAddr(t, "provider")
}