Merge pull request #7173 from filecoin-project/fix/publish-test-redux
fix TestDealPublisher
This commit is contained in:
commit
0c0fd00788
@ -228,11 +228,15 @@ func (p *DealPublisher) waitForMoreDeals() {
|
|||||||
// Set a timeout to wait for more deals to arrive
|
// Set a timeout to wait for more deals to arrive
|
||||||
log.Infof("waiting publish deals queue period of %s before publishing", p.publishPeriod)
|
log.Infof("waiting publish deals queue period of %s before publishing", p.publishPeriod)
|
||||||
ctx, cancel := context.WithCancel(p.ctx)
|
ctx, cancel := context.WithCancel(p.ctx)
|
||||||
|
|
||||||
|
// Create the timer _before_ taking the current time so publishPeriod+timeout is always >=
|
||||||
|
// the actual timer timeout.
|
||||||
|
timer := build.Clock.Timer(p.publishPeriod)
|
||||||
|
|
||||||
p.publishPeriodStart = build.Clock.Now()
|
p.publishPeriodStart = build.Clock.Now()
|
||||||
p.cancelWaitForMoreDeals = cancel
|
p.cancelWaitForMoreDeals = cancel
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
timer := build.Clock.Timer(p.publishPeriod)
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
timer.Stop()
|
timer.Stop()
|
||||||
@ -257,7 +261,7 @@ func (p *DealPublisher) publishAllDeals() {
|
|||||||
|
|
||||||
// Filter out any deals that have been cancelled
|
// Filter out any deals that have been cancelled
|
||||||
p.filterCancelledDeals()
|
p.filterCancelledDeals()
|
||||||
deals := p.pending[:]
|
deals := p.pending
|
||||||
p.pending = nil
|
p.pending = nil
|
||||||
|
|
||||||
// Send the publish message
|
// Send the publish message
|
||||||
@ -384,12 +388,12 @@ func pieceCids(deals []market2.ClientDealProposal) string {
|
|||||||
|
|
||||||
// filter out deals that have been cancelled
|
// filter out deals that have been cancelled
|
||||||
func (p *DealPublisher) filterCancelledDeals() {
|
func (p *DealPublisher) filterCancelledDeals() {
|
||||||
i := 0
|
filtered := p.pending[:0]
|
||||||
for _, pd := range p.pending {
|
for _, pd := range p.pending {
|
||||||
if pd.ctx.Err() == nil {
|
if pd.ctx.Err() != nil {
|
||||||
p.pending[i] = pd
|
continue
|
||||||
i++
|
|
||||||
}
|
}
|
||||||
|
filtered = append(filtered, pd)
|
||||||
}
|
}
|
||||||
p.pending = p.pending[:i]
|
p.pending = filtered
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user