From 6ca2a148199b47d9f0eb0eb40fb4bc2a712967bd Mon Sep 17 00:00:00 2001 From: Aayush Rajasekaran Date: Tue, 22 Jun 2021 19:16:36 -0400 Subject: [PATCH 1/3] Always flush when timer goes off --- extern/storage-sealing/commit_batch.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/extern/storage-sealing/commit_batch.go b/extern/storage-sealing/commit_batch.go index 61553601a..929d73306 100644 --- a/extern/storage-sealing/commit_batch.go +++ b/extern/storage-sealing/commit_batch.go @@ -108,7 +108,7 @@ func (b *CommitBatcher) run() { } lastMsg = nil - var sendAboveMax, sendAboveMin bool + var sendAboveMax bool select { case <-b.stop: close(b.stopped) @@ -116,13 +116,13 @@ func (b *CommitBatcher) run() { case <-b.notify: sendAboveMax = true case <-b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack): - sendAboveMin = true + // do nothing case fr := <-b.force: // user triggered forceRes = fr } var err error - lastMsg, err = b.maybeStartBatch(sendAboveMax, sendAboveMin) + lastMsg, err = b.maybeStartBatch(sendAboveMax) if err != nil { log.Warnw("CommitBatcher processBatch error", "error", err) } @@ -170,7 +170,7 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time return time.After(wait) } -func (b *CommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.CommitBatchRes, error) { +func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, error) { b.lk.Lock() defer b.lk.Unlock() @@ -188,10 +188,6 @@ func (b *CommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.CommitBa return nil, nil } - if after && total < cfg.MinCommitBatch { - return nil, nil - } - var res []sealiface.CommitBatchRes if total < cfg.MinCommitBatch || total < miner5.MinAggregatedSectors { From 93f7cbe58717f78baa75ad556e100177bf07ae0c Mon Sep 17 00:00:00 2001 From: Aayush Rajasekaran Date: Tue, 22 Jun 2021 19:30:33 -0400 Subject: [PATCH 2/3] Add a helpful comment --- extern/storage-sealing/commit_batch.go | 1 + 1 file changed, 1 insertion(+) diff --git a/extern/storage-sealing/commit_batch.go b/extern/storage-sealing/commit_batch.go index 929d73306..3c0af1176 100644 --- a/extern/storage-sealing/commit_batch.go +++ b/extern/storage-sealing/commit_batch.go @@ -108,6 +108,7 @@ func (b *CommitBatcher) run() { } lastMsg = nil + // indicates whether we should only start a batch if we have reached or exceeded cfg.MaxCommitBatch var sendAboveMax bool select { case <-b.stop: From 616e5688fc4a62f3747b1983785b2844e32ba0f2 Mon Sep 17 00:00:00 2001 From: Aayush Rajasekaran Date: Wed, 23 Jun 2021 12:30:32 -0400 Subject: [PATCH 3/3] Remove MinPreCommitBatch --- extern/storage-sealing/precommit_batch.go | 12 ++++-------- extern/storage-sealing/sealiface/config.go | 1 - node/config/def.go | 2 -- node/modules/storageminer.go | 2 -- 4 files changed, 4 insertions(+), 13 deletions(-) diff --git a/extern/storage-sealing/precommit_batch.go b/extern/storage-sealing/precommit_batch.go index 9439ae14c..d1d2f5878 100644 --- a/extern/storage-sealing/precommit_batch.go +++ b/extern/storage-sealing/precommit_batch.go @@ -93,7 +93,7 @@ func (b *PreCommitBatcher) run() { } lastRes = nil - var sendAboveMax, sendAboveMin bool + var sendAboveMax bool select { case <-b.stop: close(b.stopped) @@ -101,13 +101,13 @@ func (b *PreCommitBatcher) run() { case <-b.notify: sendAboveMax = true case <-b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack): - sendAboveMin = true + // do nothing case fr := <-b.force: // user triggered forceRes = fr } var err error - lastRes, err = b.maybeStartBatch(sendAboveMax, sendAboveMin) + lastRes, err = b.maybeStartBatch(sendAboveMax) if err != nil { log.Warnw("PreCommitBatcher processBatch error", "error", err) } @@ -155,7 +155,7 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T return time.After(wait) } -func (b *PreCommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.PreCommitBatchRes, error) { +func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBatchRes, error) { b.lk.Lock() defer b.lk.Unlock() @@ -173,10 +173,6 @@ func (b *PreCommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.PreCo return nil, nil } - if after && total < cfg.MinPreCommitBatch { - return nil, nil - } - // todo support multiple batches res, err := b.processBatch(cfg) if err != nil && len(res) == 0 { diff --git a/extern/storage-sealing/sealiface/config.go b/extern/storage-sealing/sealiface/config.go index 499a2befa..b237072d3 100644 --- a/extern/storage-sealing/sealiface/config.go +++ b/extern/storage-sealing/sealiface/config.go @@ -22,7 +22,6 @@ type Config struct { BatchPreCommits bool MaxPreCommitBatch int - MinPreCommitBatch int PreCommitBatchWait time.Duration PreCommitBatchSlack time.Duration diff --git a/node/config/def.go b/node/config/def.go index 177871cc5..d1beb843b 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -93,7 +93,6 @@ type SealingConfig struct { BatchPreCommits bool // maximum precommit batch size - batches will be sent immediately above this size MaxPreCommitBatch int - MinPreCommitBatch int // how long to wait before submitting a batch after crossing the minimum batch size PreCommitBatchWait Duration // time buffer for forceful batch submission before sectors/deal in batch would start expiring @@ -285,7 +284,6 @@ func DefaultStorageMiner() *StorageMiner { FinalizeEarly: false, BatchPreCommits: true, - MinPreCommitBatch: 1, // we must have at least one precommit to batch MaxPreCommitBatch: miner5.PreCommitSectorBatchMaxSize, // up to 256 sectors PreCommitBatchWait: Duration(24 * time.Hour), // this should be less than 31.5 hours, which is the expiration of a precommit ticket PreCommitBatchSlack: Duration(3 * time.Hour), // time buffer for forceful batch submission before sectors/deals in batch would start expiring, higher value will lower the chances for message fail due to expiration diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index b7aae0f44..3a28d5c85 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -830,7 +830,6 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error FinalizeEarly: cfg.FinalizeEarly, BatchPreCommits: cfg.BatchPreCommits, - MinPreCommitBatch: cfg.MinPreCommitBatch, MaxPreCommitBatch: cfg.MaxPreCommitBatch, PreCommitBatchWait: config.Duration(cfg.PreCommitBatchWait), PreCommitBatchSlack: config.Duration(cfg.PreCommitBatchSlack), @@ -862,7 +861,6 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error FinalizeEarly: cfg.Sealing.FinalizeEarly, BatchPreCommits: cfg.Sealing.BatchPreCommits, - MinPreCommitBatch: cfg.Sealing.MinPreCommitBatch, MaxPreCommitBatch: cfg.Sealing.MaxPreCommitBatch, PreCommitBatchWait: time.Duration(cfg.Sealing.PreCommitBatchWait), PreCommitBatchSlack: time.Duration(cfg.Sealing.PreCommitBatchSlack),