From 7bd0fcbb24294ebccaaa061a001f7ce95519039c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 25 May 2021 16:47:42 +0200 Subject: [PATCH] sealing: Don't start batch timers with empty batches --- extern/storage-sealing/commit_batch.go | 14 +++++++++----- extern/storage-sealing/precommit_batch.go | 14 +++++++++----- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/extern/storage-sealing/commit_batch.go b/extern/storage-sealing/commit_batch.go index 53f572712..88c8bdf37 100644 --- a/extern/storage-sealing/commit_batch.go +++ b/extern/storage-sealing/commit_batch.go @@ -105,7 +105,7 @@ func (b *CommitBatcher) run() { return case <-b.notify: sendAboveMax = true - case <-time.After(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack)): + case <-b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack): sendAboveMin = true case fr := <-b.force: // user triggered forceRes = fr @@ -119,12 +119,16 @@ func (b *CommitBatcher) run() { } } -func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration { +func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time { now := time.Now() b.lk.Lock() defer b.lk.Unlock() + if len(b.todo) == 0 { + return nil + } + var deadline time.Time for sn := range b.todo { sectorDeadline := b.deadlines[sn] @@ -140,12 +144,12 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration { } if deadline.IsZero() { - return maxWait + return time.After(maxWait) } deadline = deadline.Add(-slack) if deadline.Before(now) { - return time.Nanosecond // can't return 0 + return time.After(time.Nanosecond) // can't return 0 } wait := deadline.Sub(now) @@ -153,7 +157,7 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration { wait = maxWait } - return wait + return time.After(wait) } func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { diff --git a/extern/storage-sealing/precommit_batch.go b/extern/storage-sealing/precommit_batch.go index f16e5c5ee..69ba47c60 100644 --- a/extern/storage-sealing/precommit_batch.go +++ b/extern/storage-sealing/precommit_batch.go @@ -95,7 +95,7 @@ func (b *PreCommitBatcher) run() { return case <-b.notify: sendAboveMax = true - case <-time.After(b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack)): + case <-b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack): sendAboveMin = true case fr := <-b.force: // user triggered forceRes = fr @@ -109,12 +109,16 @@ func (b *PreCommitBatcher) run() { } } -func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration { +func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time { now := time.Now() b.lk.Lock() defer b.lk.Unlock() + if len(b.todo) == 0 { + return nil + } + var deadline time.Time for sn := range b.todo { sectorDeadline := b.deadlines[sn] @@ -130,12 +134,12 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration } if deadline.IsZero() { - return maxWait + return time.After(maxWait) } deadline = deadline.Add(-slack) if deadline.Before(now) { - return time.Nanosecond // can't return 0 + return time.After(time.Nanosecond) // can't return 0 } wait := deadline.Sub(now) @@ -143,7 +147,7 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration wait = maxWait } - return wait + return time.After(wait) } func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {