Merge pull request #6563 from filecoin-project/asr/batch-starter

Always flush when timer goes off
This commit is contained in:
Łukasz Magiera 2021-06-23 19:19:30 +02:00 committed by GitHub
commit 0e4cd3c599
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 9 additions and 21 deletions

View File

@ -108,7 +108,8 @@ func (b *CommitBatcher) run() {
} }
lastMsg = nil lastMsg = nil
var sendAboveMax, sendAboveMin bool // indicates whether we should only start a batch if we have reached or exceeded cfg.MaxCommitBatch
var sendAboveMax bool
select { select {
case <-b.stop: case <-b.stop:
close(b.stopped) close(b.stopped)
@ -116,13 +117,13 @@ func (b *CommitBatcher) run() {
case <-b.notify: case <-b.notify:
sendAboveMax = true sendAboveMax = true
case <-b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack): case <-b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack):
sendAboveMin = true // do nothing
case fr := <-b.force: // user triggered case fr := <-b.force: // user triggered
forceRes = fr forceRes = fr
} }
var err error var err error
lastMsg, err = b.maybeStartBatch(sendAboveMax, sendAboveMin) lastMsg, err = b.maybeStartBatch(sendAboveMax)
if err != nil { if err != nil {
log.Warnw("CommitBatcher processBatch error", "error", err) log.Warnw("CommitBatcher processBatch error", "error", err)
} }
@ -170,7 +171,7 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
return time.After(wait) return time.After(wait)
} }
func (b *CommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.CommitBatchRes, error) { func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, error) {
b.lk.Lock() b.lk.Lock()
defer b.lk.Unlock() defer b.lk.Unlock()
@ -188,10 +189,6 @@ func (b *CommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.CommitBa
return nil, nil return nil, nil
} }
if after && total < cfg.MinCommitBatch {
return nil, nil
}
var res []sealiface.CommitBatchRes var res []sealiface.CommitBatchRes
if total < cfg.MinCommitBatch || total < miner5.MinAggregatedSectors { if total < cfg.MinCommitBatch || total < miner5.MinAggregatedSectors {

View File

@ -93,7 +93,7 @@ func (b *PreCommitBatcher) run() {
} }
lastRes = nil lastRes = nil
var sendAboveMax, sendAboveMin bool var sendAboveMax bool
select { select {
case <-b.stop: case <-b.stop:
close(b.stopped) close(b.stopped)
@ -101,13 +101,13 @@ func (b *PreCommitBatcher) run() {
case <-b.notify: case <-b.notify:
sendAboveMax = true sendAboveMax = true
case <-b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack): case <-b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack):
sendAboveMin = true // do nothing
case fr := <-b.force: // user triggered case fr := <-b.force: // user triggered
forceRes = fr forceRes = fr
} }
var err error var err error
lastRes, err = b.maybeStartBatch(sendAboveMax, sendAboveMin) lastRes, err = b.maybeStartBatch(sendAboveMax)
if err != nil { if err != nil {
log.Warnw("PreCommitBatcher processBatch error", "error", err) log.Warnw("PreCommitBatcher processBatch error", "error", err)
} }
@ -155,7 +155,7 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T
return time.After(wait) return time.After(wait)
} }
func (b *PreCommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.PreCommitBatchRes, error) { func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBatchRes, error) {
b.lk.Lock() b.lk.Lock()
defer b.lk.Unlock() defer b.lk.Unlock()
@ -173,10 +173,6 @@ func (b *PreCommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.PreCo
return nil, nil return nil, nil
} }
if after && total < cfg.MinPreCommitBatch {
return nil, nil
}
// todo support multiple batches // todo support multiple batches
res, err := b.processBatch(cfg) res, err := b.processBatch(cfg)
if err != nil && len(res) == 0 { if err != nil && len(res) == 0 {

View File

@ -22,7 +22,6 @@ type Config struct {
BatchPreCommits bool BatchPreCommits bool
MaxPreCommitBatch int MaxPreCommitBatch int
MinPreCommitBatch int
PreCommitBatchWait time.Duration PreCommitBatchWait time.Duration
PreCommitBatchSlack time.Duration PreCommitBatchSlack time.Duration

View File

@ -93,7 +93,6 @@ type SealingConfig struct {
BatchPreCommits bool BatchPreCommits bool
// maximum precommit batch size - batches will be sent immediately above this size // maximum precommit batch size - batches will be sent immediately above this size
MaxPreCommitBatch int MaxPreCommitBatch int
MinPreCommitBatch int
// how long to wait before submitting a batch after crossing the minimum batch size // how long to wait before submitting a batch after crossing the minimum batch size
PreCommitBatchWait Duration PreCommitBatchWait Duration
// time buffer for forceful batch submission before sectors/deal in batch would start expiring // time buffer for forceful batch submission before sectors/deal in batch would start expiring
@ -285,7 +284,6 @@ func DefaultStorageMiner() *StorageMiner {
FinalizeEarly: false, FinalizeEarly: false,
BatchPreCommits: true, BatchPreCommits: true,
MinPreCommitBatch: 1, // we must have at least one precommit to batch
MaxPreCommitBatch: miner5.PreCommitSectorBatchMaxSize, // up to 256 sectors MaxPreCommitBatch: miner5.PreCommitSectorBatchMaxSize, // up to 256 sectors
PreCommitBatchWait: Duration(24 * time.Hour), // this should be less than 31.5 hours, which is the expiration of a precommit ticket PreCommitBatchWait: Duration(24 * time.Hour), // this should be less than 31.5 hours, which is the expiration of a precommit ticket
PreCommitBatchSlack: Duration(3 * time.Hour), // time buffer for forceful batch submission before sectors/deals in batch would start expiring, higher value will lower the chances for message fail due to expiration PreCommitBatchSlack: Duration(3 * time.Hour), // time buffer for forceful batch submission before sectors/deals in batch would start expiring, higher value will lower the chances for message fail due to expiration

View File

@ -830,7 +830,6 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
FinalizeEarly: cfg.FinalizeEarly, FinalizeEarly: cfg.FinalizeEarly,
BatchPreCommits: cfg.BatchPreCommits, BatchPreCommits: cfg.BatchPreCommits,
MinPreCommitBatch: cfg.MinPreCommitBatch,
MaxPreCommitBatch: cfg.MaxPreCommitBatch, MaxPreCommitBatch: cfg.MaxPreCommitBatch,
PreCommitBatchWait: config.Duration(cfg.PreCommitBatchWait), PreCommitBatchWait: config.Duration(cfg.PreCommitBatchWait),
PreCommitBatchSlack: config.Duration(cfg.PreCommitBatchSlack), PreCommitBatchSlack: config.Duration(cfg.PreCommitBatchSlack),
@ -862,7 +861,6 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error
FinalizeEarly: cfg.Sealing.FinalizeEarly, FinalizeEarly: cfg.Sealing.FinalizeEarly,
BatchPreCommits: cfg.Sealing.BatchPreCommits, BatchPreCommits: cfg.Sealing.BatchPreCommits,
MinPreCommitBatch: cfg.Sealing.MinPreCommitBatch,
MaxPreCommitBatch: cfg.Sealing.MaxPreCommitBatch, MaxPreCommitBatch: cfg.Sealing.MaxPreCommitBatch,
PreCommitBatchWait: time.Duration(cfg.Sealing.PreCommitBatchWait), PreCommitBatchWait: time.Duration(cfg.Sealing.PreCommitBatchWait),
PreCommitBatchSlack: time.Duration(cfg.Sealing.PreCommitBatchSlack), PreCommitBatchSlack: time.Duration(cfg.Sealing.PreCommitBatchSlack),