From d92c5e100173a1aa74485357aea892b5dfbe6058 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 18 May 2021 16:54:55 +0200 Subject: [PATCH] Missing precommit batcher --- extern/storage-sealing/precommit_batch.go | 298 ++++++++++++++++++++++ 1 file changed, 298 insertions(+) create mode 100644 extern/storage-sealing/precommit_batch.go diff --git a/extern/storage-sealing/precommit_batch.go b/extern/storage-sealing/precommit_batch.go new file mode 100644 index 000000000..93846fbef --- /dev/null +++ b/extern/storage-sealing/precommit_batch.go @@ -0,0 +1,298 @@ +package sealing + +import ( + "bytes" + "context" + "sort" + "sync" + "time" + + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/big" + miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner" + miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" +) + +var ( + // TODO: config + + PreCommitBatchMax uint64 = 100 // adjust based on real-world gas numbers, actors limit at 10k + PreCommitBatchMin uint64 = 1 + PreCommitBatchWait = 5 * time.Minute +) + +type PreCommitBatcherApi interface { + SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) + StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error) + ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error) +} + +type PreCommitBatcher struct { + api PreCommitBatcherApi + maddr address.Address + mctx context.Context + addrSel AddrSel + feeCfg FeeConfig + getConfig GetSealingConfigFunc + + deadlines map[abi.SectorNumber]time.Time + todo map[abi.SectorNumber]*miner0.SectorPreCommitInfo + waiting map[abi.SectorNumber][]chan cid.Cid + + notify, stop, stopped chan struct{} + force chan chan *cid.Cid + lk sync.Mutex +} + +func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCommitBatcherApi, addrSel AddrSel, feeCfg FeeConfig, getConfig GetSealingConfigFunc) *PreCommitBatcher { + b := &PreCommitBatcher{ + api: api, + maddr: maddr, + mctx: mctx, + addrSel: addrSel, + feeCfg: feeCfg, + getConfig: getConfig, + + deadlines: map[abi.SectorNumber]time.Time{}, + todo: map[abi.SectorNumber]*miner0.SectorPreCommitInfo{}, + waiting: map[abi.SectorNumber][]chan cid.Cid{}, + + notify: make(chan struct{}, 1), + force: make(chan chan *cid.Cid), + stop: make(chan struct{}), + stopped: make(chan struct{}), + } + + go b.run() + + return b +} + +func (b *PreCommitBatcher) run() { + var forceRes chan *cid.Cid + var lastMsg *cid.Cid + + cfg, err := b.getConfig() + if err != nil { + panic(err) + } + + for { + if forceRes != nil { + forceRes <- lastMsg + forceRes = nil + } + lastMsg = nil + + var sendAboveMax, sendAboveMin bool + select { + case <-b.stop: + close(b.stopped) + return + case <-b.notify: + sendAboveMax = true + case <-time.After(b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack)): + sendAboveMin = true + case fr := <-b.force: // user triggered + forceRes = fr + } + + var err error + lastMsg, err = b.processBatch(sendAboveMax, sendAboveMin) + if err != nil { + log.Warnw("TerminateBatcher processBatch error", "error", err) + } + } +} + +func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration { + now := time.Now() + + b.lk.Lock() + defer b.lk.Unlock() + + var deadline time.Time + for sn := range b.todo { + sectorDeadline := b.deadlines[sn] + if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) { + deadline = sectorDeadline + } + } + for sn := range b.waiting { + sectorDeadline := b.deadlines[sn] + if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) { + deadline = sectorDeadline + } + } + + if deadline.IsZero() { + return maxWait + } + + deadline = deadline.Add(-slack) + if deadline.Before(now) { + return time.Nanosecond // can't return 0 + } + + wait := deadline.Sub(now) + if wait > maxWait { + wait = maxWait + } + + return wait +} + +func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { + b.lk.Lock() + defer b.lk.Unlock() + params := miner5.PreCommitSectorBatchParams{} + + total := len(b.todo) + if total == 0 { + return nil, nil // nothing to do + } + + cfg, err := b.getConfig() + if err != nil { + return nil, xerrors.Errorf("getting config: %w", err) + } + + if notif && total < cfg.MaxPreCommitBatch { + return nil, nil + } + + if after && total < cfg.MinPreCommitBatch { + return nil, nil + } + + for _, p := range b.todo { + params.Sectors = append(params.Sectors, p) + } + + enc := new(bytes.Buffer) + if err := params.MarshalCBOR(enc); err != nil { + return nil, xerrors.Errorf("couldn't serialize PreCommitSectorBatchParams: %w", err) + } + + mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil) + if err != nil { + return nil, xerrors.Errorf("couldn't get miner info: %w", err) + } + + from, _, err := b.addrSel(b.mctx, mi, api.PreCommitAddr, b.feeCfg.MaxPreCommitGasFee, b.feeCfg.MaxPreCommitGasFee) + if err != nil { + return nil, xerrors.Errorf("no good address found: %w", err) + } + + mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.PreCommitSectorBatch, big.Zero(), b.feeCfg.MaxPreCommitGasFee, enc.Bytes()) + if err != nil { + return nil, xerrors.Errorf("sending message failed: %w", err) + } + + log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "sectors", total) + + for _, sector := range params.Sectors { + sn := sector.SectorNumber + + for _, ch := range b.waiting[sn] { + ch <- mcid // buffered + } + delete(b.waiting, sn) + delete(b.todo, sn) + delete(b.deadlines, sn) + } + + return &mcid, nil +} + +// register PreCommit, wait for batch message, return message CID +func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, in *miner0.SectorPreCommitInfo) (mcid cid.Cid, err error) { + _, curEpoch, err := b.api.ChainHead(b.mctx) + if err != nil { + log.Errorf("getting chain head: %s", err) + return cid.Undef, nil + } + + sn := s.SectorNumber + + b.lk.Lock() + b.deadlines[sn] = getSectorDeadline(curEpoch, s) + b.todo[sn] = in + + sent := make(chan cid.Cid, 1) + b.waiting[sn] = append(b.waiting[sn], sent) + + select { + case b.notify <- struct{}{}: + default: // already have a pending notification, don't need more + } + b.lk.Unlock() + + select { + case c := <-sent: + return c, nil + case <-ctx.Done(): + return cid.Undef, ctx.Err() + } +} + +func (b *PreCommitBatcher) Flush(ctx context.Context) (*cid.Cid, error) { + resCh := make(chan *cid.Cid, 1) + select { + case b.force <- resCh: + select { + case res := <-resCh: + return res, nil + case <-ctx.Done(): + return nil, ctx.Err() + } + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +func (b *PreCommitBatcher) Pending(ctx context.Context) ([]abi.SectorID, error) { + b.lk.Lock() + defer b.lk.Unlock() + + mid, err := address.IDFromAddress(b.maddr) + if err != nil { + return nil, err + } + + res := make([]abi.SectorID, 0) + for _, s := range b.todo { + res = append(res, abi.SectorID{ + Miner: abi.ActorID(mid), + Number: s.SectorNumber, + }) + } + + sort.Slice(res, func(i, j int) bool { + if res[i].Miner != res[j].Miner { + return res[i].Miner < res[j].Miner + } + + return res[i].Number < res[j].Number + }) + + return res, nil +} + +func (b *PreCommitBatcher) Stop(ctx context.Context) error { + close(b.stop) + + select { + case <-b.stopped: + return nil + case <-ctx.Done(): + return ctx.Err() + } +}