Merge pull request #6636 from johnli-helloworld/fix/batchwait

Reuse timers in sealing batch logic
This commit is contained in:
Łukasz Magiera 2021-06-30 14:42:51 +02:00 committed by GitHub
commit 19a536d869
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 12 deletions

View File

@ -106,6 +106,7 @@ func (b *CommitBatcher) run() {
panic(err)
}
timer := time.NewTimer(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack))
for {
if forceRes != nil {
forceRes <- lastMsg
@ -121,7 +122,7 @@ func (b *CommitBatcher) run() {
return
case <-b.notify:
sendAboveMax = true
case <-b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack):
case <-timer.C:
// do nothing
case fr := <-b.force: // user triggered
forceRes = fr
@ -132,17 +133,26 @@ func (b *CommitBatcher) run() {
if err != nil {
log.Warnw("CommitBatcher processBatch error", "error", err)
}
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack))
}
}
func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time {
func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration {
now := time.Now()
b.lk.Lock()
defer b.lk.Unlock()
if len(b.todo) == 0 {
return nil
return maxWait
}
var cutoff time.Time
@ -160,12 +170,12 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
}
if cutoff.IsZero() {
return time.After(maxWait)
return maxWait
}
cutoff = cutoff.Add(-slack)
if cutoff.Before(now) {
return time.After(time.Nanosecond) // can't return 0
return time.Nanosecond // can't return 0
}
wait := cutoff.Sub(now)
@ -173,7 +183,7 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
wait = maxWait
}
return time.After(wait)
return wait
}
func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, error) {

View File

@ -88,6 +88,7 @@ func (b *PreCommitBatcher) run() {
panic(err)
}
timer := time.NewTimer(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack))
for {
if forceRes != nil {
forceRes <- lastRes
@ -102,7 +103,7 @@ func (b *PreCommitBatcher) run() {
return
case <-b.notify:
sendAboveMax = true
case <-b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack):
case <-timer.C:
// do nothing
case fr := <-b.force: // user triggered
forceRes = fr
@ -113,17 +114,26 @@ func (b *PreCommitBatcher) run() {
if err != nil {
log.Warnw("PreCommitBatcher processBatch error", "error", err)
}
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack))
}
}
func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time {
func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration {
now := time.Now()
b.lk.Lock()
defer b.lk.Unlock()
if len(b.todo) == 0 {
return nil
return maxWait
}
var cutoff time.Time
@ -141,12 +151,12 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T
}
if cutoff.IsZero() {
return time.After(maxWait)
return maxWait
}
cutoff = cutoff.Add(-slack)
if cutoff.Before(now) {
return time.After(time.Nanosecond) // can't return 0
return time.Nanosecond // can't return 0
}
wait := cutoff.Sub(now)
@ -154,7 +164,7 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T
wait = maxWait
}
return time.After(wait)
return wait
}
func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBatchRes, error) {