fix: dont include expired deals in batch publish message

This commit is contained in:
Dirk McCormick 2021-02-02 10:22:12 +01:00
parent 987f41011a
commit 3ced11c31f
2 changed files with 105 additions and 38 deletions

View File

@ -25,6 +25,7 @@ import (
) )
type dealPublisherAPI interface { type dealPublisherAPI interface {
ChainHead(context.Context) (*types.TipSet, error)
MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (miner.MinerInfo, error) StateMinerInfo(context.Context, address.Address, types.TipSetKey) (miner.MinerInfo, error)
} }
@ -223,8 +224,33 @@ func (p *DealPublisher) publishReady(ready []*pendingDeal) {
return return
} }
// onComplete is called when the publish message has been sent or there
// was an error
onComplete := func(pd *pendingDeal, msgCid cid.Cid, err error) {
// Send the publish result on the pending deal's Result channel
res := publishResult{
msgCid: msgCid,
err: err,
}
select {
case <-p.ctx.Done():
case <-pd.ctx.Done():
case pd.Result <- res:
}
}
// Validate each deal to make sure it can be published
validated := make([]*pendingDeal, 0, len(ready))
deals := make([]market2.ClientDealProposal, 0, len(ready)) deals := make([]market2.ClientDealProposal, 0, len(ready))
for _, pd := range ready { for _, pd := range ready {
// Validate the deal
if err := p.validateDeal(pd.deal); err != nil {
// Validation failed, complete immediately with an error
go onComplete(pd, cid.Undef, err)
continue
}
validated = append(validated, pd)
deals = append(deals, pd.deal) deals = append(deals, pd.deal)
} }
@ -232,23 +258,32 @@ func (p *DealPublisher) publishReady(ready []*pendingDeal) {
msgCid, err := p.publishDealProposals(deals) msgCid, err := p.publishDealProposals(deals)
// Signal that each deal has been published // Signal that each deal has been published
for _, pd := range ready { for _, pd := range validated {
pd := pd go onComplete(pd, msgCid, err)
go func() {
res := publishResult{
msgCid: msgCid,
err: err,
} }
select { }
case <-p.ctx.Done():
case pd.Result <- res: // validateDeal checks that the deal proposal start epoch hasn't already
// elapsed
func (p *DealPublisher) validateDeal(deal market2.ClientDealProposal) error {
head, err := p.api.ChainHead(p.ctx)
if err != nil {
return err
} }
}() if head.Height() > deal.Proposal.StartEpoch {
return xerrors.Errorf(
"cannot publish deal with piece CID %s: current epoch %d has passed deal proposal start epoch %d",
deal.Proposal.PieceCID, head.Height(), deal.Proposal.StartEpoch)
} }
return nil
} }
// Sends the publish message // Sends the publish message
func (p *DealPublisher) publishDealProposals(deals []market2.ClientDealProposal) (cid.Cid, error) { func (p *DealPublisher) publishDealProposals(deals []market2.ClientDealProposal) (cid.Cid, error) {
if len(deals) == 0 {
return cid.Undef, nil
}
log.Infof("publishing %d deals in publish deals queue with piece CIDs: %s", len(deals), pieceCids(deals)) log.Infof("publishing %d deals in publish deals queue with piece CIDs: %s", len(deals), pieceCids(deals))
provider := deals[0].Proposal.Provider provider := deals[0].Proposal.Provider

View File

@ -30,7 +30,8 @@ func TestDealPublisher(t *testing.T) {
publishPeriod time.Duration publishPeriod time.Duration
maxDealsPerMsg uint64 maxDealsPerMsg uint64
dealCountWithinPublishPeriod int dealCountWithinPublishPeriod int
expiredWithinPublishPeriod int ctxCancelledWithinPublishPeriod int
expiredDeals int
dealCountAfterPublishPeriod int dealCountAfterPublishPeriod int
expectedDealsPerMsg []int expectedDealsPerMsg []int
}{{ }{{
@ -61,12 +62,20 @@ func TestDealPublisher(t *testing.T) {
dealCountWithinPublishPeriod: 3, dealCountWithinPublishPeriod: 3,
dealCountAfterPublishPeriod: 1, dealCountAfterPublishPeriod: 1,
expectedDealsPerMsg: []int{2, 1, 1}, expectedDealsPerMsg: []int{2, 1, 1},
}, {
name: "ignore deals with cancelled context",
publishPeriod: 10 * time.Millisecond,
maxDealsPerMsg: 5,
dealCountWithinPublishPeriod: 2,
ctxCancelledWithinPublishPeriod: 2,
dealCountAfterPublishPeriod: 1,
expectedDealsPerMsg: []int{2, 1},
}, { }, {
name: "ignore expired deals", name: "ignore expired deals",
publishPeriod: 10 * time.Millisecond, publishPeriod: 10 * time.Millisecond,
maxDealsPerMsg: 5, maxDealsPerMsg: 5,
dealCountWithinPublishPeriod: 2, dealCountWithinPublishPeriod: 2,
expiredWithinPublishPeriod: 2, expiredDeals: 2,
dealCountAfterPublishPeriod: 1, dealCountAfterPublishPeriod: 1,
expectedDealsPerMsg: []int{2, 1}, expectedDealsPerMsg: []int{2, 1},
}, { }, {
@ -74,7 +83,7 @@ func TestDealPublisher(t *testing.T) {
publishPeriod: 0, publishPeriod: 0,
maxDealsPerMsg: 0, maxDealsPerMsg: 0,
dealCountWithinPublishPeriod: 2, dealCountWithinPublishPeriod: 2,
expiredWithinPublishPeriod: 0, ctxCancelledWithinPublishPeriod: 0,
dealCountAfterPublishPeriod: 2, dealCountAfterPublishPeriod: 2,
expectedDealsPerMsg: []int{1, 1, 1, 1}, expectedDealsPerMsg: []int{1, 1, 1, 1},
}} }}
@ -96,31 +105,37 @@ func TestDealPublisher(t *testing.T) {
// Keep a record of the deals that were submitted to be published // Keep a record of the deals that were submitted to be published
var dealsToPublish []market.ClientDealProposal var dealsToPublish []market.ClientDealProposal
publishDeal := func(expired bool) { publishDeal := func(ctxCancelled bool, expired bool) {
pctx := ctx pctx := ctx
var cancel context.CancelFunc var cancel context.CancelFunc
if expired { if ctxCancelled {
pctx, cancel = context.WithCancel(ctx) pctx, cancel = context.WithCancel(ctx)
cancel() cancel()
} }
startEpoch := abi.ChainEpoch(20)
if expired {
startEpoch = abi.ChainEpoch(5)
}
deal := market.ClientDealProposal{ deal := market.ClientDealProposal{
Proposal: market0.DealProposal{ Proposal: market0.DealProposal{
PieceCID: generateCids(1)[0], PieceCID: generateCids(1)[0],
Client: client, Client: client,
Provider: provider, Provider: provider,
StartEpoch: startEpoch,
EndEpoch: abi.ChainEpoch(120),
}, },
ClientSignature: crypto.Signature{ ClientSignature: crypto.Signature{
Type: crypto.SigTypeSecp256k1, Type: crypto.SigTypeSecp256k1,
Data: []byte("signature data"), Data: []byte("signature data"),
}, },
} }
if !expired { if !ctxCancelled && !expired {
dealsToPublish = append(dealsToPublish, deal) dealsToPublish = append(dealsToPublish, deal)
} }
go func() { go func() {
_, err := dp.Publish(pctx, deal) _, err := dp.Publish(pctx, deal)
if expired { if ctxCancelled || expired {
require.Error(t, err) require.Error(t, err)
} else { } else {
require.NoError(t, err) require.NoError(t, err)
@ -130,10 +145,13 @@ func TestDealPublisher(t *testing.T) {
// Publish deals within publish period // Publish deals within publish period
for i := 0; i < tc.dealCountWithinPublishPeriod; i++ { for i := 0; i < tc.dealCountWithinPublishPeriod; i++ {
publishDeal(false) publishDeal(false, false)
} }
for i := 0; i < tc.expiredWithinPublishPeriod; i++ { for i := 0; i < tc.ctxCancelledWithinPublishPeriod; i++ {
publishDeal(true) publishDeal(true, false)
}
for i := 0; i < tc.expiredDeals; i++ {
publishDeal(false, true)
} }
// Wait until publish period has elapsed // Wait until publish period has elapsed
@ -141,7 +159,7 @@ func TestDealPublisher(t *testing.T) {
// Publish deals after publish period // Publish deals after publish period
for i := 0; i < tc.dealCountAfterPublishPeriod; i++ { for i := 0; i < tc.dealCountAfterPublishPeriod; i++ {
publishDeal(false) publishDeal(false, false)
} }
// For each message that was expected to be sent // For each message that was expected to be sent
@ -223,6 +241,20 @@ func newDPAPI(t *testing.T, worker address.Address) *dpAPI {
} }
} }
func (d *dpAPI) ChainHead(ctx context.Context) (*types.TipSet, error) {
dummyCid, err := cid.Parse("bafkqaaa")
require.NoError(d.t, err)
return types.NewTipSet([]*types.BlockHeader{{
Miner: tutils.NewActorAddr(d.t, "miner"),
Height: abi.ChainEpoch(10),
ParentStateRoot: dummyCid,
Messages: dummyCid,
ParentMessageReceipts: dummyCid,
BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS},
BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS},
}})
}
func (d *dpAPI) StateMinerInfo(ctx context.Context, address address.Address, key types.TipSetKey) (miner.MinerInfo, error) { func (d *dpAPI) StateMinerInfo(ctx context.Context, address address.Address, key types.TipSetKey) (miner.MinerInfo, error) {
d.stateMinerInfoCalls <- address d.stateMinerInfoCalls <- address
return miner.MinerInfo{Worker: d.worker}, nil return miner.MinerInfo{Worker: d.worker}, nil