to optimize the batchwait

This commit is contained in:
johnli-helloworld 2021-06-30 16:56:40 +08:00 committed by Łukasz Magiera
parent db30d891c8
commit 4fe3ecb8fc
2 changed files with 32 additions and 12 deletions

View File

@ -104,6 +104,7 @@ func (b *CommitBatcher) run() {
panic(err) panic(err)
} }
timer := time.NewTimer(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack))
for { for {
if forceRes != nil { if forceRes != nil {
forceRes <- lastMsg forceRes <- lastMsg
@ -119,7 +120,7 @@ func (b *CommitBatcher) run() {
return return
case <-b.notify: case <-b.notify:
sendAboveMax = true sendAboveMax = true
case <-b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack): case <-timer.C:
// do nothing // do nothing
case fr := <-b.force: // user triggered case fr := <-b.force: // user triggered
forceRes = fr forceRes = fr
@ -130,17 +131,26 @@ func (b *CommitBatcher) run() {
if err != nil { if err != nil {
log.Warnw("CommitBatcher processBatch error", "error", err) log.Warnw("CommitBatcher processBatch error", "error", err)
} }
if !timer.Stop() {
select {
case <-timer.C:
default:
} }
} }
func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time { timer.Reset(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack))
}
}
func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration {
now := time.Now() now := time.Now()
b.lk.Lock() b.lk.Lock()
defer b.lk.Unlock() defer b.lk.Unlock()
if len(b.todo) == 0 { if len(b.todo) == 0 {
return nil return maxWait
} }
var cutoff time.Time var cutoff time.Time
@ -158,12 +168,12 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
} }
if cutoff.IsZero() { if cutoff.IsZero() {
return time.After(maxWait) return maxWait
} }
cutoff = cutoff.Add(-slack) cutoff = cutoff.Add(-slack)
if cutoff.Before(now) { if cutoff.Before(now) {
return time.After(time.Nanosecond) // can't return 0 return time.Nanosecond // can't return 0
} }
wait := cutoff.Sub(now) wait := cutoff.Sub(now)
@ -171,7 +181,7 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
wait = maxWait wait = maxWait
} }
return time.After(wait) return wait
} }
func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, error) { func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, error) {

View File

@ -86,6 +86,7 @@ func (b *PreCommitBatcher) run() {
panic(err) panic(err)
} }
timer := time.NewTimer(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack))
for { for {
if forceRes != nil { if forceRes != nil {
forceRes <- lastRes forceRes <- lastRes
@ -100,7 +101,7 @@ func (b *PreCommitBatcher) run() {
return return
case <-b.notify: case <-b.notify:
sendAboveMax = true sendAboveMax = true
case <-b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack): case <-timer.C:
// do nothing // do nothing
case fr := <-b.force: // user triggered case fr := <-b.force: // user triggered
forceRes = fr forceRes = fr
@ -111,17 +112,26 @@ func (b *PreCommitBatcher) run() {
if err != nil { if err != nil {
log.Warnw("PreCommitBatcher processBatch error", "error", err) log.Warnw("PreCommitBatcher processBatch error", "error", err)
} }
if !timer.Stop() {
select {
case <-timer.C:
default:
} }
} }
func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time { timer.Reset(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack))
}
}
func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration {
now := time.Now() now := time.Now()
b.lk.Lock() b.lk.Lock()
defer b.lk.Unlock() defer b.lk.Unlock()
if len(b.todo) == 0 { if len(b.todo) == 0 {
return nil return maxWait
} }
var cutoff time.Time var cutoff time.Time
@ -139,12 +149,12 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T
} }
if cutoff.IsZero() { if cutoff.IsZero() {
return time.After(maxWait) return maxWait
} }
cutoff = cutoff.Add(-slack) cutoff = cutoff.Add(-slack)
if cutoff.Before(now) { if cutoff.Before(now) {
return time.After(time.Nanosecond) // can't return 0 return time.Nanosecond // can't return 0
} }
wait := cutoff.Sub(now) wait := cutoff.Sub(now)
@ -152,7 +162,7 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T
wait = maxWait wait = maxWait
} }
return time.After(wait) return wait
} }
func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBatchRes, error) { func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBatchRes, error) {