From 3ced11c31f9b9eaf22383d5aeac6cf29e2b9704e Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Tue, 2 Feb 2021 10:22:12 +0100 Subject: [PATCH] fix: dont include expired deals in batch publish message --- markets/storageadapter/dealpublisher.go | 59 +++++++++++--- markets/storageadapter/dealpublisher_test.go | 84 ++++++++++++++------ 2 files changed, 105 insertions(+), 38 deletions(-) diff --git a/markets/storageadapter/dealpublisher.go b/markets/storageadapter/dealpublisher.go index e197479d6..fcc88653e 100644 --- a/markets/storageadapter/dealpublisher.go +++ b/markets/storageadapter/dealpublisher.go @@ -25,6 +25,7 @@ import ( ) type dealPublisherAPI interface { + ChainHead(context.Context) (*types.TipSet, error) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) StateMinerInfo(context.Context, address.Address, types.TipSetKey) (miner.MinerInfo, error) } @@ -223,8 +224,33 @@ func (p *DealPublisher) publishReady(ready []*pendingDeal) { return } + // onComplete is called when the publish message has been sent or there + // was an error + onComplete := func(pd *pendingDeal, msgCid cid.Cid, err error) { + // Send the publish result on the pending deal's Result channel + res := publishResult{ + msgCid: msgCid, + err: err, + } + select { + case <-p.ctx.Done(): + case <-pd.ctx.Done(): + case pd.Result <- res: + } + } + + // Validate each deal to make sure it can be published + validated := make([]*pendingDeal, 0, len(ready)) deals := make([]market2.ClientDealProposal, 0, len(ready)) for _, pd := range ready { + // Validate the deal + if err := p.validateDeal(pd.deal); err != nil { + // Validation failed, complete immediately with an error + go onComplete(pd, cid.Undef, err) + continue + } + + validated = append(validated, pd) deals = append(deals, pd.deal) } @@ -232,23 +258,32 @@ func (p *DealPublisher) publishReady(ready []*pendingDeal) { msgCid, err := p.publishDealProposals(deals) // Signal that each deal has been published - for _, pd := range ready { - pd := pd - go func() { - res := publishResult{ - msgCid: msgCid, - err: err, - } - select { - case <-p.ctx.Done(): - case pd.Result <- res: - } - }() + for _, pd := range validated { + go onComplete(pd, msgCid, err) } } +// validateDeal checks that the deal proposal start epoch hasn't already +// elapsed +func (p *DealPublisher) validateDeal(deal market2.ClientDealProposal) error { + head, err := p.api.ChainHead(p.ctx) + if err != nil { + return err + } + if head.Height() > deal.Proposal.StartEpoch { + return xerrors.Errorf( + "cannot publish deal with piece CID %s: current epoch %d has passed deal proposal start epoch %d", + deal.Proposal.PieceCID, head.Height(), deal.Proposal.StartEpoch) + } + return nil +} + // Sends the publish message func (p *DealPublisher) publishDealProposals(deals []market2.ClientDealProposal) (cid.Cid, error) { + if len(deals) == 0 { + return cid.Undef, nil + } + log.Infof("publishing %d deals in publish deals queue with piece CIDs: %s", len(deals), pieceCids(deals)) provider := deals[0].Proposal.Provider diff --git a/markets/storageadapter/dealpublisher_test.go b/markets/storageadapter/dealpublisher_test.go index bf9fac6a6..7aa331162 100644 --- a/markets/storageadapter/dealpublisher_test.go +++ b/markets/storageadapter/dealpublisher_test.go @@ -26,13 +26,14 @@ import ( func TestDealPublisher(t *testing.T) { testCases := []struct { - name string - publishPeriod time.Duration - maxDealsPerMsg uint64 - dealCountWithinPublishPeriod int - expiredWithinPublishPeriod int - dealCountAfterPublishPeriod int - expectedDealsPerMsg []int + name string + publishPeriod time.Duration + maxDealsPerMsg uint64 + dealCountWithinPublishPeriod int + ctxCancelledWithinPublishPeriod int + expiredDeals int + dealCountAfterPublishPeriod int + expectedDealsPerMsg []int }{{ name: "publish one deal within publish period", publishPeriod: 10 * time.Millisecond, @@ -61,22 +62,30 @@ func TestDealPublisher(t *testing.T) { dealCountWithinPublishPeriod: 3, dealCountAfterPublishPeriod: 1, expectedDealsPerMsg: []int{2, 1, 1}, + }, { + name: "ignore deals with cancelled context", + publishPeriod: 10 * time.Millisecond, + maxDealsPerMsg: 5, + dealCountWithinPublishPeriod: 2, + ctxCancelledWithinPublishPeriod: 2, + dealCountAfterPublishPeriod: 1, + expectedDealsPerMsg: []int{2, 1}, }, { name: "ignore expired deals", publishPeriod: 10 * time.Millisecond, maxDealsPerMsg: 5, dealCountWithinPublishPeriod: 2, - expiredWithinPublishPeriod: 2, + expiredDeals: 2, dealCountAfterPublishPeriod: 1, expectedDealsPerMsg: []int{2, 1}, }, { - name: "zero config", - publishPeriod: 0, - maxDealsPerMsg: 0, - dealCountWithinPublishPeriod: 2, - expiredWithinPublishPeriod: 0, - dealCountAfterPublishPeriod: 2, - expectedDealsPerMsg: []int{1, 1, 1, 1}, + name: "zero config", + publishPeriod: 0, + maxDealsPerMsg: 0, + dealCountWithinPublishPeriod: 2, + ctxCancelledWithinPublishPeriod: 0, + dealCountAfterPublishPeriod: 2, + expectedDealsPerMsg: []int{1, 1, 1, 1}, }} for _, tc := range testCases { @@ -96,31 +105,37 @@ func TestDealPublisher(t *testing.T) { // Keep a record of the deals that were submitted to be published var dealsToPublish []market.ClientDealProposal - publishDeal := func(expired bool) { + publishDeal := func(ctxCancelled bool, expired bool) { pctx := ctx var cancel context.CancelFunc - if expired { + if ctxCancelled { pctx, cancel = context.WithCancel(ctx) cancel() } + startEpoch := abi.ChainEpoch(20) + if expired { + startEpoch = abi.ChainEpoch(5) + } deal := market.ClientDealProposal{ Proposal: market0.DealProposal{ - PieceCID: generateCids(1)[0], - Client: client, - Provider: provider, + PieceCID: generateCids(1)[0], + Client: client, + Provider: provider, + StartEpoch: startEpoch, + EndEpoch: abi.ChainEpoch(120), }, ClientSignature: crypto.Signature{ Type: crypto.SigTypeSecp256k1, Data: []byte("signature data"), }, } - if !expired { + if !ctxCancelled && !expired { dealsToPublish = append(dealsToPublish, deal) } go func() { _, err := dp.Publish(pctx, deal) - if expired { + if ctxCancelled || expired { require.Error(t, err) } else { require.NoError(t, err) @@ -130,10 +145,13 @@ func TestDealPublisher(t *testing.T) { // Publish deals within publish period for i := 0; i < tc.dealCountWithinPublishPeriod; i++ { - publishDeal(false) + publishDeal(false, false) } - for i := 0; i < tc.expiredWithinPublishPeriod; i++ { - publishDeal(true) + for i := 0; i < tc.ctxCancelledWithinPublishPeriod; i++ { + publishDeal(true, false) + } + for i := 0; i < tc.expiredDeals; i++ { + publishDeal(false, true) } // Wait until publish period has elapsed @@ -141,7 +159,7 @@ func TestDealPublisher(t *testing.T) { // Publish deals after publish period for i := 0; i < tc.dealCountAfterPublishPeriod; i++ { - publishDeal(false) + publishDeal(false, false) } // For each message that was expected to be sent @@ -223,6 +241,20 @@ func newDPAPI(t *testing.T, worker address.Address) *dpAPI { } } +func (d *dpAPI) ChainHead(ctx context.Context) (*types.TipSet, error) { + dummyCid, err := cid.Parse("bafkqaaa") + require.NoError(d.t, err) + return types.NewTipSet([]*types.BlockHeader{{ + Miner: tutils.NewActorAddr(d.t, "miner"), + Height: abi.ChainEpoch(10), + ParentStateRoot: dummyCid, + Messages: dummyCid, + ParentMessageReceipts: dummyCid, + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, + }}) +} + func (d *dpAPI) StateMinerInfo(ctx context.Context, address address.Address, key types.TipSetKey) (miner.MinerInfo, error) { d.stateMinerInfoCalls <- address return miner.MinerInfo{Worker: d.worker}, nil