Rename deadlines to cutoffs in the batchers

This commit is contained in:
Aayush Rajasekaran 2021-06-07 19:42:14 -04:00
parent 4a321c6da2
commit b13169f071
2 changed files with 44 additions and 44 deletions

View File

@ -51,9 +51,9 @@ type CommitBatcher struct {
getConfig GetSealingConfigFunc getConfig GetSealingConfigFunc
prover ffiwrapper.Prover prover ffiwrapper.Prover
deadlines map[abi.SectorNumber]time.Time cutoffs map[abi.SectorNumber]time.Time
todo map[abi.SectorNumber]AggregateInput todo map[abi.SectorNumber]AggregateInput
waiting map[abi.SectorNumber][]chan sealiface.CommitBatchRes waiting map[abi.SectorNumber][]chan sealiface.CommitBatchRes
notify, stop, stopped chan struct{} notify, stop, stopped chan struct{}
force chan chan []sealiface.CommitBatchRes force chan chan []sealiface.CommitBatchRes
@ -70,9 +70,9 @@ func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBat
getConfig: getConfig, getConfig: getConfig,
prover: prov, prover: prov,
deadlines: map[abi.SectorNumber]time.Time{}, cutoffs: map[abi.SectorNumber]time.Time{},
todo: map[abi.SectorNumber]AggregateInput{}, todo: map[abi.SectorNumber]AggregateInput{},
waiting: map[abi.SectorNumber][]chan sealiface.CommitBatchRes{}, waiting: map[abi.SectorNumber][]chan sealiface.CommitBatchRes{},
notify: make(chan struct{}, 1), notify: make(chan struct{}, 1),
force: make(chan chan []sealiface.CommitBatchRes), force: make(chan chan []sealiface.CommitBatchRes),
@ -132,30 +132,30 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
return nil return nil
} }
var deadline time.Time var cutoff time.Time
for sn := range b.todo { for sn := range b.todo {
sectorDeadline := b.deadlines[sn] sectorCutoff := b.cutoffs[sn]
if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) { if cutoff.IsZero() || (!sectorCutoff.IsZero() && sectorCutoff.Before(cutoff)) {
deadline = sectorDeadline cutoff = sectorCutoff
} }
} }
for sn := range b.waiting { for sn := range b.waiting {
sectorDeadline := b.deadlines[sn] sectorCutoff := b.cutoffs[sn]
if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) { if cutoff.IsZero() || (!sectorCutoff.IsZero() && sectorCutoff.Before(cutoff)) {
deadline = sectorDeadline cutoff = sectorCutoff
} }
} }
if deadline.IsZero() { if cutoff.IsZero() {
return time.After(maxWait) return time.After(maxWait)
} }
deadline = deadline.Add(-slack) cutoff = cutoff.Add(-slack)
if deadline.Before(now) { if cutoff.Before(now) {
return time.After(time.Nanosecond) // can't return 0 return time.After(time.Nanosecond) // can't return 0
} }
wait := deadline.Sub(now) wait := cutoff.Sub(now)
if wait > maxWait { if wait > maxWait {
wait = maxWait wait = maxWait
} }
@ -208,7 +208,7 @@ func (b *CommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.CommitBa
delete(b.waiting, sn) delete(b.waiting, sn)
delete(b.todo, sn) delete(b.todo, sn)
delete(b.deadlines, sn) delete(b.cutoffs, sn)
} }
} }
@ -378,7 +378,7 @@ func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in Aggregat
sn := s.SectorNumber sn := s.SectorNumber
b.lk.Lock() b.lk.Lock()
b.deadlines[sn] = getSectorDeadline(curEpoch, s) b.cutoffs[sn] = getSectorCutoff(curEpoch, s)
b.todo[sn] = in b.todo[sn] = in
sent := make(chan sealiface.CommitBatchRes, 1) sent := make(chan sealiface.CommitBatchRes, 1)
@ -452,24 +452,24 @@ func (b *CommitBatcher) Stop(ctx context.Context) error {
} }
} }
func getSectorDeadline(curEpoch abi.ChainEpoch, si SectorInfo) time.Time { func getSectorCutoff(curEpoch abi.ChainEpoch, si SectorInfo) time.Time {
deadlineEpoch := si.TicketEpoch + policy.MaxPreCommitRandomnessLookback cutoffEpoch := si.TicketEpoch + policy.MaxPreCommitRandomnessLookback
for _, p := range si.Pieces { for _, p := range si.Pieces {
if p.DealInfo == nil { if p.DealInfo == nil {
continue continue
} }
startEpoch := p.DealInfo.DealSchedule.StartEpoch startEpoch := p.DealInfo.DealSchedule.StartEpoch
if startEpoch < deadlineEpoch { if startEpoch < cutoffEpoch {
deadlineEpoch = startEpoch cutoffEpoch = startEpoch
} }
} }
if deadlineEpoch <= curEpoch { if cutoffEpoch <= curEpoch {
return time.Now() return time.Now()
} }
return time.Now().Add(time.Duration(deadlineEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second) return time.Now().Add(time.Duration(cutoffEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second)
} }
func (b *CommitBatcher) getSectorCollateral(sn abi.SectorNumber, tok TipSetToken) (abi.TokenAmount, error) { func (b *CommitBatcher) getSectorCollateral(sn abi.SectorNumber, tok TipSetToken) (abi.TokenAmount, error) {

View File

@ -40,9 +40,9 @@ type PreCommitBatcher struct {
feeCfg FeeConfig feeCfg FeeConfig
getConfig GetSealingConfigFunc getConfig GetSealingConfigFunc
deadlines map[abi.SectorNumber]time.Time cutoffs map[abi.SectorNumber]time.Time
todo map[abi.SectorNumber]*preCommitEntry todo map[abi.SectorNumber]*preCommitEntry
waiting map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes waiting map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes
notify, stop, stopped chan struct{} notify, stop, stopped chan struct{}
force chan chan []sealiface.PreCommitBatchRes force chan chan []sealiface.PreCommitBatchRes
@ -58,9 +58,9 @@ func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCom
feeCfg: feeCfg, feeCfg: feeCfg,
getConfig: getConfig, getConfig: getConfig,
deadlines: map[abi.SectorNumber]time.Time{}, cutoffs: map[abi.SectorNumber]time.Time{},
todo: map[abi.SectorNumber]*preCommitEntry{}, todo: map[abi.SectorNumber]*preCommitEntry{},
waiting: map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes{}, waiting: map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes{},
notify: make(chan struct{}, 1), notify: make(chan struct{}, 1),
force: make(chan chan []sealiface.PreCommitBatchRes), force: make(chan chan []sealiface.PreCommitBatchRes),
@ -120,30 +120,30 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T
return nil return nil
} }
var deadline time.Time var cutoff time.Time
for sn := range b.todo { for sn := range b.todo {
sectorDeadline := b.deadlines[sn] sectorCutoff := b.cutoffs[sn]
if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) { if cutoff.IsZero() || (!sectorCutoff.IsZero() && sectorCutoff.Before(cutoff)) {
deadline = sectorDeadline cutoff = sectorCutoff
} }
} }
for sn := range b.waiting { for sn := range b.waiting {
sectorDeadline := b.deadlines[sn] sectorCutoff := b.cutoffs[sn]
if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) { if cutoff.IsZero() || (!sectorCutoff.IsZero() && sectorCutoff.Before(cutoff)) {
deadline = sectorDeadline cutoff = sectorCutoff
} }
} }
if deadline.IsZero() { if cutoff.IsZero() {
return time.After(maxWait) return time.After(maxWait)
} }
deadline = deadline.Add(-slack) cutoff = cutoff.Add(-slack)
if deadline.Before(now) { if cutoff.Before(now) {
return time.After(time.Nanosecond) // can't return 0 return time.After(time.Nanosecond) // can't return 0
} }
wait := deadline.Sub(now) wait := cutoff.Sub(now)
if wait > maxWait { if wait > maxWait {
wait = maxWait wait = maxWait
} }
@ -191,7 +191,7 @@ func (b *PreCommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.PreCo
delete(b.waiting, sn) delete(b.waiting, sn)
delete(b.todo, sn) delete(b.todo, sn)
delete(b.deadlines, sn) delete(b.cutoffs, sn)
} }
} }
@ -254,7 +254,7 @@ func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, depos
sn := s.SectorNumber sn := s.SectorNumber
b.lk.Lock() b.lk.Lock()
b.deadlines[sn] = getSectorDeadline(curEpoch, s) b.cutoffs[sn] = getSectorCutoff(curEpoch, s)
b.todo[sn] = &preCommitEntry{ b.todo[sn] = &preCommitEntry{
deposit: deposit, deposit: deposit,
pci: in, pci: in,