diff --git a/markets/storageadapter/dealpublisher.go b/markets/storageadapter/dealpublisher.go index fcc88653e..8a824ea23 100644 --- a/markets/storageadapter/dealpublisher.go +++ b/markets/storageadapter/dealpublisher.go @@ -121,6 +121,44 @@ func newDealPublisher( } } +// PendingInfo has info about pending deals and when they are due to be +// published +type PendingInfo struct { + deals []*pendingDeal + publishPeriodStart time.Time + publishPeriod time.Duration +} + +// PendingDeals returns the list of deals that are queued up to be published +func (p *DealPublisher) PendingDeals() *PendingInfo { + p.lk.Lock() + defer p.lk.Unlock() + + // Filter out deals whose context has been cancelled + deals := make([]*pendingDeal, 0, len(p.pending)) + for _, dl := range p.pending { + if dl.ctx.Err() == nil { + deals = append(deals, dl) + } + } + + return &PendingInfo{ + deals: deals, + publishPeriodStart: p.publishPeriodStart, + publishPeriod: p.publishPeriod, + } +} + +// ForcePublishPendingDeals publishes all pending deals without waiting for +// the publish period to elapse +func (p *DealPublisher) ForcePublishPendingDeals() { + p.lk.Lock() + defer p.lk.Unlock() + + log.Infof("force publishing deals") + p.publishAllDeals() +} + func (p *DealPublisher) Publish(ctx context.Context, deal market2.ClientDealProposal) (cid.Cid, error) { pdeal := newPendingDeal(ctx, deal)