diff --git a/extern/storage-sealing/commit_batch.go b/extern/storage-sealing/commit_batch.go index c18c89db5..57379414f 100644 --- a/extern/storage-sealing/commit_batch.go +++ b/extern/storage-sealing/commit_batch.go @@ -104,6 +104,7 @@ func (b *CommitBatcher) run() { panic(err) } + timer := time.NewTimer(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack)) for { if forceRes != nil { forceRes <- lastMsg @@ -119,7 +120,7 @@ func (b *CommitBatcher) run() { return case <-b.notify: sendAboveMax = true - case <-b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack): + case <-timer.C: // do nothing case fr := <-b.force: // user triggered forceRes = fr @@ -130,17 +131,26 @@ func (b *CommitBatcher) run() { if err != nil { log.Warnw("CommitBatcher processBatch error", "error", err) } + + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + + timer.Reset(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack)) } } -func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time { +func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration { now := time.Now() b.lk.Lock() defer b.lk.Unlock() if len(b.todo) == 0 { - return nil + return maxWait } var cutoff time.Time @@ -158,12 +168,12 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time } if cutoff.IsZero() { - return time.After(maxWait) + return maxWait } cutoff = cutoff.Add(-slack) if cutoff.Before(now) { - return time.After(time.Nanosecond) // can't return 0 + return time.Nanosecond // can't return 0 } wait := cutoff.Sub(now) @@ -171,7 +181,7 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time wait = maxWait } - return time.After(wait) + return wait } func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, error) { diff --git a/extern/storage-sealing/precommit_batch.go b/extern/storage-sealing/precommit_batch.go index d1d2f5878..f576608ea 100644 --- a/extern/storage-sealing/precommit_batch.go +++ b/extern/storage-sealing/precommit_batch.go @@ -86,6 +86,7 @@ func (b *PreCommitBatcher) run() { panic(err) } + timer := time.NewTimer(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack)) for { if forceRes != nil { forceRes <- lastRes @@ -100,7 +101,7 @@ func (b *PreCommitBatcher) run() { return case <-b.notify: sendAboveMax = true - case <-b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack): + case <-timer.C: // do nothing case fr := <-b.force: // user triggered forceRes = fr @@ -111,17 +112,26 @@ func (b *PreCommitBatcher) run() { if err != nil { log.Warnw("PreCommitBatcher processBatch error", "error", err) } + + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + + timer.Reset(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack)) } } -func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time { +func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration { now := time.Now() b.lk.Lock() defer b.lk.Unlock() if len(b.todo) == 0 { - return nil + return maxWait } var cutoff time.Time @@ -139,12 +149,12 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T } if cutoff.IsZero() { - return time.After(maxWait) + return maxWait } cutoff = cutoff.Add(-slack) if cutoff.Before(now) { - return time.After(time.Nanosecond) // can't return 0 + return time.Nanosecond // can't return 0 } wait := cutoff.Sub(now) @@ -152,7 +162,7 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T wait = maxWait } - return time.After(wait) + return wait } func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBatchRes, error) {