From 239d6f8f4dcc71ae2cc33e25dac7d1f6b3f704f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 18 Jan 2021 14:26:03 +0100 Subject: [PATCH] storagefsm: Rewrite input handling --- cmd/lotus-storage-miner/info.go | 1 + extern/storage-sealing/fsm.go | 83 ++---- extern/storage-sealing/fsm_events.go | 13 +- extern/storage-sealing/input.go | 308 +++++++++++++++++++++ extern/storage-sealing/sealiface/config.go | 2 + extern/storage-sealing/sealing.go | 301 ++------------------ extern/storage-sealing/sector_state.go | 8 +- extern/storage-sealing/stats.go | 13 +- extern/storage-sealing/types.go | 3 +- node/config/def.go | 9 + node/modules/storageminer.go | 2 + 11 files changed, 389 insertions(+), 354 deletions(-) create mode 100644 extern/storage-sealing/input.go diff --git a/cmd/lotus-storage-miner/info.go b/cmd/lotus-storage-miner/info.go index 30c2924f2..edefacf4d 100644 --- a/cmd/lotus-storage-miner/info.go +++ b/cmd/lotus-storage-miner/info.go @@ -284,6 +284,7 @@ var stateList = []stateMeta{ {col: color.FgBlue, state: sealing.Empty}, {col: color.FgBlue, state: sealing.WaitDeals}, + {col: color.FgBlue, state: sealing.AddPiece}, {col: color.FgRed, state: sealing.UndefinedSectorState}, {col: color.FgYellow, state: sealing.Packing}, diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index c989d0296..0e3f6ba2b 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -37,14 +37,20 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto // Sealing UndefinedSectorState: planOne( - on(SectorStart{}, Empty), + on(SectorStart{}, WaitDeals), on(SectorStartCC{}, Packing), ), - Empty: planOne(on(SectorAddPiece{}, WaitDeals)), - WaitDeals: planOne( - on(SectorAddPiece{}, WaitDeals), + Empty: planOne( // deprecated + on(SectorAddPiece{}, AddPiece), on(SectorStartPacking{}, Packing), ), + WaitDeals: planOne( + on(SectorAddPiece{}, AddPiece), + on(SectorStartPacking{}, Packing), + ), + AddPiece: planOne( + on(SectorPieceAdded{}, WaitDeals), + ), Packing: planOne(on(SectorPacked{}, GetTicket)), GetTicket: planOne( on(SectorTicket{}, PreCommit1), @@ -238,12 +244,11 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta /* - * Empty <- incoming deals - | | - | v - *<- WaitDeals <- incoming deals - | | - | v + UndefinedSectorState (start) + v | + *<- WaitDeals <-> AddPiece | + | | /--------------------/ + | v v *<- Packing <- incoming committed capacity | | | v @@ -282,10 +287,6 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta v FailedUnrecoverable - UndefinedSectorState <- ¯\_(ツ)_/¯ - | ^ - *---------------------/ - */ m.stats.updateSector(m.minerSectorID(state.SectorNumber), state.State) @@ -295,7 +296,9 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta case Empty: fallthrough case WaitDeals: - log.Infof("Waiting for deals %d", state.SectorNumber) + return m.handleWaitDeals, processed, nil + case AddPiece: + return m.handleAddPiece, processed, nil case Packing: return m.handlePacking, processed, nil case GetTicket: @@ -418,60 +421,10 @@ func (m *Sealing) restartSectors(ctx context.Context) error { log.Errorf("loading sector list: %+v", err) } - cfg, err := m.getConfig() - if err != nil { - return xerrors.Errorf("getting the sealing delay: %w", err) - } - - spt, err := m.currentSealProof(ctx) - if err != nil { - return xerrors.Errorf("getting current seal proof: %w", err) - } - ssize, err := spt.SectorSize() - if err != nil { - return err - } - - // m.unsealedInfoMap.lk.Lock() taken early in .New to prevent races - defer m.unsealedInfoMap.lk.Unlock() - for _, sector := range trackedSectors { if err := m.sectors.Send(uint64(sector.SectorNumber), SectorRestart{}); err != nil { log.Errorf("restarting sector %d: %+v", sector.SectorNumber, err) } - - if sector.State == WaitDeals { - - // put the sector in the unsealedInfoMap - if _, ok := m.unsealedInfoMap.infos[sector.SectorNumber]; ok { - // something's funky here, but probably safe to move on - log.Warnf("sector %v was already in the unsealedInfoMap when restarting", sector.SectorNumber) - } else { - ui := UnsealedSectorInfo{ - ssize: ssize, - } - for _, p := range sector.Pieces { - if p.DealInfo != nil { - ui.numDeals++ - } - ui.stored += p.Piece.Size - ui.pieceSizes = append(ui.pieceSizes, p.Piece.Size.Unpadded()) - } - - m.unsealedInfoMap.infos[sector.SectorNumber] = ui - } - - // start a fresh timer for the sector - if cfg.WaitDealsDelay > 0 { - timer := time.NewTimer(cfg.WaitDealsDelay) - go func() { - <-timer.C - if err := m.StartPacking(sector.SectorNumber); err != nil { - log.Errorf("starting sector %d: %+v", sector.SectorNumber, err) - } - }() - } - } } // TODO: Grab on-chain sector set and diff with trackedSectors diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go index e28366721..969ccf1e2 100644 --- a/extern/storage-sealing/fsm_events.go +++ b/extern/storage-sealing/fsm_events.go @@ -77,11 +77,20 @@ func (evt SectorStartCC) apply(state *SectorInfo) { } type SectorAddPiece struct { - NewPiece Piece + NewPiece cid.Cid } func (evt SectorAddPiece) apply(state *SectorInfo) { - state.Pieces = append(state.Pieces, evt.NewPiece) + state.PendingPieces = append(state.PendingPieces, evt.NewPiece) +} + +type SectorPieceAdded struct { + NewPieces []Piece +} + +func (evt SectorPieceAdded) apply(state *SectorInfo) { + state.Pieces = append(state.Pieces, evt.NewPieces...) + state.PendingPieces = nil } type SectorStartPacking struct{} diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go new file mode 100644 index 000000000..b0d06b326 --- /dev/null +++ b/extern/storage-sealing/input.go @@ -0,0 +1,308 @@ +package sealing + +import ( + "context" + "sort" + + "golang.org/x/xerrors" + + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/go-padreader" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-statemachine" + "github.com/filecoin-project/specs-storage/storage" + + sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" + "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" +) + +func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error { + // if full / oldish / has oldish deals goto seal + // ^ also per sector deal limit + + // send SectorStartPacking + + m.inputLk.Lock() + var used abi.UnpaddedPieceSize + for _, piece := range sector.Pieces { + used += piece.Piece.Size.Unpadded() + } + + m.openSectors[m.minerSectorID(sector.SectorNumber)] = &openSector{ + used: used, + maybeAccept: func(cid cid.Cid) error { + // todo check space + + // todo check deal expiration + + return ctx.Send(SectorAddPiece{cid}) + }, + } + + go func() { + defer m.inputLk.Unlock() + if err := m.updateInput(ctx.Context(), sector.SectorType); err != nil { + log.Errorf("%+v", err) + } + }() + + return nil +} + +func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) error { + ssize, err := sector.SectorType.SectorSize() + if err != nil { + return err + } + + m.inputLk.Lock() + delete(m.openSectors, m.minerSectorID(sector.SectorNumber)) // todo: do this when handling the event + m.inputLk.Unlock() + + res := SectorPieceAdded{} + + for _, piece := range sector.PendingPieces { + m.inputLk.Lock() + deal, ok := m.pendingPieces[piece] + m.inputLk.Unlock() + if !ok { + // todo: this probably means that the miner process was restarted in the middle of adding pieces. + // Truncate whatever was in process of being added to the sector (keep sector.Pieces as those are cleanly added, then go to WaitDeals) + return xerrors.Errorf("piece %s assigned to sector %d not found", piece, sector.SectorNumber) + } + + var stored abi.PaddedPieceSize + for _, piece := range sector.Pieces { + stored += piece.Piece.Size + } + + pads, padLength := ffiwrapper.GetRequiredPadding(stored, deal.size.Padded()) + + if stored+padLength+deal.size.Padded() > abi.PaddedPieceSize(ssize) { + return xerrors.Errorf("piece assigned to a sector with not enough space") + } + + offset := padLength + pieceSizes := make([]abi.UnpaddedPieceSize, len(sector.Pieces)) + for i, p := range sector.Pieces { + pieceSizes[i] = p.Piece.Size.Unpadded() + offset += p.Piece.Size + } + + for _, p := range pads { + ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx.Context(), DealSectorPriority), + m.minerSector(sector.SectorType, sector.SectorNumber), + pieceSizes, + p.Unpadded(), + NewNullReader(p.Unpadded())) + if err != nil { + return xerrors.Errorf("writing padding piece: %w", err) // todo failed state + } + + pieceSizes = append(pieceSizes, p.Unpadded()) + res.NewPieces = append(res.NewPieces, Piece{ + Piece: ppi, + }) + } + + ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx.Context(), DealSectorPriority), + m.minerSector(sector.SectorType, sector.SectorNumber), + pieceSizes, + deal.size, + deal.data) + if err != nil { + return xerrors.Errorf("writing padding piece: %w", err) // todo failed state + } + + pieceSizes = append(pieceSizes, deal.size) + res.NewPieces = append(res.NewPieces, Piece{ + Piece: ppi, + DealInfo: &deal.deal, + }) + } + + return ctx.Send(res) +} + +func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, data storage.Data, deal DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { + log.Infof("Adding piece for deal %d (publish msg: %s)", deal.DealID, deal.PublishCid) + if (padreader.PaddedSize(uint64(size))) != size { + return 0, 0, xerrors.Errorf("cannot allocate unpadded piece") + } + + sp, err := m.currentSealProof(ctx) + if err != nil { + return 0, 0, xerrors.Errorf("getting current seal proof type: %w", err) + } + + ssize, err := sp.SectorSize() + if err != nil { + return 0, 0, err + } + + if size > abi.PaddedPieceSize(ssize).Unpadded() { + return 0, 0, xerrors.Errorf("piece cannot fit into a sector") + } + + if deal.PublishCid == nil { + return 0, 0, xerrors.Errorf("piece must have a PublishCID") + } + + m.inputLk.Lock() + if _, exist := m.pendingPieces[*deal.PublishCid]; exist { + m.inputLk.Unlock() + return 0, 0, xerrors.Errorf("piece for deal %s already pending", *deal.PublishCid) + } + + resCh := make(chan struct{sn abi.SectorNumber; offset abi.UnpaddedPieceSize; err error}, 1) + + m.pendingPieces[*deal.PublishCid] = &pendingPiece{ + size: size, + deal: deal, + data: data, + assigned: false, + accepted: func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) { + resCh <- struct { + sn abi.SectorNumber + offset abi.UnpaddedPieceSize + err error + }{sn: sn, offset: offset, err: err} + }, + } + + go func() { + defer m.inputLk.Unlock() + if err := m.updateInput(ctx, sp); err != nil { + log.Errorf("%+v", err) + } + }() + + res := <-resCh + + return res.sn, res.offset.Padded(), res.err +} + +// called with m.inputLk +func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) error { + ssize, err := sp.SectorSize() + if err != nil { + return err + } + + type match struct { + sector abi.SectorID + deal cid.Cid + + size abi.UnpaddedPieceSize + padding abi.UnpaddedPieceSize + } + + var matches []match + toAssign := map[cid.Cid]struct{}{} // used to maybe create new sectors + + // todo: this is distinctly O(n^2), may need to be optimized for tiny deals and large scale miners + // (unlikely to be a problem now) + for id, sector := range m.openSectors { + avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used + + for pieceCid, piece := range m.pendingPieces { + if piece.assigned { + continue // already assigned to a sector, skip + } + + toAssign[pieceCid] = struct{}{} + + if piece.size <= avail { // (note: if we have enough space for the piece, we also have enough space for inter-piece padding) + matches = append(matches, match{ + sector: id, + deal: pieceCid, + + size: piece.size, + padding: avail % piece.size, + }) + } + + } + } + + sort.Slice(matches, func(i, j int) bool { + if matches[i].padding != matches[j].padding { // less padding is better + return matches[i].padding < matches[j].padding + } + + if matches[i].size != matches[j].size { // larger pieces are better + return matches[i].size < matches[j].size + } + + return matches[i].sector.Number < matches[j].sector.Number // prefer older sectors + }) + + var assigned int + for _, mt := range matches { + if m.pendingPieces[mt.deal].assigned { + assigned++ + continue + } + + if _, found := m.openSectors[mt.sector]; !found { + continue + } + + err := m.openSectors[mt.sector].maybeAccept(mt.deal) + if err != nil { + m.pendingPieces[mt.deal].accepted(mt.sector.Number, 0, err) // non-error case in handleAddPiece + } + + m.pendingPieces[mt.deal].assigned = true + delete(toAssign, mt.deal) + + if err != nil { + log.Errorf("sector %d rejected deal %s: %+v", mt.sector, mt.deal, err) + continue + } + + delete(m.openSectors, mt.sector) + } + + if len(toAssign) > 0 { + m.tryCreateDealSector(ctx, sp) + + } + + return nil +} + +func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSealProof) error { + cfg, err := m.getConfig() + if err != nil { + return xerrors.Errorf("getting storage config: %w", err) + } + + if cfg.MaxSealingSectorsForDeals > 0 { + if m.stats.curSealing() > cfg.MaxSealingSectorsForDeals { + return nil + } + if m.stats.curStaging() > cfg.MaxWaitDealsSectors { + return nil + } + } + + // Now actually create a new sector + + sid, err := m.sc.Next() + if err != nil { + return xerrors.Errorf("getting sector number: %w", err) + } + + err = m.sealer.NewSector(ctx, m.minerSector(sp, sid)) + if err != nil { + return xerrors.Errorf("initializing sector: %w", err) + } + + log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp) + return m.sectors.Send(uint64(sid), SectorStart{ + ID: sid, + SectorType: sp, + }) +} diff --git a/extern/storage-sealing/sealiface/config.go b/extern/storage-sealing/sealiface/config.go index 945565562..4e0f51202 100644 --- a/extern/storage-sealing/sealiface/config.go +++ b/extern/storage-sealing/sealiface/config.go @@ -15,4 +15,6 @@ type Config struct { MaxSealingSectorsForDeals uint64 WaitDealsDelay time.Duration + + TargetWaitDealsSectors uint64 } diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 96d63efdc..ce54b8a9e 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -3,10 +3,7 @@ package sealing import ( "context" "errors" - "io" - "math" "sync" - "time" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" @@ -15,7 +12,6 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-address" - padreader "github.com/filecoin-project/go-padreader" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/crypto" @@ -85,9 +81,11 @@ type Sealing struct { sectors *statemachine.StateGroup sc SectorIDCounter verif ffiwrapper.Verifier + pcp PreCommitPolicy - pcp PreCommitPolicy - unsealedInfoMap UnsealedSectorMap + inputLk sync.Mutex + openSectors map[abi.SectorID]*openSector + pendingPieces map[cid.Cid]*pendingPiece upgradeLk sync.Mutex toUpgrade map[abi.SectorNumber]struct{} @@ -108,17 +106,20 @@ type FeeConfig struct { MaxTerminateGasFee abi.TokenAmount } -type UnsealedSectorMap struct { - infos map[abi.SectorNumber]UnsealedSectorInfo - lk sync.Mutex +type openSector struct { + used abi.UnpaddedPieceSize // change to bitfield/rle when AddPiece gains offset support to better fill sectors + + maybeAccept func(cid.Cid) error } -type UnsealedSectorInfo struct { - numDeals uint64 - // stored should always equal sum of pieceSizes.Padded() - stored abi.PaddedPieceSize - pieceSizes []abi.UnpaddedPieceSize - ssize abi.SectorSize +type pendingPiece struct { + size abi.UnpaddedPieceSize + deal DealInfo + + data storage.Data + + assigned bool // assigned to a sector? + accepted func(abi.SectorNumber, abi.UnpaddedPieceSize, error) } func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gc GetSealingConfigFunc, notifee SectorStateNotifee, as AddrSel) *Sealing { @@ -132,12 +133,10 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds sc: sc, verif: verif, pcp: pcp, - unsealedInfoMap: UnsealedSectorMap{ - infos: make(map[abi.SectorNumber]UnsealedSectorInfo), - lk: sync.Mutex{}, - }, - toUpgrade: map[abi.SectorNumber]struct{}{}, + openSectors: map[abi.SectorID]*openSector{}, + pendingPieces: map[cid.Cid]*pendingPiece{}, + toUpgrade: map[abi.SectorNumber]struct{}{}, notifee: notifee, addrSel: as, @@ -153,8 +152,6 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{}) - s.unsealedInfoMap.lk.Lock() // released after initialized in .Run() - return s } @@ -178,104 +175,6 @@ func (m *Sealing) Stop(ctx context.Context) error { return nil } -func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { - log.Infof("Adding piece for deal %d (publish msg: %s)", d.DealID, d.PublishCid) - if (padreader.PaddedSize(uint64(size))) != size { - return 0, 0, xerrors.Errorf("cannot allocate unpadded piece") - } - - sp, err := m.currentSealProof(ctx) - if err != nil { - return 0, 0, xerrors.Errorf("getting current seal proof type: %w", err) - } - - ssize, err := sp.SectorSize() - if err != nil { - return 0, 0, err - } - - if size > abi.PaddedPieceSize(ssize).Unpadded() { - return 0, 0, xerrors.Errorf("piece cannot fit into a sector") - } - - m.unsealedInfoMap.lk.Lock() - - sid, pads, err := m.getSectorAndPadding(ctx, size) - if err != nil { - m.unsealedInfoMap.lk.Unlock() - return 0, 0, xerrors.Errorf("getting available sector: %w", err) - } - - for _, p := range pads { - err = m.addPiece(ctx, sid, p.Unpadded(), NewNullReader(p.Unpadded()), nil) - if err != nil { - m.unsealedInfoMap.lk.Unlock() - return 0, 0, xerrors.Errorf("writing pads: %w", err) - } - } - - offset := m.unsealedInfoMap.infos[sid].stored - err = m.addPiece(ctx, sid, size, r, &d) - - if err != nil { - m.unsealedInfoMap.lk.Unlock() - return 0, 0, xerrors.Errorf("adding piece to sector: %w", err) - } - - startPacking := m.unsealedInfoMap.infos[sid].numDeals >= getDealPerSectorLimit(ssize) - - m.unsealedInfoMap.lk.Unlock() - - if startPacking { - if err := m.StartPacking(sid); err != nil { - return 0, 0, xerrors.Errorf("start packing: %w", err) - } - } - - return sid, offset, nil -} - -// Caller should hold m.unsealedInfoMap.lk -func (m *Sealing) addPiece(ctx context.Context, sectorID abi.SectorNumber, size abi.UnpaddedPieceSize, r io.Reader, di *DealInfo) error { - log.Infof("Adding piece to sector %d", sectorID) - sp, err := m.currentSealProof(ctx) - if err != nil { - return xerrors.Errorf("getting current seal proof type: %w", err) - } - ssize, err := sp.SectorSize() - if err != nil { - return err - } - - ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sp, sectorID), m.unsealedInfoMap.infos[sectorID].pieceSizes, size, r) - if err != nil { - return xerrors.Errorf("writing piece: %w", err) - } - piece := Piece{ - Piece: ppi, - DealInfo: di, - } - - err = m.sectors.Send(uint64(sectorID), SectorAddPiece{NewPiece: piece}) - if err != nil { - return err - } - - ui := m.unsealedInfoMap.infos[sectorID] - num := m.unsealedInfoMap.infos[sectorID].numDeals - if di != nil { - num = num + 1 - } - m.unsealedInfoMap.infos[sectorID] = UnsealedSectorInfo{ - numDeals: num, - stored: ui.stored + piece.Piece.Size, - pieceSizes: append(ui.pieceSizes, piece.Piece.Size.Unpadded()), - ssize: ssize, - } - - return nil -} - func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error { return m.sectors.Send(uint64(sid), SectorRemove{}) } @@ -292,168 +191,6 @@ func (m *Sealing) TerminatePending(ctx context.Context) ([]abi.SectorID, error) return m.terminator.Pending(ctx) } -// Caller should NOT hold m.unsealedInfoMap.lk -func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error { - // locking here ensures that when the SectorStartPacking event is sent, the sector won't be picked up anywhere else - m.unsealedInfoMap.lk.Lock() - defer m.unsealedInfoMap.lk.Unlock() - - // cannot send SectorStartPacking to sectors that have already been packed, otherwise it will cause the state machine to exit - if _, ok := m.unsealedInfoMap.infos[sectorID]; !ok { - log.Warnf("call start packing, but sector %v not in unsealedInfoMap.infos, maybe have called", sectorID) - return nil - } - log.Infof("Starting packing sector %d", sectorID) - err := m.sectors.Send(uint64(sectorID), SectorStartPacking{}) - if err != nil { - return err - } - log.Infof("send Starting packing event success sector %d", sectorID) - - delete(m.unsealedInfoMap.infos, sectorID) - - return nil -} - -// Caller should hold m.unsealedInfoMap.lk -func (m *Sealing) getSectorAndPadding(ctx context.Context, size abi.UnpaddedPieceSize) (abi.SectorNumber, []abi.PaddedPieceSize, error) { - for tries := 0; tries < 100; tries++ { - for k, v := range m.unsealedInfoMap.infos { - pads, padLength := ffiwrapper.GetRequiredPadding(v.stored, size.Padded()) - - if v.stored+size.Padded()+padLength <= abi.PaddedPieceSize(v.ssize) { - return k, pads, nil - } - } - - if len(m.unsealedInfoMap.infos) > 0 { - log.Infow("tried to put a piece into an open sector, found none with enough space", "open", len(m.unsealedInfoMap.infos), "size", size, "tries", tries) - } - - ns, ssize, err := m.newDealSector(ctx) - switch err { - case nil: - m.unsealedInfoMap.infos[ns] = UnsealedSectorInfo{ - numDeals: 0, - stored: 0, - pieceSizes: nil, - ssize: ssize, - } - case errTooManySealing: - m.unsealedInfoMap.lk.Unlock() - - select { - case <-time.After(2 * time.Second): - case <-ctx.Done(): - m.unsealedInfoMap.lk.Lock() - return 0, nil, xerrors.Errorf("getting sector for piece: %w", ctx.Err()) - } - - m.unsealedInfoMap.lk.Lock() - continue - default: - return 0, nil, xerrors.Errorf("creating new sector: %w", err) - } - - return ns, nil, nil - } - - return 0, nil, xerrors.Errorf("failed to allocate piece to a sector") -} - -var errTooManySealing = errors.New("too many sectors sealing") - -// newDealSector creates a new sector for deal storage -func (m *Sealing) newDealSector(ctx context.Context) (abi.SectorNumber, abi.SectorSize, error) { - // First make sure we don't have too many 'open' sectors - - cfg, err := m.getConfig() - if err != nil { - return 0, 0, xerrors.Errorf("getting config: %w", err) - } - - if cfg.MaxSealingSectorsForDeals > 0 { - if m.stats.curSealing() > cfg.MaxSealingSectorsForDeals { - return 0, 0, ErrTooManySectorsSealing - } - } - - if cfg.MaxWaitDealsSectors > 0 && uint64(len(m.unsealedInfoMap.infos)) >= cfg.MaxWaitDealsSectors { - // Too many sectors are sealing in parallel. Start sealing one, and retry - // allocating the piece to a sector (we're dropping the lock here, so in - // case other goroutines are also trying to create a sector, we retry in - // getSectorAndPadding instead of here - otherwise if we have lots of - // parallel deals in progress, we can start creating a ton of sectors - // with just a single deal in them) - var mostStored abi.PaddedPieceSize = math.MaxUint64 - var best abi.SectorNumber = math.MaxUint64 - - for sn, info := range m.unsealedInfoMap.infos { - if info.stored+1 > mostStored+1 { // 18446744073709551615 + 1 = 0 - best = sn - } - } - - if best != math.MaxUint64 { - m.unsealedInfoMap.lk.Unlock() - err := m.StartPacking(best) - m.unsealedInfoMap.lk.Lock() - - if err != nil { - log.Errorf("newDealSector StartPacking error: %+v", err) - // let's pretend this is fine - } - } - - return 0, 0, errTooManySealing // will wait a bit and retry - } - - spt, err := m.currentSealProof(ctx) - if err != nil { - return 0, 0, xerrors.Errorf("getting current seal proof type: %w", err) - } - - // Now actually create a new sector - - sid, err := m.sc.Next() - if err != nil { - return 0, 0, xerrors.Errorf("getting sector number: %w", err) - } - - err = m.sealer.NewSector(context.TODO(), m.minerSector(spt, sid)) - if err != nil { - return 0, 0, xerrors.Errorf("initializing sector: %w", err) - } - - log.Infof("Creating sector %d", sid) - err = m.sectors.Send(uint64(sid), SectorStart{ - ID: sid, - SectorType: spt, - }) - - if err != nil { - return 0, 0, xerrors.Errorf("starting the sector fsm: %w", err) - } - - cf, err := m.getConfig() - if err != nil { - return 0, 0, xerrors.Errorf("getting the sealing delay: %w", err) - } - - if cf.WaitDealsDelay > 0 { - timer := time.NewTimer(cf.WaitDealsDelay) - go func() { - <-timer.C - if err := m.StartPacking(sid); err != nil { - log.Errorf("starting sector %d: %+v", sid, err) - } - }() - } - - ssize, err := spt.SectorSize() - return sid, ssize, err -} - // newSectorCC accepts a slice of pieces with no deal (junk data) func (m *Sealing) newSectorCC(ctx context.Context, sid abi.SectorNumber, pieces []Piece) error { spt, err := m.currentSealProof(ctx) diff --git a/extern/storage-sealing/sector_state.go b/extern/storage-sealing/sector_state.go index 49a607958..da3db401b 100644 --- a/extern/storage-sealing/sector_state.go +++ b/extern/storage-sealing/sector_state.go @@ -6,6 +6,7 @@ var ExistSectorStateList = map[SectorState]struct{}{ Empty: {}, WaitDeals: {}, Packing: {}, + AddPiece: {}, GetTicket: {}, PreCommit1: {}, PreCommit2: {}, @@ -43,8 +44,9 @@ const ( UndefinedSectorState SectorState = "" // happy path - Empty SectorState = "Empty" + Empty SectorState = "Empty" // deprecated WaitDeals SectorState = "WaitDeals" // waiting for more pieces (deals) to be added to the sector + AddPiece SectorState = "AddPiece" // put deal data (and padding if required) into the sector Packing SectorState = "Packing" // sector not in sealStore, and not on chain GetTicket SectorState = "GetTicket" // generate ticket PreCommit1 SectorState = "PreCommit1" // do PreCommit1 @@ -85,7 +87,9 @@ const ( func toStatState(st SectorState) statSectorState { switch st { - case Empty, WaitDeals, Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, SubmitCommit, CommitWait, FinalizeSector: + case Empty, WaitDeals, AddPiece: + return sstStaging + case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, SubmitCommit, CommitWait, FinalizeSector: return sstSealing case Proving, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed: return sstProving diff --git a/extern/storage-sealing/stats.go b/extern/storage-sealing/stats.go index 78630c216..108529375 100644 --- a/extern/storage-sealing/stats.go +++ b/extern/storage-sealing/stats.go @@ -9,7 +9,8 @@ import ( type statSectorState int const ( - sstSealing statSectorState = iota + sstStaging statSectorState = iota + sstSealing sstFailed sstProving nsst @@ -41,5 +42,13 @@ func (ss *SectorStats) curSealing() uint64 { ss.lk.Lock() defer ss.lk.Unlock() - return ss.totals[sstSealing] + ss.totals[sstFailed] + return ss.totals[sstStaging] + ss.totals[sstSealing] + ss.totals[sstFailed] +} + +// return the number of sectors waiting to enter the sealing pipeline +func (ss *SectorStats) curStaging() uint64 { + ss.lk.Lock() + defer ss.lk.Unlock() + + return ss.totals[sstStaging] } diff --git a/extern/storage-sealing/types.go b/extern/storage-sealing/types.go index 1d5073622..762fe227a 100644 --- a/extern/storage-sealing/types.go +++ b/extern/storage-sealing/types.go @@ -70,7 +70,8 @@ type SectorInfo struct { SectorType abi.RegisteredSealProof // Packing - Pieces []Piece + Pieces []Piece + PendingPieces []cid.Cid // PreCommit1 TicketValue abi.SealRandomness diff --git a/node/config/def.go b/node/config/def.go index a20e0ceaa..716e50602 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -64,6 +64,14 @@ type SealingConfig struct { MaxSealingSectorsForDeals uint64 WaitDealsDelay Duration + + // Keep this many sectors in sealing pipeline, start CC if needed + // todo TargetSealingSectors uint64 + + // Try to keep this many sectors waiting for deals + TargetWaitDealsSectors uint64 + + // todo TargetSectors - stop auto-pleding new sectors after this many sectors are sealed, default CC upgrade for deals sectors if above } type MinerFeeConfig struct { @@ -183,6 +191,7 @@ func DefaultStorageMiner() *StorageMiner { MaxSealingSectors: 0, MaxSealingSectorsForDeals: 0, WaitDealsDelay: Duration(time.Hour * 6), + TargetWaitDealsSectors: 2, }, Storage: sectorstorage.SealerConfig{ diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 30f84aeaf..f459cf2c4 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -805,6 +805,7 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error MaxSealingSectors: cfg.MaxSealingSectors, MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals, WaitDealsDelay: config.Duration(cfg.WaitDealsDelay), + TargetWaitDealsSectors: cfg.TargetWaitDealsSectors, } }) return @@ -819,6 +820,7 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error MaxSealingSectors: cfg.Sealing.MaxSealingSectors, MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals, WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay), + TargetWaitDealsSectors: cfg.Sealing.TargetWaitDealsSectors, } }) return