to optimize the batchwait
This commit is contained in:
parent
d5b7e81b09
commit
dbdbb6f0dd
22
extern/storage-sealing/commit_batch.go
vendored
22
extern/storage-sealing/commit_batch.go
vendored
@ -106,6 +106,7 @@ func (b *CommitBatcher) run() {
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
timer := time.NewTimer(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack))
|
||||||
for {
|
for {
|
||||||
if forceRes != nil {
|
if forceRes != nil {
|
||||||
forceRes <- lastMsg
|
forceRes <- lastMsg
|
||||||
@ -121,7 +122,7 @@ func (b *CommitBatcher) run() {
|
|||||||
return
|
return
|
||||||
case <-b.notify:
|
case <-b.notify:
|
||||||
sendAboveMax = true
|
sendAboveMax = true
|
||||||
case <-b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack):
|
case <-timer.C:
|
||||||
// do nothing
|
// do nothing
|
||||||
case fr := <-b.force: // user triggered
|
case fr := <-b.force: // user triggered
|
||||||
forceRes = fr
|
forceRes = fr
|
||||||
@ -132,17 +133,26 @@ func (b *CommitBatcher) run() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnw("CommitBatcher processBatch error", "error", err)
|
log.Warnw("CommitBatcher processBatch error", "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !timer.Stop() {
|
||||||
|
select {
|
||||||
|
case <-timer.C:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
timer.Reset(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time {
|
func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
b.lk.Lock()
|
b.lk.Lock()
|
||||||
defer b.lk.Unlock()
|
defer b.lk.Unlock()
|
||||||
|
|
||||||
if len(b.todo) == 0 {
|
if len(b.todo) == 0 {
|
||||||
return nil
|
return maxWait
|
||||||
}
|
}
|
||||||
|
|
||||||
var cutoff time.Time
|
var cutoff time.Time
|
||||||
@ -160,12 +170,12 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
|
|||||||
}
|
}
|
||||||
|
|
||||||
if cutoff.IsZero() {
|
if cutoff.IsZero() {
|
||||||
return time.After(maxWait)
|
return maxWait
|
||||||
}
|
}
|
||||||
|
|
||||||
cutoff = cutoff.Add(-slack)
|
cutoff = cutoff.Add(-slack)
|
||||||
if cutoff.Before(now) {
|
if cutoff.Before(now) {
|
||||||
return time.After(time.Nanosecond) // can't return 0
|
return time.Nanosecond // can't return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
wait := cutoff.Sub(now)
|
wait := cutoff.Sub(now)
|
||||||
@ -173,7 +183,7 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
|
|||||||
wait = maxWait
|
wait = maxWait
|
||||||
}
|
}
|
||||||
|
|
||||||
return time.After(wait)
|
return wait
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, error) {
|
func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, error) {
|
||||||
|
22
extern/storage-sealing/precommit_batch.go
vendored
22
extern/storage-sealing/precommit_batch.go
vendored
@ -88,6 +88,7 @@ func (b *PreCommitBatcher) run() {
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
timer := time.NewTimer(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack))
|
||||||
for {
|
for {
|
||||||
if forceRes != nil {
|
if forceRes != nil {
|
||||||
forceRes <- lastRes
|
forceRes <- lastRes
|
||||||
@ -102,7 +103,7 @@ func (b *PreCommitBatcher) run() {
|
|||||||
return
|
return
|
||||||
case <-b.notify:
|
case <-b.notify:
|
||||||
sendAboveMax = true
|
sendAboveMax = true
|
||||||
case <-b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack):
|
case <-timer.C:
|
||||||
// do nothing
|
// do nothing
|
||||||
case fr := <-b.force: // user triggered
|
case fr := <-b.force: // user triggered
|
||||||
forceRes = fr
|
forceRes = fr
|
||||||
@ -113,17 +114,26 @@ func (b *PreCommitBatcher) run() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnw("PreCommitBatcher processBatch error", "error", err)
|
log.Warnw("PreCommitBatcher processBatch error", "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !timer.Stop() {
|
||||||
|
select {
|
||||||
|
case <-timer.C:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
timer.Reset(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time {
|
func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
b.lk.Lock()
|
b.lk.Lock()
|
||||||
defer b.lk.Unlock()
|
defer b.lk.Unlock()
|
||||||
|
|
||||||
if len(b.todo) == 0 {
|
if len(b.todo) == 0 {
|
||||||
return nil
|
return maxWait
|
||||||
}
|
}
|
||||||
|
|
||||||
var cutoff time.Time
|
var cutoff time.Time
|
||||||
@ -141,12 +151,12 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T
|
|||||||
}
|
}
|
||||||
|
|
||||||
if cutoff.IsZero() {
|
if cutoff.IsZero() {
|
||||||
return time.After(maxWait)
|
return maxWait
|
||||||
}
|
}
|
||||||
|
|
||||||
cutoff = cutoff.Add(-slack)
|
cutoff = cutoff.Add(-slack)
|
||||||
if cutoff.Before(now) {
|
if cutoff.Before(now) {
|
||||||
return time.After(time.Nanosecond) // can't return 0
|
return time.Nanosecond // can't return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
wait := cutoff.Sub(now)
|
wait := cutoff.Sub(now)
|
||||||
@ -154,7 +164,7 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T
|
|||||||
wait = maxWait
|
wait = maxWait
|
||||||
}
|
}
|
||||||
|
|
||||||
return time.After(wait)
|
return wait
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBatchRes, error) {
|
func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBatchRes, error) {
|
||||||
|
Loading…
Reference in New Issue
Block a user