diff --git a/markets/storageadapter/dealpublisher.go b/markets/storageadapter/dealpublisher.go index c2864bdf9..419bc6a04 100644 --- a/markets/storageadapter/dealpublisher.go +++ b/markets/storageadapter/dealpublisher.go @@ -204,6 +204,26 @@ func (p *DealPublisher) processNewDeal(pdeal *pendingDeal) { return } + pdealPropCid, err := pdeal.deal.Proposal.Cid() + if err != nil { + log.Warn("failed to calculate proposal CID for new pending Deal with piece cid %s", pdeal.deal.Proposal.PieceCID) + return + } + + // Sanity check that new deal isn't already in the queue + for _, pd := range p.pending { + pdPropCid, err := pd.deal.Proposal.Cid() + if err != nil { + log.Warn("failed to calculate proposal CID for pending Deal already in publish queue with piece cid %s", pd.deal.Proposal.PieceCID) + return + } + + if pdPropCid.Equals(pdealPropCid) { + log.Warn("tried to process new pending deal with piece CID %s that is already in publish queue; returning", pdeal.deal.Proposal.PieceCID) + return + } + } + // Add the new deal to the queue p.pending = append(p.pending, pdeal) log.Infof("add deal with piece CID %s to publish deals queue - %d deals in queue (max queue size %d)",