Import precommit batcher

This commit is contained in:
Łukasz Magiera 2021-05-18 16:51:06 +02:00
parent 56145201db
commit c7ba083fa4
11 changed files with 140 additions and 52 deletions

View File

@ -80,6 +80,11 @@ type StorageMiner interface {
// SectorTerminatePending returns a list of pending sector terminations to be sent in the next batch message // SectorTerminatePending returns a list of pending sector terminations to be sent in the next batch message
SectorTerminatePending(ctx context.Context) ([]abi.SectorID, error) //perm:admin SectorTerminatePending(ctx context.Context) ([]abi.SectorID, error) //perm:admin
SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error //perm:admin SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error //perm:admin
// SectorPreCommitFlush immediately sends a PreCommit message with sectors batched for PreCommit.
// Returns null if message wasn't sent
SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) //perm:admin
// SectorPreCommitPending returns a list of pending PreCommit sectors to be sent in the next batch message
SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin
SectorCommitFlush(ctx context.Context) (*cid.Cid, error) //perm:admin SectorCommitFlush(ctx context.Context) (*cid.Cid, error) //perm:admin
SectorCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin SectorCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin

View File

@ -649,6 +649,10 @@ type StorageMinerStruct struct {
SectorMarkForUpgrade func(p0 context.Context, p1 abi.SectorNumber) error `perm:"admin"` SectorMarkForUpgrade func(p0 context.Context, p1 abi.SectorNumber) error `perm:"admin"`
SectorPreCommitFlush func(p0 context.Context) (*cid.Cid, error) `perm:"admin"`
SectorPreCommitPending func(p0 context.Context) ([]abi.SectorID, error) `perm:"admin"`
SectorRemove func(p0 context.Context, p1 abi.SectorNumber) error `perm:"admin"` SectorRemove func(p0 context.Context, p1 abi.SectorNumber) error `perm:"admin"`
SectorSetExpectedSealDuration func(p0 context.Context, p1 time.Duration) error `perm:"write"` SectorSetExpectedSealDuration func(p0 context.Context, p1 time.Duration) error `perm:"write"`
@ -1947,6 +1951,14 @@ func (s *StorageMinerStruct) SectorMarkForUpgrade(p0 context.Context, p1 abi.Sec
return s.Internal.SectorMarkForUpgrade(p0, p1) return s.Internal.SectorMarkForUpgrade(p0, p1)
} }
func (s *StorageMinerStruct) SectorPreCommitFlush(p0 context.Context) (*cid.Cid, error) {
return s.Internal.SectorPreCommitFlush(p0)
}
func (s *StorageMinerStruct) SectorPreCommitPending(p0 context.Context) ([]abi.SectorID, error) {
return s.Internal.SectorPreCommitPending(p0)
}
func (s *StorageMinerStruct) SectorRemove(p0 context.Context, p1 abi.SectorNumber) error { func (s *StorageMinerStruct) SectorRemove(p0 context.Context, p1 abi.SectorNumber) error {
return s.Internal.SectorRemove(p0, p1) return s.Internal.SectorRemove(p0, p1)
} }

View File

@ -155,7 +155,7 @@ func (*ActorRegistry) transform(instance invokee) (nativeCode, error) {
"vmr.Runtime, <parameter>") "vmr.Runtime, <parameter>")
} }
if !runtimeType.Implements(t.In(0)) { if !runtimeType.Implements(t.In(0)) {
return nil, newErr("first arguemnt should be vmr.Runtime") return nil, newErr("first argument should be vmr.Runtime")
} }
if t.In(1).Kind() != reflect.Ptr { if t.In(1).Kind() != reflect.Ptr {
return nil, newErr("second argument should be of kind reflect.Ptr") return nil, newErr("second argument should be of kind reflect.Ptr")

View File

@ -155,32 +155,6 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration {
return wait return wait
} }
func (b *CommitBatcher) getSectorDeadline(si SectorInfo) time.Time {
tok, curEpoch, err := b.api.ChainHead(b.mctx)
if err != nil {
log.Errorf("getting chain head: %s", err)
return time.Time{}
}
deadlineEpoch := si.TicketEpoch
for _, p := range si.Pieces {
if p.DealInfo == nil {
continue
}
startEpoch := p.DealInfo.DealSchedule.StartEpoch
if startEpoch < deadlineEpoch {
deadlineEpoch = startEpoch
}
}
if deadlineEpoch <= curEpoch {
return time.Now()
}
return time.Duration(deadlineEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second
}
func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
b.lk.Lock() b.lk.Lock()
defer b.lk.Unlock() defer b.lk.Unlock()
@ -208,20 +182,27 @@ func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
spt := b.todo[0].spt spt := b.todo[0].spt
proofs := make([][]byte, 0, total) proofs := make([][]byte, 0, total)
infos := make([]proof5.AggregateSealVerifyInfo, 0, total)
for id, p := range b.todo { for id, p := range b.todo {
params.SectorNumbers.Set(uint64(id)) params.SectorNumbers.Set(uint64(id))
proofs = append(proofs, p.proof) proofs = append(proofs, p.proof)
infos = append(infos, p.info)
} }
params.AggregateProof, err = b.verif.AggregateSealProofs(spt, arp, proofs) params.AggregateProof, err = b.verif.AggregateSealProofs(proof5.AggregateSealVerifyProofAndInfos{
Miner: 0,
SealProof: spt,
AggregateProof: arp,
Infos: infos,
}, proofs)
if err != nil { if err != nil {
return nil, xerrors.Errorf("aggregating proofs: %w", err) return nil, xerrors.Errorf("aggregating proofs: %w", err)
} }
enc := new(bytes.Buffer) enc := new(bytes.Buffer)
if err := params.MarshalCBOR(enc); err != nil { if err := params.MarshalCBOR(enc); err != nil {
return nil, xerrors.Errorf("couldn't serialize TerminateSectors params: %w", err) return nil, xerrors.Errorf("couldn't serialize ProveCommitAggregateParams: %w", err)
} }
mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil) mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil)
@ -261,9 +242,16 @@ func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
// register commit, wait for batch message, return message CID // register commit, wait for batch message, return message CID
func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in AggregateInput) (mcid cid.Cid, err error) { func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in AggregateInput) (mcid cid.Cid, err error) {
_, curEpoch, err := b.api.ChainHead(b.mctx)
if err != nil {
log.Errorf("getting chain head: %s", err)
return cid.Undef, nil
}
sn := s.SectorNumber sn := s.SectorNumber
b.lk.Lock() b.lk.Lock()
b.deadlines[sn] = b.getSectorDeadline(s) b.deadlines[sn] = getSectorDeadline(curEpoch, s)
b.todo[sn] = in b.todo[sn] = in
sent := make(chan cid.Cid, 1) sent := make(chan cid.Cid, 1)
@ -336,3 +324,23 @@ func (b *CommitBatcher) Stop(ctx context.Context) error {
return ctx.Err() return ctx.Err()
} }
} }
func getSectorDeadline(curEpoch abi.ChainEpoch, si SectorInfo) time.Time {
deadlineEpoch := si.TicketEpoch
for _, p := range si.Pieces {
if p.DealInfo == nil {
continue
}
startEpoch := p.DealInfo.DealSchedule.StartEpoch
if startEpoch < deadlineEpoch {
deadlineEpoch = startEpoch
}
}
if deadlineEpoch <= curEpoch {
return time.Now()
}
return time.Now().Add(time.Duration(deadlineEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second)
}

View File

@ -18,6 +18,12 @@ type Config struct {
AlwaysKeepUnsealedCopy bool AlwaysKeepUnsealedCopy bool
BatchPreCommits bool
MaxPreCommitBatch int
MinPreCommitBatch int
PreCommitBatchWait time.Duration
PreCommitBatchSlack time.Duration
AggregateCommits bool AggregateCommits bool
MinCommitBatch int MinCommitBatch int
MaxCommitBatch int MaxCommitBatch int

View File

@ -103,6 +103,7 @@ type Sealing struct {
stats SectorStats stats SectorStats
terminator *TerminateBatcher terminator *TerminateBatcher
precommiter *PreCommitBatcher
commiter *CommitBatcher commiter *CommitBatcher
getConfig GetSealingConfigFunc getConfig GetSealingConfigFunc
@ -153,6 +154,7 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds
addrSel: as, addrSel: as,
terminator: NewTerminationBatcher(context.TODO(), maddr, api, as, fc, gc), terminator: NewTerminationBatcher(context.TODO(), maddr, api, as, fc, gc),
precommiter: NewPreCommitBatcher(context.TODO(), maddr, api, as, fc, gc),
commiter: NewCommitBatcher(context.TODO(), maddr, api, as, fc, gc, verif), commiter: NewCommitBatcher(context.TODO(), maddr, api, as, fc, gc, verif),
getConfig: gc, getConfig: gc,
@ -204,6 +206,14 @@ func (m *Sealing) TerminatePending(ctx context.Context) ([]abi.SectorID, error)
return m.terminator.Pending(ctx) return m.terminator.Pending(ctx)
} }
func (m *Sealing) SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) {
return m.precommiter.Flush(ctx)
}
func (m *Sealing) SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, error) {
return m.precommiter.Pending(ctx)
}
func (m *Sealing) CommitFlush(ctx context.Context) (*cid.Cid, error) { func (m *Sealing) CommitFlush(ctx context.Context) (*cid.Cid, error) {
return m.commiter.Flush(ctx) return m.commiter.Flush(ctx)
} }

View File

@ -80,6 +80,11 @@ func (b *TerminateBatcher) run() {
} }
lastMsg = nil lastMsg = nil
cfg, err := b.getConfig()
if err != nil {
log.Warnw("TerminateBatcher getconfig error", "error", err)
}
var sendAboveMax, sendAboveMin bool var sendAboveMax, sendAboveMin bool
select { select {
case <-b.stop: case <-b.stop:
@ -87,13 +92,12 @@ func (b *TerminateBatcher) run() {
return return
case <-b.notify: case <-b.notify:
sendAboveMax = true sendAboveMax = true
case <-time.After(TerminateBatchWait): case <-time.After(cfg.TerminateBatchWait):
sendAboveMin = true sendAboveMin = true
case fr := <-b.force: // user triggered case fr := <-b.force: // user triggered
forceRes = fr forceRes = fr
} }
var err error
lastMsg, err = b.processBatch(sendAboveMax, sendAboveMin) lastMsg, err = b.processBatch(sendAboveMax, sendAboveMin)
if err != nil { if err != nil {
log.Warnw("TerminateBatcher processBatch error", "error", err) log.Warnw("TerminateBatcher processBatch error", "error", err)

View File

@ -82,9 +82,21 @@ type SealingConfig struct {
AlwaysKeepUnsealedCopy bool AlwaysKeepUnsealedCopy bool
BatchPreCommits bool
MaxPreCommitBatch int
MinPreCommitBatch int
PreCommitBatchWait Duration
PreCommitBatchSlack Duration
AggregateCommits bool AggregateCommits bool
MinCommitBatch int MinCommitBatch int
MaxCommitBatch int MaxCommitBatch int
CommitBatchWait Duration
CommitBatchSlack Duration
TerminateBatchMax uint64
TerminateBatchMin uint64
TerminateBatchWait Duration
// Keep this many sectors in sealing pipeline, start CC if needed // Keep this many sectors in sealing pipeline, start CC if needed
// todo TargetSealingSectors uint64 // todo TargetSealingSectors uint64
@ -242,14 +254,21 @@ func DefaultStorageMiner() *StorageMiner {
WaitDealsDelay: Duration(time.Hour * 6), WaitDealsDelay: Duration(time.Hour * 6),
AlwaysKeepUnsealedCopy: true, AlwaysKeepUnsealedCopy: true,
BatchPreCommits: true,
MinPreCommitBatch: 1, // we must have at least one proof to aggregate
MaxPreCommitBatch: 204, // todo max?
PreCommitBatchWait: Duration(24 * time.Hour), // this can be up to 6 days
PreCommitBatchSlack: Duration(8 * time.Hour),
AggregateCommits: true, AggregateCommits: true,
MinCommitBatch: 1, // we must have at least one proof to aggregate MinCommitBatch: 1, // we must have at least one proof to aggregate
MaxCommitBatch: 204, // this is the maximum aggregation per FIP13 MaxCommitBatch: 204, // this is the maximum aggregation per FIP13
CommitBatchWait: time.Day, // this can be up to 6 days CommitBatchWait: Duration(24 * time.Hour), // this can be up to 6 days
CommitBatchSlack: 8 * time.Hour, CommitBatchSlack: Duration(8 * time.Hour),
TerminateBatchMin: 1, TerminateBatchMin: 1,
TerminateBatchMax: 100, TerminateBatchMax: 100,
TerminateBatchWait: 5 * time.Minute, TerminateBatchWait: Duration(5 * time.Minute),
}, },
Storage: sectorstorage.SealerConfig{ Storage: sectorstorage.SealerConfig{

View File

@ -374,6 +374,14 @@ func (sm *StorageMinerAPI) SectorTerminatePending(ctx context.Context) ([]abi.Se
return sm.Miner.TerminatePending(ctx) return sm.Miner.TerminatePending(ctx)
} }
func (sm *StorageMinerAPI) SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) {
return sm.Miner.SectorPreCommitFlush(ctx)
}
func (sm *StorageMinerAPI) SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, error) {
return sm.Miner.SectorPreCommitPending(ctx)
}
func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error { func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error {
return sm.Miner.MarkForUpgrade(id) return sm.Miner.MarkForUpgrade(id)
} }

View File

@ -847,14 +847,22 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error
MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals, MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay), WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay),
AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy, AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy,
BatchPreCommits: cfg.Sealing.BatchPreCommits,
MinPreCommitBatch: cfg.Sealing.MinPreCommitBatch,
MaxPreCommitBatch: cfg.Sealing.MaxPreCommitBatch,
PreCommitBatchWait: time.Duration(cfg.Sealing.PreCommitBatchWait),
PreCommitBatchSlack: time.Duration(cfg.Sealing.PreCommitBatchSlack),
AggregateCommits: cfg.Sealing.AggregateCommits, AggregateCommits: cfg.Sealing.AggregateCommits,
MinCommitBatch: cfg.Sealing.MinCommitBatch, MinCommitBatch: cfg.Sealing.MinCommitBatch,
MaxCommitBatch: cfg.Sealing.MaxCommitBatch, MaxCommitBatch: cfg.Sealing.MaxCommitBatch,
CommitBatchWait: cfg.Sealing.CommitBatchWait, CommitBatchWait: time.Duration(cfg.Sealing.CommitBatchWait),
CommitBatchSlack: cfg.Sealing.CommitBatchSlack, CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack),
TerminateBatchMax: cfg.Sealing.TerminateBatchMax, TerminateBatchMax: cfg.Sealing.TerminateBatchMax,
TerminateBatchMin: cfg.Sealing.TerminateBatchMin, TerminateBatchMin: cfg.Sealing.TerminateBatchMin,
TerminateBatchWait: cfg.Sealing.TerminateBatchWait, TerminateBatchWait: time.Duration(cfg.Sealing.TerminateBatchWait),
} }
}) })
return return

View File

@ -59,6 +59,14 @@ func (m *Miner) TerminatePending(ctx context.Context) ([]abi.SectorID, error) {
return m.sealing.TerminatePending(ctx) return m.sealing.TerminatePending(ctx)
} }
func (m *Miner) SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) {
return m.sealing.SectorPreCommitFlush(ctx)
}
func (m *Miner) SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, error) {
return m.sealing.SectorPreCommitPending(ctx)
}
func (m *Miner) CommitFlush(ctx context.Context) (*cid.Cid, error) { func (m *Miner) CommitFlush(ctx context.Context) (*cid.Cid, error) {
return m.sealing.CommitFlush(ctx) return m.sealing.CommitFlush(ctx)
} }