sealing: Don't start batch timers with empty batches

This commit is contained in:
Łukasz Magiera 2021-05-25 16:47:42 +02:00
parent 04658e1cae
commit 7bd0fcbb24
2 changed files with 18 additions and 10 deletions

View File

@ -105,7 +105,7 @@ func (b *CommitBatcher) run() {
return return
case <-b.notify: case <-b.notify:
sendAboveMax = true sendAboveMax = true
case <-time.After(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack)): case <-b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack):
sendAboveMin = true sendAboveMin = true
case fr := <-b.force: // user triggered case fr := <-b.force: // user triggered
forceRes = fr forceRes = fr
@ -119,12 +119,16 @@ func (b *CommitBatcher) run() {
} }
} }
func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration { func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time {
now := time.Now() now := time.Now()
b.lk.Lock() b.lk.Lock()
defer b.lk.Unlock() defer b.lk.Unlock()
if len(b.todo) == 0 {
return nil
}
var deadline time.Time var deadline time.Time
for sn := range b.todo { for sn := range b.todo {
sectorDeadline := b.deadlines[sn] sectorDeadline := b.deadlines[sn]
@ -140,12 +144,12 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration {
} }
if deadline.IsZero() { if deadline.IsZero() {
return maxWait return time.After(maxWait)
} }
deadline = deadline.Add(-slack) deadline = deadline.Add(-slack)
if deadline.Before(now) { if deadline.Before(now) {
return time.Nanosecond // can't return 0 return time.After(time.Nanosecond) // can't return 0
} }
wait := deadline.Sub(now) wait := deadline.Sub(now)
@ -153,7 +157,7 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration {
wait = maxWait wait = maxWait
} }
return wait return time.After(wait)
} }
func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {

View File

@ -95,7 +95,7 @@ func (b *PreCommitBatcher) run() {
return return
case <-b.notify: case <-b.notify:
sendAboveMax = true sendAboveMax = true
case <-time.After(b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack)): case <-b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack):
sendAboveMin = true sendAboveMin = true
case fr := <-b.force: // user triggered case fr := <-b.force: // user triggered
forceRes = fr forceRes = fr
@ -109,12 +109,16 @@ func (b *PreCommitBatcher) run() {
} }
} }
func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration { func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time {
now := time.Now() now := time.Now()
b.lk.Lock() b.lk.Lock()
defer b.lk.Unlock() defer b.lk.Unlock()
if len(b.todo) == 0 {
return nil
}
var deadline time.Time var deadline time.Time
for sn := range b.todo { for sn := range b.todo {
sectorDeadline := b.deadlines[sn] sectorDeadline := b.deadlines[sn]
@ -130,12 +134,12 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration
} }
if deadline.IsZero() { if deadline.IsZero() {
return maxWait return time.After(maxWait)
} }
deadline = deadline.Add(-slack) deadline = deadline.Add(-slack)
if deadline.Before(now) { if deadline.Before(now) {
return time.Nanosecond // can't return 0 return time.After(time.Nanosecond) // can't return 0
} }
wait := deadline.Sub(now) wait := deadline.Sub(now)
@ -143,7 +147,7 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration
wait = maxWait wait = maxWait
} }
return wait return time.After(wait)
} }
func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {