Merge pull request #6416 from filecoin-project/asr/sector-cutoffs

Commit and Precommit batcher cannot share a getSectorDeadline method
This commit is contained in:
Łukasz Magiera 2021-06-09 19:27:06 +02:00 committed by GitHub
commit f75922fd3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 99 additions and 52 deletions

View File

@ -41,7 +41,7 @@ const UpgradeOrangeHeight = 300
const UpgradeTrustHeight = 600
const UpgradeNorwegianHeight = 201000
const UpgradeActorsV4Height = 203000
const UpgradeTurboHeight = 203000
const UpgradeHyperdriveHeight = 999999999
func init() {

View File

@ -7,6 +7,10 @@ import (
"sync"
"time"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
@ -34,6 +38,7 @@ type CommitBatcherApi interface {
StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorPreCommitOnChainInfo, error)
StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, TipSetToken) (big.Int, error)
StateNetworkVersion(ctx context.Context, tok TipSetToken) (network.Version, error)
}
type AggregateInput struct {
@ -51,9 +56,9 @@ type CommitBatcher struct {
getConfig GetSealingConfigFunc
prover ffiwrapper.Prover
deadlines map[abi.SectorNumber]time.Time
todo map[abi.SectorNumber]AggregateInput
waiting map[abi.SectorNumber][]chan sealiface.CommitBatchRes
cutoffs map[abi.SectorNumber]time.Time
todo map[abi.SectorNumber]AggregateInput
waiting map[abi.SectorNumber][]chan sealiface.CommitBatchRes
notify, stop, stopped chan struct{}
force chan chan []sealiface.CommitBatchRes
@ -70,9 +75,9 @@ func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBat
getConfig: getConfig,
prover: prov,
deadlines: map[abi.SectorNumber]time.Time{},
todo: map[abi.SectorNumber]AggregateInput{},
waiting: map[abi.SectorNumber][]chan sealiface.CommitBatchRes{},
cutoffs: map[abi.SectorNumber]time.Time{},
todo: map[abi.SectorNumber]AggregateInput{},
waiting: map[abi.SectorNumber][]chan sealiface.CommitBatchRes{},
notify: make(chan struct{}, 1),
force: make(chan chan []sealiface.CommitBatchRes),
@ -132,30 +137,30 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
return nil
}
var deadline time.Time
var cutoff time.Time
for sn := range b.todo {
sectorDeadline := b.deadlines[sn]
if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) {
deadline = sectorDeadline
sectorCutoff := b.cutoffs[sn]
if cutoff.IsZero() || (!sectorCutoff.IsZero() && sectorCutoff.Before(cutoff)) {
cutoff = sectorCutoff
}
}
for sn := range b.waiting {
sectorDeadline := b.deadlines[sn]
if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) {
deadline = sectorDeadline
sectorCutoff := b.cutoffs[sn]
if cutoff.IsZero() || (!sectorCutoff.IsZero() && sectorCutoff.Before(cutoff)) {
cutoff = sectorCutoff
}
}
if deadline.IsZero() {
if cutoff.IsZero() {
return time.After(maxWait)
}
deadline = deadline.Add(-slack)
if deadline.Before(now) {
cutoff = cutoff.Add(-slack)
if cutoff.Before(now) {
return time.After(time.Nanosecond) // can't return 0
}
wait := deadline.Sub(now)
wait := cutoff.Sub(now)
if wait > maxWait {
wait = maxWait
}
@ -208,7 +213,7 @@ func (b *CommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.CommitBa
delete(b.waiting, sn)
delete(b.todo, sn)
delete(b.deadlines, sn)
delete(b.cutoffs, sn)
}
}
@ -369,16 +374,15 @@ func (b *CommitBatcher) processSingle(mi miner.MinerInfo, sn abi.SectorNumber, i
// register commit, wait for batch message, return message CID
func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in AggregateInput) (res sealiface.CommitBatchRes, err error) {
_, curEpoch, err := b.api.ChainHead(b.mctx)
if err != nil {
log.Errorf("getting chain head: %s", err)
return sealiface.CommitBatchRes{}, nil
}
sn := s.SectorNumber
cu, err := b.getCommitCutoff(s)
if err != nil {
return sealiface.CommitBatchRes{}, err
}
b.lk.Lock()
b.deadlines[sn] = getSectorDeadline(curEpoch, s)
b.cutoffs[sn] = cu
b.todo[sn] = in
sent := make(chan sealiface.CommitBatchRes, 1)
@ -452,24 +456,43 @@ func (b *CommitBatcher) Stop(ctx context.Context) error {
}
}
func getSectorDeadline(curEpoch abi.ChainEpoch, si SectorInfo) time.Time {
deadlineEpoch := si.TicketEpoch + policy.MaxPreCommitRandomnessLookback
// TODO: If this returned epochs, it would make testing much easier
func (b *CommitBatcher) getCommitCutoff(si SectorInfo) (time.Time, error) {
tok, curEpoch, err := b.api.ChainHead(b.mctx)
if err != nil {
return time.Now(), xerrors.Errorf("getting chain head: %s", err)
}
nv, err := b.api.StateNetworkVersion(b.mctx, tok)
if err != nil {
log.Errorf("getting network version: %s", err)
return time.Now(), xerrors.Errorf("getting network version: %s", err)
}
pci, err := b.api.StateSectorPreCommitInfo(b.mctx, b.maddr, si.SectorNumber, tok)
if err != nil {
log.Errorf("getting precommit info: %s", err)
return time.Now(), err
}
cutoffEpoch := pci.PreCommitEpoch + policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), si.SectorType)
for _, p := range si.Pieces {
if p.DealInfo == nil {
continue
}
startEpoch := p.DealInfo.DealSchedule.StartEpoch
if startEpoch < deadlineEpoch {
deadlineEpoch = startEpoch
if startEpoch < cutoffEpoch {
cutoffEpoch = startEpoch
}
}
if deadlineEpoch <= curEpoch {
return time.Now()
if cutoffEpoch <= curEpoch {
return time.Now(), nil
}
return time.Now().Add(time.Duration(deadlineEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second)
return time.Now().Add(time.Duration(cutoffEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second), nil
}
func (b *CommitBatcher) getSectorCollateral(sn abi.SectorNumber, tok TipSetToken) (abi.TokenAmount, error) {

View File

@ -7,6 +7,9 @@ import (
"sync"
"time"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
@ -40,9 +43,9 @@ type PreCommitBatcher struct {
feeCfg FeeConfig
getConfig GetSealingConfigFunc
deadlines map[abi.SectorNumber]time.Time
todo map[abi.SectorNumber]*preCommitEntry
waiting map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes
cutoffs map[abi.SectorNumber]time.Time
todo map[abi.SectorNumber]*preCommitEntry
waiting map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes
notify, stop, stopped chan struct{}
force chan chan []sealiface.PreCommitBatchRes
@ -58,9 +61,9 @@ func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCom
feeCfg: feeCfg,
getConfig: getConfig,
deadlines: map[abi.SectorNumber]time.Time{},
todo: map[abi.SectorNumber]*preCommitEntry{},
waiting: map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes{},
cutoffs: map[abi.SectorNumber]time.Time{},
todo: map[abi.SectorNumber]*preCommitEntry{},
waiting: map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes{},
notify: make(chan struct{}, 1),
force: make(chan chan []sealiface.PreCommitBatchRes),
@ -120,30 +123,30 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T
return nil
}
var deadline time.Time
var cutoff time.Time
for sn := range b.todo {
sectorDeadline := b.deadlines[sn]
if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) {
deadline = sectorDeadline
sectorCutoff := b.cutoffs[sn]
if cutoff.IsZero() || (!sectorCutoff.IsZero() && sectorCutoff.Before(cutoff)) {
cutoff = sectorCutoff
}
}
for sn := range b.waiting {
sectorDeadline := b.deadlines[sn]
if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) {
deadline = sectorDeadline
sectorCutoff := b.cutoffs[sn]
if cutoff.IsZero() || (!sectorCutoff.IsZero() && sectorCutoff.Before(cutoff)) {
cutoff = sectorCutoff
}
}
if deadline.IsZero() {
if cutoff.IsZero() {
return time.After(maxWait)
}
deadline = deadline.Add(-slack)
if deadline.Before(now) {
cutoff = cutoff.Add(-slack)
if cutoff.Before(now) {
return time.After(time.Nanosecond) // can't return 0
}
wait := deadline.Sub(now)
wait := cutoff.Sub(now)
if wait > maxWait {
wait = maxWait
}
@ -191,7 +194,7 @@ func (b *PreCommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.PreCo
delete(b.waiting, sn)
delete(b.todo, sn)
delete(b.deadlines, sn)
delete(b.cutoffs, sn)
}
}
@ -254,7 +257,7 @@ func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, depos
sn := s.SectorNumber
b.lk.Lock()
b.deadlines[sn] = getSectorDeadline(curEpoch, s)
b.cutoffs[sn] = getPreCommitCutoff(curEpoch, s)
b.todo[sn] = &preCommitEntry{
deposit: deposit,
pci: in,
@ -330,3 +333,24 @@ func (b *PreCommitBatcher) Stop(ctx context.Context) error {
return ctx.Err()
}
}
// TODO: If this returned epochs, it would make testing much easier
func getPreCommitCutoff(curEpoch abi.ChainEpoch, si SectorInfo) time.Time {
cutoffEpoch := si.TicketEpoch + policy.MaxPreCommitRandomnessLookback
for _, p := range si.Pieces {
if p.DealInfo == nil {
continue
}
startEpoch := p.DealInfo.DealSchedule.StartEpoch
if startEpoch < cutoffEpoch {
cutoffEpoch = startEpoch
}
}
if cutoffEpoch <= curEpoch {
return time.Now()
}
return time.Now().Add(time.Duration(cutoffEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second)
}